配置 Spark - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

配置 Spark

您可以使用配置分类配置 Amazon EMR 上的 Spark。有关使用配置分类的更多信息,请参阅 配置应用程序

Amazon EMR 上的 Spark 的配置分类包括如下:

  • spark – 将 maximizeResourceAllocation 属性设置为 true 或 false。在设置为 true 时,Amazon EMR 将基于集群硬件配置自动配置 spark-defaults 属性。有关更多信息,请参阅使用 maximizeResourceAllocation

  • spark-defaults – 在 spark-defaults.conf 文件中设置值。有关更多信息,请参阅 Spark 文档中的 Spark 配置

  • spark-env – 在 spark-env.sh 文件中设置值。有关更多信息,请参阅 Spark 文档中的环境变量

  • spark-hive-site – 在 hive-site.xml 中为 Spark 设置值。

  • spark-log4j –(Amazon EMR 版本 6.7.x 及更低版本)在 log4j.properties 文件中设置值。有关更多信息,请参阅 Github 上的 log4j.properties.template 文件。

  • spark-log4j2 –(Amazon EMR 版本 6.8.0 及更高版本)在 log4j2.properties 文件中设置值。有关更多信息,请参阅 Github 上的 log4j2.properties.template 文件。

  • spark-metrics – 在 metrics.properties 文件中设置值。有关设置和更多信息,请参阅 Github 上的 metrics.properties.template 文件和 Spark 文档中的指标

注意

如果您要将 Spark 工作负载从另一个平台迁移到 Amazon EMR,我们建议您在添加自定义配置之前通过Amazon EMR 设置的 Spark 默认值检测工作负载。大多数客户都通过我们的默认设置见证了性能有所改善

Amazon EMR 设置的 Spark 默认值

下表说明 Amazon EMR 如何在 spark-defaults 中设置影响应用程序的默认值。

Amazon EMR 设置的 Spark 默认值
设置 描述 默认值
spark.executor.memory

每个执行程序进程要使用的内存量。例如,1g2g

此设置由集群中的核心实例和任务实例类型决定。

spark.executor.cores

要对每个执行程序使用的内核的数量。

此设置由集群中的核心实例和任务实例类型决定。

spark.dynamicAllocation.enabled

如果为 true,则使用动态资源分配,以基于工作负载增大和减小注册到应用程序的执行程序的数目。

true(使用 Amazon EMR 4.4.0 及更高版本)

注意

Spark Shuffle Service 由 Amazon EMR 自动配置。

spark.sql.hive.advancedPartitionPredicatePushdown.enabled

如果为 true,则启用高级分区谓词下推到 Hive 元数据仓。

true
spark.sql.hive.stringLikePartitionPredicatePushdown.enabled

startsWithcontainsendsWith 筛选条件向下推送到 Hive 元数据仓中。

注意

Glue 不支持 startsWithcontainsendsWith 谓词下推。如果您使用的是 Glue 元数据仓,并且由于这些函数的谓词向下推而遇到错误,请将此配置设置为 false

true

在 Amazon EMR 6.1.0 上配置 Spark 垃圾回收

使用 spark.driver.extraJavaOptionsspark.executor.extraJavaOptions 设置自定义垃圾回收配置会导致 Amazon EMR 6.1 中的驱动程序或执行程序启动失败,因为垃圾回收配置与 Amazon EMR 6.1.0 的配置存在冲突。对于 Amazon EMR 6.1.0,默认垃圾回收配置通过 spark.driver.defaultJavaOptionsspark.executor.defaultJavaOptions 设置。此配置仅适用于 Amazon EMR 6.1.0。与垃圾回收无关的 JVM 选项(例如用于配置日志记录(-verbose:class)的选项)仍然可以通过 extraJavaOptions 设置。有关更多信息,请参阅 Spark 应用程序属性

使用 maximizeResourceAllocation

您可以使用 spark 配置分类将 maximizeResourceAllocation 设置为 true,以将执行程序配置为使用集群中的每个节点上尽可能多的资源。maximizeResourceAllocation 特定于 Amazon EMR。当您启用 maximizeResourceAllocation,EMR 会计算核心实例组中实例上的执行程序可用的最大计算和内存资源。然后,它将根据计算出的最大值设置相应的 spark-defaults 设置。

注意

您不应在集群上将 maximizeResourceAllocation 选项与其他分布式应用程序(如 HBase)一起使用。Amazon EMR 对分布式应用程序使用自定义 YARN 配置,这可能与 maximizeResourceAllocation 冲突并导致 Spark 应用程序失败。

以下是一个 maximizeResourceAllocation 设置为 true 的 Spark 分类配置。

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
启用 spark-defaults时在 maximizeResourceAllocation 中配置的设置
设置 描述 Value
spark.default.parallelism 在用户未设置的情况下由转换 (如联接、reduceByKey 和并行化) 返回的 RDD 中的分区数。

对 YARN 容器可用的 CPU 内核数的 2 倍。

spark.driver.memory 要用于驱动程序进程(即初始化 SparkContext)的内存量。(例如,1g、2g)。

基于集群中的实例类型配置设置。但是,由于 Spark 驱动程序可在主实例或某个核心实例(例如,分别在 YARN 客户端和集群模式中)上运行,因此将根据这两个实例组中的实例类型的较小者进行设置。

spark.executor.memory 每个执行者进程要使用的内存量。(例如,1g、2g)

基于集群中的核心和任务实例类型配置设置。

spark.executor.cores 要对每个执行程序使用的内核的数量。 基于集群中的核心和任务实例类型配置设置。
spark.executor.instances 执行程序数。

基于集群中的核心和任务实例类型配置设置。除非同时将 spark.dynamicAllocation.enabled 显式设置为 true,否则将设置。

配置节点停用行为

在使用 Amazon EMR 发行版 5.9.0 或更高版本时,Amazon EMR 上的 Spark 包含一组功能,有助于确保 Spark 正常处理因手动调整大小或自动扩展策略请求引起的节点终止。Amazon EMR 会在 Spark 中实施拒绝名单机制,该机制的基础是 YARN 停用机制。此机制有助于确保不会在即将停用的节点上计划新任务,同时允许正在运行的任务完成。此外,有些功能可以在节点终止导致随机数据块丢失时帮助更快地恢复 Spark 任务。可以更快触发并优化重新计算进程,从而加快重新计算和减少阶段重试,并防止因丢失随机数据块引发的提取失败所导致的任务失败。

重要

Amazon EMR 发行版 5.11.0 中添加了 spark.decommissioning.timeout.threshold 设置,用于提升使用竞价型实例时的 Spark 恢复能力。在早期发行版中,当节点使用竞价型实例且该实例因出价而终止时,Spark 可能无法正常地处理终止。任务可能失败,而且随机重新计算可能花费大量时间。为此,如果您使用竞价型实例,建议使用发行版 5.11.0 或更高版本。

Spark 节点停用设置
设置 描述 默认值

spark.blacklist.decommissioning.enabled

当设置为 true,YARN 中的 Spark 拒绝名单节点处于 decommissioning 状态。Spark 不在于该节点上运行的执行程序上安排新任务。允许已经在运行的任务完成。

true

spark.blacklist.decommissioning.timeout

处于 decommissioning 状态的节点被加入拒绝名单的时间量。默认情况下,此值设置成一小时,这也是 yarn.resourcemanager.decommissioning.timeout 的默认值。要确保节点在其整个退役周期内都处于拒绝名单中,请将此值设置为等于或大于 yarn.resourcemanager.decommissioning.timeout。在停用超时过期后,节点转为 decommissioned 状态,并且 Amazon EMR 可能终止节点的 EC2 实例。如果超时过期后有任何任务仍在运行,这些任务会丢失或被终止并在于其他节点上运行的其他执行程序上重新安排。

1h

spark.decommissioning.timeout.threshold

在 Amazon EMR 发行版 5.11.0 或更高版本中可用。以秒为单位指定。当某个节点转为停用状态时,如果主机将在等于或小于此值的时段后停用,Amazon EMR 不仅会将此节点加入拒绝名单,还会清除主机状态(通过 spark.resourceManager.cleanupExpiredHost 指定),而不会等待此节点转换为停用状态。这使 Spark 能够更好地处理竞价型实例终止,因为无论 yarn.resourcemager.decommissioning.timeout 的值如何,Spot 实例都会在 20 秒的超时时间后停用,因此可能没有足够的时间提供其他节点来随机读取文件。

20s

spark.resourceManager.cleanupExpiredHost

当设置为 true 时,Spark 会注销处于 decommissioned 状态的节点上的执行程序存储的所有已缓存数据和随机数据块。这会加速恢复过程。

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

当设置为 true 时,有助于防止 Spark 进入失败阶段并因为从退役节点获取失败次数过多而导致任务失败。从处于 decommissioned 状态的节点获取随机数据块失败的次数不会计入连续获取失败的最大数量。

true

Spark ThriftServer 环境变量

Spark 将 Hive Thrift 服务器端口环境变量 HIVE_SERVER2_THRIFT_PORT 设置为 10001。

更改 Spark 默认设置

您可以使用 spark-defaults 配置分类或 spark 配置分类中的 maximizeResourceAllocation 设置更改 spark-defaults.conf 中的默认值。

以下过程说明如何使用 CLI 或控制台修改设置。

使用 CLI 创建一个 spark.executor.memory 设为 2g 的集群
  • 使用以下命令创建一个安装了 Spark 且 spark.executor.memory 设为 2g 的集群,该集群引用存储在 Amazon S3 中的 myConfig.json 文件。

    aws emr create-cluster --release-label emr-5.36.1 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注意

    为了便于读取,包含 Linux 行继续符(\)。它们可以通过 Linux 命令删除或使用。对于 Windows,请将它们删除或替换为脱字号(^)。

    myConfig.json:

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]
使用控制台创建一个 spark.executor.memory 设为 2g 的集群
  1. 导航到 Amazon EMR 新控制台,然后从侧面导航栏中选择切换到旧控制台。有关切换到旧控制台后预期情况的更多信息,请参阅 Using the old console

  2. 依次选择 Create cluster (创建集群)Go to advanced options (转到高级选项)

  3. 选择 Spark

  4. Edit software settings (编辑软件设置) 下,将 Enter configuration (输入配置) 保留选中状态并输入以下配置:

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. 选择其他选项,选择 ,然后选择 Create cluster (创建集群)

设置 maximizeResourceAllocation
  • 使用 Amazon CLI 创建一个安装了 Spark 且 maximizeResourceAllocation 设为 true 的集群,该集群引用存储在 Amazon S3 中的 myConfig.json 文件 。

    aws emr create-cluster --release-label emr-5.36.1 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注意

    为了便于读取,包含 Linux 行继续符(\)。它们可以通过 Linux 命令删除或使用。对于 Windows,请将它们删除或替换为脱字号(^)。

    myConfig.json:

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
注意

对于 Amazon EMR 5.21.0 及更高版本,您可以覆盖集群配置,并为运行的集群中的每个实例组指定额外的配置分类。要完成此操作,您可以使用 Amazon EMR 控制台、Amazon Command Line Interface(Amazon CLI)或 Amazon SDK。有关更多信息,请参阅为运行的集群中的实例组提供配置

从 Apache Log4j 1.x 迁移到 Log4j 2.x

Apache Spark 版本 3.2.x 及更早版本使用旧版 Apache Log4j 1.x 和 log4j.properties 文件在 Spark 进程中配置 Log4j。Apache Spark 版本 3.3.0 及更高版本使用 Apache Log4j 2.x 和 log4j2.properties 文件在 Spark 进程中配置 Log4j。

如果您使用低于 6.8.0 的 Amazon EMR 版本配置了 Apache Spark Log4j,则必须删除旧版 spark-log4j 配置分类并迁移到 spark-log4j2 配置分类和密钥格式,然后才能升级到 Amazon EMR 6.8.0 或更高版本。在 Amazon EMR 版本 6.8.0 及更高版本中,旧版 spark-log4j 分类会导致集群创建失败并出现 ValidationException 错误。不会因为与 Log4j 不兼容相关的故障而向您收费,但您必须删除已失效的 spark-log4j 配置分类才能继续。

有关从 Apache Log4j 1.x 迁移到 Log4j 2.x 的更多信息,请参阅 Github 上的《Apache Log4j 迁移指南》和 Spark Log4j 2 模板

注意

对于 Amazon EMR,Apache Spark 使用 log4j2.properties 文件,而不是《Apache Log4j 迁移指南》中所述的 .xml 文件。此外,我们不建议使用 Log4j 1.x 桥接方法转换为 Log4j 2.x。