Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

访问 Spark 外壳

Spark 外壳基于 Scala REPL (读取-求值-输出-循环)。它让您能够以交互方式创建 Spark 程序并将工作提交到框架。您可以通过 SSH 连接主节点并调用 spark-shell,从而访问 Spark 外壳。有关连接主节点的更多信息,请参阅 Amazon EMR 管理指南 中的使用 SSH 连接到主节点。以下示例使用存储在 Amazon S3 中的 Apache HTTP Server 访问日志。

注意

这些示例中使用的存储桶对能够访问 US East (N. Virginia) 的客户端可用。

默认情况下,Spark 外壳创建其自己的 SparkContext 对象 (称作 sc)。如果 REPL 中需要,您可以使用此上下文。sqlContext 也可在此外壳中使用,它是一种 HiveContext

例 使用 Spark 外壳统计存储在 Amazon S3 中的某个文件中的某个字符串的出现次数

本示例使用 sc 读取 Amazon S3 中的 textFile。

scala> sc res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@404721db scala> val textFile = sc.textFile("s3://elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-13-08-05/ec2-0-51-75-39.amazon.com-2009-04-13-08-05.log")

Spark 创建 textFile 及关联的数据结构。然后,示例会统计此日志文件中包含字符串“cartoonnetwork.com”的行数:

scala> val linesWithCartoonNetwork = textFile.filter(line => line.contains("cartoonnetwork.com")).count() linesWithCartoonNetwork: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23 <snip> <Spark program runs> scala> linesWithCartoonNetwork res2: Long = 9

例 使用基于 Python 的 Spark 外壳统计存储在 Amazon S3 中的某个文件中的某个字符串的出现次数

Spark 还包含一个基于 Python 的外壳 pyspark,您可以用它来设计以 Python 编写的 Spark 程序的原型。与使用 spark-shell 的方法一样,在主节点上调用 pyspark 即可;它包含同样的 SparkContext 对象。

>>> sc <pyspark.context.SparkContext object at 0x7fe7e659fa50> >>> textfile = sc.textFile("s3://elasticmapreduce/samples/hive-ads/tables/impressions/dt=2009-04-13-08-05/ec2-0-51-75-39.amazon.com-2009-04-13-08-05.log")

Spark 创建 textFile 及关联的数据结构。然后,示例会统计此日志文件中包含字符串“cartoonnetwork.com”的行数。

>>> linesWithCartoonNetwork = textfile.filter(lambda line: "cartoonnetwork.com" in line).count() 15/06/04 17:12:22 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries 15/06/04 17:12:22 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev EXAMPLE] 15/06/04 17:12:23 INFO fs.EmrFileSystem: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation <snip> <Spark program continues> >>> linesWithCartoonNetwork 9