经 EMRFS S3 优化的提交程序的要求 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

经 EMRFS S3 优化的提交程序的要求

满足以下条件时,将使用经 EMRFS S3 优化的提交程序:

  • 您可以运行使用 Spark 或 Dataset 的 Spark 任务将文件写入 Amazon S3。 DataFrames从 Amazon EMR 6.4.0 开始,此提交程序可用于所有常见格式,包括 parquet、ORC 和基于文本的格式(包括 CSV 和 JSON)。对于 Amazon EMR 6.4.0 之前的发行版,仅支持 Parquet 格式。

  • 分段上传在 Amazon EMR 中已启用。这是默认值。有关更多信息,请参阅 经 EMRFS S3 优化的提交程序和分段上传

  • 使用 Spark 的内置文件格式支持。内置文件格式支持用于以下情况:

    • 对于 Hive 元存储表,当 spark.sql.hive.convertMetastoreParquet 设置为 true 时,可用于 Parquet 表,或 spark.sql.hive.convertMetastoreOrc 设置为 true 时,可用于 Amazon EMR 6.4.0 或更高版本的 Orc 表。这些是默认设置。

    • 当任务写入文件格式数据来源或表时,例如,使用 USING parquet 子句创建目标表。

    • 当作业写入未分区的 Hive 元存储 Parquet 表时。Spark 的内置 Parquet 支持不支持分区的 Hive 表,这是一个已知限制。有关更多信息,请参阅《Apac he Spark》 DataFrames 和《数据集指南》中的 Hive metastore Parquet 表转换

  • 写入默认分区位置的 Spark 任务操作,例如 ${table_location}/k1=v1/k2=v2/,使用提交程序。如果任务操作写入自定义分区位置,则不使用提交程序,例如,如果使用 ALTER TABLE SQL 命令设置自定义分区位置。

  • 必须使用 Spark 的以下值:

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 属性必须设置为 true。这是 Amazon EMR 5.20.0 及更高版本的默认设置。对于 Amazon EMR 5.19.0,默认值是 false。有关配置此值的信息,请参阅为 Amazon EMR 5.19.0 启用经 EMRFS S3 优化的提交程序

    • 如果写入非分区的 Hive 元存储表,则仅支持 Parquet 和 Orc 文件格式。如果写入非分区的 Parquet Hive 元存储表,则须将 spark.sql.hive.convertMetastoreParquet 设置为 true。如果写入非分区的 Orc Hive 元存储表,则 spark.sql.hive.convertMetastoreOrc 须设置为 true。这些是默认设置。

    • spark.sql.parquet.output.committer.class 必须设置为 com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter。这是默认设置。

    • 必须将 spark.sql.sources.commitProtocolClass 设置为 org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol 是 Amazon EMR 5.x 系列 5.30.0 及更高版本以及 Amazon EMR 6.x 系列 6.2.0 及更高版本的默认设置。org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol 是以前 Amazon EMR 版本的默认设置。

    • 如果 Spark 作业用动态分区列覆盖分区的 Parquet 数据集,则 partitionOverwriteMode 写入选项和 spark.sql.sources.partitionOverwriteMode 必须设置为 static。这是默认设置。

      注意

      Spark 2.4.0 中引入了 partitionOverwriteMode 写入选项。对于随附了 Amazon EMR 版本 5.19.0 的 Spark 版本 2.3.2,请设置 spark.sql.sources.partitionOverwriteMode 属性。

不使用经 EMRFS S3 优化的提交程序的情况

通常,经 EMRFS S3 优化的提交程序不会在以下情况下使用。

情况 为什么不使用提交程序
当您向 HDFS 写入时 提交程序只支持使用 EMRFS 写入 Amazon S3。
当您使用 S3A 文件系统时 提交程序只支持 EMRFS。
当使用 Spark MapReduce 的 RDD API 时 提交程序只支持使用 SparkSQL DataFrame、或数据集。 APIs

以下 Scala 示例演示了防止经 EMRFS S3 优化的提交程序被整个使用(第一个示例)和部分使用(第二个示例)的一些其他情况。

例 – 动态分区覆盖模式

以下 Scala 示例指示 Spark 使用不同的提交算法,这完全阻止了经 EMRFS S3 优化的提交程序的使用。该代码将 partitionOverwriteMode 属性设置为 dynamic,以仅覆盖您的数据所写入到的分区。然后,通过 partitionBy 指定动态分区列,并将写入模式设置为 overwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

您必须配置所有三项设置,以避免使用经 EMRFS S3 优化的提交程序。当您执行此操作时,Spark 会执行在 Spark 的提交协议中指定的不同提交算法。对于早于 5.30.0 的 Amazon EMR 5.x 发行版和早于 6.2.0 的 Amazon EMR 6.x 发行版,提交协议使用 Spark 的暂存目录,该目录是在以 .spark-staging 开头的输出位置下创建的临时目录。该算法按顺序对分区目录进行重命名,这可能会对性能产生负面影响。有关 Amazon EMR 发行版 5.30.0 及更高版本和 6.2.0 及更高版本的更多信息,请参阅 使用经 EMRFS S3 优化的提交协议

Spark 2.4.0 中的算法遵循以下步骤:

  1. 任务尝试将其输出写入 Spark 的暂存目录下的分区目录,例如 ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/

  2. 对于写入的每个分区,任务尝试都跟踪相对分区路径,例如 k1=v1/k2=v2

  3. 任务成功完成后,它会为驱动程序提供它跟踪的所有相对分区路径。

  4. 完成所有任务后,作业提交阶段将收集成功任务尝试在 Spark 的暂存目录下写入的所有分区目录。Spark 使用目录树重命名操作按顺序将这些目录的每一个都重命名为其最终输出位置。

  5. 暂存目录会在作业提交阶段完成之前删除。

例 – 自定义分区位置

在此示例中,Scala 代码插入到两个分区中。一个分区具有自定义分区位置。另一个分区使用默认分区位置。经 EMRFS S3 优化的提交程序仅用于将任务输出写入到使用默认分区位置的分区。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 代码创建以下 Amazon S3 对象:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

当写入到自定义位置的分区时,Spark 会使用类似于上一个示例的提交算法,如下所述。与前面的示例一样,该算法会导致顺序重命名,这可能会影响性能。

  1. 在将输出写入自定义位置的分区时,任务会写入到 Spark 的暂存目录下的文件中,该目录是在最终输出位置下创建的。该文件的名称包含一个随机 UUID,以防止文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。

  2. 在任务成功完成后,它会为驱动程序提供这些文件及其最终所需的输出路径。

  3. 所有任务都完成后,作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。

  4. 暂存目录会在作业提交阶段完成之前删除。