Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

配置 Spark

在创建集群时,可以使用配置分类来配置 Amazon EMR 上的 Spark。有关使用配置分类的更多信息,请参阅配置应用程序

Amazon EMR 上的 Spark 的配置分类包括:

  • spark — 将 maximizeResourceAllocation 属性设置为 true 或 false。在设置为 true 时,Amazon EMR 自动基于集群硬件配置来配置 spark-default 属性。有关更多信息,请参阅 使用 maximizeResourceAllocation

  • spark-defaults — 在 spark-defaults.conf 文件中设置值。有关更多信息,请参阅 Spark 文档中的 Spark 配置

  • spark-env — 在 spark-env.sh 文件中设置值。有关更多信息,请参阅 Spark 文档中的环境变量

  • spark-hive-site — 在 hive-site.xml 中为 Spark 设置值。

  • spark-log4j — 在 log4j.properties 文件中设置值。有关设置和更多信息,请参阅 Github 上的 log4j.properties.template 文件。

  • spark-metrics — 在 metrics.properties 文件中设置值。有关设置和更多信息,请参阅 Github 上的 metrics.properties.template 文件和 Spark 文档中的指标

Amazon EMR 设置的 Spark 默认值

下表说明 Amazon EMR 如何在 spark-default 中设置影响应用程序的默认值。

Amazon EMR 设置的 Spark 默认值

设置 描述 Value
spark.executor.memory 每个执行程序进程要使用的内存量。 (例如,1g、2g)

基于集群中的从属实例类型配置设置。

spark.executor.cores 要对每个执行程序使用的内核的数量。 基于集群中的从属实例类型配置设置。
spark.dynamicAllocation.enabled 是否使用动态资源分配,这将基于工作负载增大和减小注册到应用程序的执行程序的数目。

true (emr-4.4.0 或更高版本)

注意

Spark Shuffle Service 自动由 Amazon EMR 配置。

使用 maximizeResourceAllocation

可以将执行程序配置为使用集群中每个节点上尽可能多的资源,方式是在创建集群时使用 spark 配置分类将 maximizeResourceAllocation 选项设置为 true。此特定于 EMR 的选项计算核心实例组中的实例上的执行程序可用的最大计算和内存资源。然后,它根据此信息设置相应的 spark-defaults 设置。

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]

启用 maximizeResourceAllocation 时在 spark-defaults 中配置的设置

设置 描述 Value
spark.default.parallelism 在用户未设置的情况下由转换 (如联接、reduceByKey 和并行化) 返回的 RDD 中的分区数。

对 YARN 容器可用的 CPU 内核数的 2 倍。

spark.driver.memory 要用于驱动程序进程 (即初始化 SparkContext) 的内存量。 (例如,1g、2g)。

基于集群中的实例类型配置设置。但是,由于 Spark 驱动程序可在主实例或某个核心实例 (例如,分别在 YARN 客户端和集群模式中) 上运行,因此将根据这两个实例组中的实例类型的较小者进行设置。

spark.executor.memory 每个执行程序进程要使用的内存量。 (例如,1g、2g)

基于集群中的从属实例类型配置设置。

spark.executor.cores 要对每个执行程序使用的内核的数量。 基于集群中的从属实例类型配置设置。
spark.executor.instances 执行程序数。

基于集群中的从属实例类型配置设置。除非同时将 spark.dynamicAllocation.enabled 显式设置为 true,否则将设置。

启用执行者动态分配

YARN 上的 Spark 有能力动态扩展用于 Spark 应用程序的执行者数量。如果使用的是 Amazon EMR 发行版 4.4.0 及更高版本,则默认情况下,将启用动态分配。有关更多信息,请参阅 Spark 文档中的动态资源分配动态分配的属性。

配置节点退役行为

在使用 Amazon EMR 发布版 5.9.0 或更高版本时,Amazon EMR 上的 Spark 包含一组功能,有助于确保 Spark 正常处理手动调整大小或自动扩展策略请求引起的节点终止。Amazon EMR 在 Spark 中实施黑名单机制,该机制的基础是 YARN 的退役机制。此机制有助于确保不会在即将停用的节点上计划新任务,同时允许正在运行的任务完成。此外,有些功能可以在节点终止导致随机数据块丢失时帮助更快地恢复 Spark 任务。可以更快触发并优化重新计算进程,从而加快重新计算和减少阶段重试,并防止因丢失随机数据块引发的提取失败所导致的任务失败。

重要

Amazon EMR 发布版 5.11.0 中增加了 spark.decommissioning.timeout.threshold 设置,用于提升使用 Spot 实例时的 Spark 复原能力。在早期发布版中,当节点使用 Spot 实例且该实例因出价而终止时,Spark 可能无法正常地处理终止。任务可能失败,而且随机重新计算可能花费大量时间。为此,如果您使用 Spot 实例,我们建议使用发布版 5.11.0 或更高版本。

Spark 节点退役设置

设置 描述 默认值

spark.blacklist.decommissioning.enabled

当设置为 true,YARN 中的 Spark 黑名单节点处于 decommissioning 状态。Spark 不在于该节点上运行的执行程序上安排新任务。允许已经在运行的任务完成。

true

spark.blacklist.decommissioning.timeout

处于 decommissioning 状态的节点被加入黑名单的时间量。默认情况下,此值设置成一小时,这也是 yarn.resourcemanager.decomissioning.timeout 的默认值。要确保节点在其整个退役周期内都处于黑名单中,请将此值设置为等于或大于 yarn.resourcemanager.decommissioning.timeout。在退役超时过期后,节点转为 decommissioned 状态,并且 Amazon EMR 可能终止节点的 EC2 实例。如果超时过期后有任何任务仍在运行,这些任务会丢失或被终止并在于其他节点上运行的其他执行程序上重新安排。

1h

spark.decommissioning.timeout.threshold

在 Amazon EMR 发布版 5.11.0 或更高版本中可用。以秒为单位指定。当某个节点转换为停用状态时,如果主机将在等于或小于此值的时段后停用,Amazon EMR 不仅会将此节点加入黑名单,还会清除主机状态 (由 spark.resourceManager.cleanupExpiredHost 指定),而不会等待此节点转换为停用状态。这使 Spark 能够更好地处理 Spot 实例终止,因为无论 yarn.resourcemager.decommissioning.timeout 的值如何,Spot 实例都会在 20 秒的超时时间后停用,因此可能没有足够的时间提供其他节点来随机读取文件。

20s

spark.resourceManager.cleanupExpiredHost

当设置为 true 时,Spark 会注销处于 decommissioned 状态的节点上的执行程序存储的所有已缓存数据和随机数据块。这会加速恢复过程。

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

当设置为 true 时,有助于防止 Spark 进入失败阶段并因为从退役节点获取失败次数过多而导致任务失败。从处于 decommissioned 状态的节点获取随机数据块失败的次数不会计入连续获取失败的最大数量。

true

Spark ThriftServer 环境变量

Spark 将 Hive Thrift 服务器端口环境变量 HIVE_SERVER2_THRIFT_PORT 设置为 10001。

更改 Spark 默认设置

spark-defaults.conf 配置分类中创建集群或 spark-defaults 设置时,可使用 maximizeResourceAllocation 配置分类更改 spark 中的默认值。

以下过程说明如何使用 CLI 或控制台修改设置。

使用 CLI 创建一个 spark.executor.memory 设为 2G 的集群

  • 使用以下命令创建一个安装了 Spark 且 spark.executor.memory 设为 2G 的集群,该集群引用存储在 Amazon S3 中的文件 myConfig.json

    aws emr create-cluster --release-label emr-5.14.0 --applications Name=Spark \ --instance-type m4.large --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json

    注意

    包含了 Linux 行继续符 (\) 以提高可读性。可以在 Linux 命令中删除或使用它们。对于 Windows,请删除它们或将其替换为脱字号 (^)。

    myConfig.json

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]

使用控制台创建一个 spark.executor.memory 设为 2G 的集群

  1. Open the Amazon EMR console at https://console.amazonaws.cn/elasticmapreduce/.

  2. Choose Create cluster, Go to advanced options.

  3. 选择 Spark

  4. Edit software settings (编辑软件设置) 下,将 Enter configuration (输入配置) 保留选中状态并输入以下配置:

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. 选择其他选项,选择,然后选择 Create cluster (创建集群)。

设置 maximizeResourceAllocation

  • 使用 AWS CLI 创建一个安装了 Spark 且 maximizeResourceAllocation 设为 true 的集群,该集群引用存储在 Amazon S3 中的文件 myConfig.json

    aws emr create-cluster --release-label emr-5.14.0 --applications Name=Spark \ --instance-type m4.large --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json

    注意

    包含了 Linux 行继续符 (\) 以提高可读性。可以在 Linux 命令中删除或使用它们。对于 Windows,请删除它们或将其替换为脱字号 (^)。

    myConfig.json

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]