Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Amazon EMR 的早期 AMI 版本的 Spark 应用程序细节

以交互方式或批处理模式使用 Spark

Amazon EMR 可让您以两种模式运行 Spark 应用程序:

  • 交互式

  • 批处理

当您使用控制台或 AWS CLI 启动长时间运行的集群时,可以使用 SSH 以 Hadoop 用户身份连接到主节点,并使用 Spark 外壳以交互方式开发并运行 Spark 应用程序。与批处理环境相比,以交互方式使用 Spark 能够让您更轻松地对 Spark 应用程序进行原型设计或测试。在交互模式下成功修改 Spark 应用程序后,您可以将该应用程序 JAR 或 Python 程序放到 Amazon S3 上集群主节点的本地文件系统上。然后,您可以将应用程序作为批处理工作流程提交。

在批处理模式中,将 Spark 脚本上传到 Amazon S3 或本地主节点文件系统,然后将此工作作为步骤提交到集群。Spark 步骤可提交到长时间运行的集群或暂时性集群。

创建安装了 Spark 的集群

使用控制台启动安装了 Spark 的集群

  1. 通过以下网址打开 Amazon EMR 控制台:https://console.amazonaws.cn/elasticmapreduce/

  2. 选择 Create cluster

  3. 对于 Software Configuration (软件配置),请选择您需要的 AMI 发布版。

  4. 对于 Applications to be installed (要安装的应用程序),从列表中选择 Spark,然后选择 Configure and add (配置并添加)

  5. 添加参数以按需更改 Spark 配置。有关更多信息,请参阅配置 Spark。选择 Add

  6. 根据需要选择其他选项,然后选择 Create cluster (创建集群)

以下示例演示如何使用 Java 创建带 Spark 的集群:

AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials); SupportedProductConfig sparkConfig = new SupportedProductConfig() .withName("Spark"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("Spark Cluster") .withAmiVersion("3.11.0") .withNewSupportedProducts(sparkConfig) .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myKeyName") .withInstanceCount(1) .withKeepJobFlowAliveWhenNoSteps(true) .withMasterInstanceType("m3.xlarge") .withSlaveInstanceType("m3.xlarge") ); RunJobFlowResult result = emr.runJobFlow(request);

配置 Spark

您在创建集群时通过运行位于 Github 上的 awslabs/emr-bootstrap-actions/spark 存储库的引导操作配置 Spark。有关引导操作接受的参数,请参阅存储库中的 README。引导操作配置 $SPARK_CONF_DIR/spark-defaults.conf 文件中的属性。有关设置的更多信息,请参阅 Spark 文档中的 Spark 配置主题。您可以将以下 URL 中的“latest”替换为您要安装的 Spark 的版本号,例如,2.2.0 http://spark.apache.org/docs/latest/configuration.html

您也可以在每次提交应用程序时动态配置 Spark。使用 spark 配置文件提供了便于执行程序自动充分利用资源分配的设置。有关更多信息,请参阅覆盖 Spark 默认配置设置

更改 Spark 默认设置

以下示例演示如何使用 AWS CLI 创建 spark.executor.memory 设置为 2G 的集群。

注意

包含了 Linux 行继续符 (\) 以提高可读性。可以在 Linux 命令中删除或使用它们。对于 Windows,请删除它们或将其替换为脱字号 (^)。

aws emr create-cluster --name "Spark cluster" --ami-version 3.11.0 \ --applications Name=Spark, Args=[-d,spark.executor.memory=2G] --ec2-attributes KeyName=myKey \ --instance-type m3.xlarge --instance-count 3 --use-default-roles

向 Spark 提交工作

要向集群提交工作,请使用步骤在 EMR 集群上运行 spark-submit 脚本。使用 AmazonElasticMapReduceClient 中的 addJobFlowSteps 方法添加此步骤:

AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials); StepFactory stepFactory = new StepFactory(); AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(); req.withJobFlowId("j-1K48XXXXXXHCB"); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); StepConfig sparkStep = new StepConfig() .withName("Spark Step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(stepFactory.newScriptRunnerStep("/home/hadoop/spark/bin/spark-submit","--class","org.apache.spark.examples.SparkPi","/home/hadoop/spark/lib/spark-examples-1.3.1-hadoop2.4.0.jar","10")); stepConfigs.add(sparkStep); req.withSteps(stepConfigs); AddJobFlowStepsResult result = emr.addJobFlowSteps(req);

覆盖 Spark 默认配置设置

建议您为不同的应用程序覆盖 Spark 默认配置值。您可以在提交应用程序时使用步骤完成此操作 (实质上是向 spark-submit 传递选项)。例如,您可能需要通过更改 spark.executor.memory 来更改为执行者进程分配的内存。您可以为 --executor-memory 开关提供与下类似的参数:

/home/hadoop/spark/bin/spark-submit --executor-memory 1g --class org.apache.spark.examples.SparkPi /home/hadoop/spark/lib/spark-examples*.jar 10

同样地,您也可以调节 --executor-cores--driver-memory。在步骤中,您可以向步骤提供以下参数:

--executor-memory 1g --class org.apache.spark.examples.SparkPi /home/hadoop/spark/lib/spark-examples*.jar 10

您还可以使用 --conf 选项调节没有内置开关的设置。有关可调节的其他设置的更多信息,请参阅 Apache Spark 文档中的动态加载 Spark 属性主题。