运行 EMR Serverless 作业时使用 Spark 配置
您可以在 type 参数设置为 SPARK 的情况下在应用程序上运行 Spark 作业。作业必须与适用于 Amazon EMR 发行版的 Spark 版本兼容。例如,当您在 Amazon EMR 发行版 6.6.0 上运行作业时,作业必须与 Apache Spark 3.2.0 兼容。有关每个发行版的应用程序版本信息,请参阅 Amazon EMR Serverless 发行版。
Spark 作业参数
使用 StartJobRun API 运行 Spark 作业时,请指定以下参数。
Spark 作业运行时角色
使用 executionRoleArn 指定应用程序用来执行 Spark 作业的 IAM 角色 ARN。此角色必须具有以下权限:
-
从数据驻留的 S3 存储桶或其他数据来源读取数据
-
从 PySpark 脚本或 JAR 文件驻留的 S3 存储桶或前缀中读取数据
-
写入要写入最终输出的 S3 存储桶
-
将日志写入
S3MonitoringConfiguration指定的 S3 存储桶或前缀 -
访问 KMS 密钥(如果使用 KMS 密钥加密 S3 存储桶中的数据)
-
访问 Amazon Glue Data Catalog(如果使用 SparkSQL)
如果 Spark 作业从/向其他数据来源读取/写入数据,请在此 IAM 角色中指定相应的权限。如果不向 IAM 角色提供这些权限,作业可能会失败。有关更多信息,请参阅 Amazon EMR Serverless 的作业运行时角色 和 存储日志。
Spark 作业驱动程序参数
使用 jobDriver 为作业提供输入。对于要运行的作业类型,作业驱动程序参数只接受一个值。对于 Spark 作业,参数值为 sparkSubmit。您可以使用此作业类型通过 Spark 提交运行 Scala、Java、PySpark 以及任何其他支持的作业。Spark 作业具有以下参数:
-
sparkSubmitParameters:这些是您希望发送给作业的其他 Spark 参数。使用此参数可覆盖默认 Spark 属性,例如驱动程序内存或执行程序数量,如--conf或--class参数中定义的属性。 -
entryPointArguments:这是您希望传递给主 JAR 或 Python 文件的参数数组。您应该使用 Entrypoint 代码来处理读取这些参数。用逗号分隔数组中的每个参数。 -
entryPoint:这是 Amazon S3 中对要运行的主 JAR 或 Python 文件的引用。如果您正在运行 Scala 或 Java JAR,请使用--class参数指定SparkSubmitParameters中的主入口类。
有关更多信息,请参阅使用 spark-submit 启动应用程序
Spark 配置覆盖参数
使用 configurationOverrides 覆盖监控级别和应用程序级别配置属性。该参数接受包含以下两个字段的 JSON 对象:
-
monitoringConfiguration:使用该字段指定希望 EMR Serverless 作业存储 Spark 作业日志的 Amazon S3 URL(s3MonitoringConfiguration)。确保使用托管应用程序的同一 Amazon Web Services 账户 以及运行作业的同一 Amazon Web Services 区域 创建此存储桶。 -
applicationConfiguration:您可以在此字段中提供一个配置对象,来覆盖应用程序的默认配置。您可以使用简写语法提供配置,或可引用 JSON 文件中的配置对象。配置对象包含分类、属性和可选的嵌套配置。属性由您希望在该文件中覆盖的设置组成。您可以在一个 JSON 对象中为多个应用程序指定多个分类。注意
可用的配置分类因特定的 EMR Serverless 发行版而异。例如,自定义 Log4j
spark-driver-log4j2和spark-executor-log4j2的分类仅适用于 6.8.0 及更高版本。
如果在应用程序覆盖和 Spark 提交参数中使用相同的配置,则 Spark 提交参数优先。配置按优先级从高到低排列如下:
-
EMR Serverless 在创建
SparkSession时提供的配置。 -
使用
--conf参数作为sparkSubmitParameters的一部分提供的配置。 -
启动作业时,作为应用程序覆盖的一部分提供的配置。
-
创建应用程序时,作为
runtimeConfiguration的一部分提供的配置。 -
Amazon EMR 对发行版使用的优化配置。
-
应用程序的默认开源配置。
有关在应用程序级别声明配置以及在作业运行期间覆盖配置的更多信息,请参阅 EMR Serverless 的默认应用程序配置。
Spark 动态资源分配优化
使用 dynamicAllocationOptimization 优化 EMR Serverless 中的资源使用情况。在 Spark 配置分类中将此属性设置为 true 表示 EMR Serverless 需要优化执行程序资源分配,以便 Spark 请求和取消执行程序的速率与 EMR Serverless 创建和释放工作线程的速率更好地保持一致。这样,EMR Serverless 可以更好地跨阶段重用工作线程,从而在运行多阶段作业时降低成本,同时性能保持不变。
此属性适用于所有 Amazon EMR 发行版。
以下是使用 dynamicAllocationOptimization 进行配置分类的示例。
[ { "Classification": "spark", "Properties": { "dynamicAllocationOptimization": "true" } } ]
如果使用动态分配优化,请考虑以下几点:
-
此优化适用于您为其启用了动态资源分配的 Spark 作业。
-
为实现最佳成本效益,我们建议根据工作负载情况,使用作业级别设置
spark.dynamicAllocation.maxExecutors或应用程序级最大容量设置来配置工作线程的扩展上限。 -
在简单的作业中,您可能看不到成本改善。例如,如果您的作业在一个小数据集上运行或在一个阶段内完成运行,Spark 可能不需要大量的执行程序或多个扩展事件。
-
如果作业顺序是大阶段、小阶段然后又是大阶段,则作业运行时可能会出现回归。由于 EMR Serverless 能有效使用资源,可能会导致大阶段的可用工作线程减少,从而延长运行时间。
Spark 作业属性
下表列出了提交 Spark 作业时可以覆盖的可选 Spark 属性及其默认值。
| 键 | 描述 | 默认值 |
|---|---|---|
spark.archives |
以逗号分隔的存档列表,Spark 将其提取到每个执行程序的工作目录中。支持的文件类型包括 .jar、
.tar.gz、.tgz 和 .zip。若要指定要提取的目录名,请在要提取的文件名后添加 #。例如 file.zip#directory。 |
NULL |
spark.authenticate |
开启 Spark 内部连接身份验证的选项。 | TRUE |
spark.driver.cores |
驱动程序使用的核心数。 | 4 |
spark.driver.extraJavaOptions |
Spark 驱动程序的额外 Java 选项。 | NULL |
spark.driver.memory |
驱动程序使用的内存量。 | 14G |
spark.dynamicAllocation.enabled |
开启动态资源分配的选项。此选项可根据工作负载扩展或缩减在应用程序中注册的执行程序数量。 | TRUE |
spark.dynamicAllocation.executorIdleTimeout |
在 Spark 将其删除之前,执行程序可保持空闲状态的时间长度。这仅适用于开启动态分配的情况。 | 60s |
spark.dynamicAllocation.initialExecutors |
开启动态分配时要运行的初始执行程序数量。 | 3 |
spark.dynamicAllocation.maxExecutors |
如果开启动态分配,则表示执行程序数量的上限。 | 对于 6.10.0 及更高版本, 对于 6.9.0 及更低版本, |
spark.dynamicAllocation.minExecutors |
如果开启动态分配,则表示执行程序数量的下限。 | 0 |
spark.emr-serverless.allocation.batch.size |
在每个执行程序分配周期中请求的容器数。每个分配周期之间有一秒的间隔。 | 20 |
spark.emr-serverless.driver.disk |
Spark 驱动程序磁盘。 | 20G |
spark.emr-serverless.driverEnv. |
为 Spark 驱动程序添加环境变量的选项。 | NULL |
spark.emr-serverless.executor.disk |
Spark 执行程序磁盘。 | 20G |
spark.emr-serverless.memoryOverheadFactor |
设置要添加到驱动程序和执行程序容器内存的内存开销。 | 0.1 |
spark.emr-serverless.driver.disk.type |
附加到 Spark 驱动程序的磁盘类型。 | Standard |
spark.emr-serverless.executor.disk.type |
附加到 Spark 执行程序的磁盘类型。 | Standard |
spark.executor.cores |
每个执行程序使用的核心数。 | 4 |
spark.executor.extraJavaOptions |
Spark 执行程序的额外 Java 选项。 | NULL |
spark.executor.instances |
要分配的 Spark 执行程序容器的数量。 | 3 |
spark.executor.memory |
每个执行程序使用的内存量。 | 14G |
spark.executorEnv. |
向 Spark 执行程序添加环境变量的选项。 | NULL |
spark.files |
以逗号分隔的文件列表,这些文件放置在每个执行程序的工作目录中。您可以使用 SparkFiles.get( 在执行程序中访问这些文件的路径。 |
NULL |
spark.hadoop.hive.metastore.client.factory.class |
Hive 元存储实现类。 | NULL |
spark.jars |
添加到驱动程序和执行程序的运行时类路径中的其他 jar 文件。 | NULL |
spark.network.crypto.enabled |
开启基于 AES 的 RPC 加密的选项。这包括 Spark 2.2.0 中添加的身份验证协议。 | FALSE |
spark.sql.warehouse.dir |
托管数据库和表的默认位置。 | $PWD/spark-warehouse 的值 |
spark.submit.pyFiles |
以逗号分隔的 .zip、.egg 或
.py 文件列表,这些文件放置在 Python 应用程序的 PYTHONPATH 中。 |
NULL |
下表列出了默认的 Spark 提交参数。
| 键 | 描述 | 默认值 |
|---|---|---|
archives |
以逗号分隔的存档列表,Spark 将其提取到每个执行程序的工作目录中。 | NULL |
class |
应用程序的主类(适用于 Java 和 Scala 应用程序)。 | NULL |
conf |
任意 Spark 配置属性。 | NULL |
driver-cores |
驱动程序使用的核心数。 | 4 |
driver-memory |
驱动程序使用的内存量。 | 14G |
executor-cores |
每个执行程序使用的核心数。 | 4 |
executor-memory |
执行程序使用的内存量。 | 14G |
files |
以逗号分隔的文件列表,这些文件放置在每个执行程序的工作目录中。您可以使用 SparkFiles.get( 在执行程序中访问这些文件的路径。 |
NULL |
jars |
以逗号分隔的 jar 文件列表,这些文件包含在驱动程序和执行程序的类路径上。 | NULL |
num-executors |
要启动的执行程序数。 | 3 |
py-files |
以逗号分隔的 .zip、.egg 或 .py 文件列表,这些文件放置在 Python 应用程序的 PYTHONPATH 中。 |
NULL |
verbose |
开启其他调试输出的选项。 | NULL |
资源配置最佳实践
通过 StartJobRun API 配置驱动程序和执行程序资源
注意
Spark 驱动程序和执行程序核心及内存属性(如果指定)必须直接在 StartJobRun API 请求中指定。
通过这种方式配置资源可确保 EMR Serverless 在运行作业之前分配正确的资源。这与用户脚本中提供的设置(例如 .py 或 .jar 文件)形成对比,这些设置评估得太晚,因为驱动程序和执行程序工作线程有时会在脚本执行开始之前预置。在提交作业时,可通过两种受支持的方式配置这些资源:
选项 1:使用 sparkSubmitParameters
"jobDriver": { "sparkSubmit": { "entryPoint": "s3://your-script-path.py", "sparkSubmitParameters": "—conf spark.driver.memory=4g \ —conf spark.driver.cores=2 \ —conf spark.executor.memory=8g \ —conf spark.executor.cores=4" } }
选项 2:使用 configurationOverrides 进行 spark-defaults 分类
"configurationOverrides": { "applicationConfiguration": [ { "classification": "spark-defaults", "properties": { "spark.driver.memory": "4g", "spark.driver.cores": "2", "spark.executor.memory": "8g", "spark.executor.cores": "4" } } ] }
Spark 示例
下面的示例展示了如何使用 StartJobRun API 运行 Python 脚本。有关使用此示例的端到端教程,请参阅开始使用 Amazon EMR Serverless。有关如何运行 PySpark 作业并添加 Python 依赖项,请在 EMR Serverless Samples
aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arnjob-role-arn\ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py", "entryPointArguments": ["s3://amzn-s3-demo-destination-bucket/wordcount_output"], "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1" } }'
下面的示例展示了如何使用 StartJobRun API 运行 Spark JAR。
aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arnjob-role-arn\ --job-driver '{ "sparkSubmit": { "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar", "entryPointArguments": ["1"], "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1" } }'