对 EMRFS S3 优化的提交者的要求 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

对 EMRFS S3 优化的提交者的要求

当满足以下条件时,将使用 EMRFS S3 优化的提交器:

  • 您运行使用 Sp SQL ark 或数据集将文件写入 Amazon S3 的 Spark 任务。 DataFrames从 Amazon EMR 6.4.0 开始,此提交器可用于所有常见格式,ORC包括 parquet 和基于文本的格式(包括CSV和)。JSON对于 Amazon EMR 6.4.0 之前的版本,仅支持 Parquet 格式。

  • 在 Amazon 中,分段上传已启用。EMR这是默认模式。有关更多信息,请参阅 EMRFSS3 优化的提交者和分段上传

  • 使用 Spark 的内置文件格式支持。内置文件格式支持用于以下情况:

    • 对于 Hive metastore 表,当 spark.sql.hive.convertMetastoreParquet Parquet 表设置为,true对于亚马逊 EMR 6.4.0 或更true高版本spark.sql.hive.convertMetastoreOrc的 Orc 表,则设置为。这些是默认设置。

    • 当任务写入文件格式数据来源或表时,例如,使用 USING parquet 子句创建目标表。

    • 当作业写入未分区的 Hive 元存储 Parquet 表时。Spark 的内置 Parquet 支持不支持分区的 Hive 表,这是一个已知限制。有关更多信息,请参阅《Apac he Spark SQL》 DataFrames 和《数据集指南》中的 Hive metastore Parquet 表转换

  • 写入默认分区位置的 Spark 任务操作,例如 ${table_location}/k1=v1/k2=v2/,使用提交程序。如果任务操作写入自定义分区位置,则不使用提交程序,例如,如果使用 ALTER TABLE SQL 命令设置自定义分区位置。

  • 必须使用 Spark 的以下值:

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 属性必须设置为 true。这是亚马逊 EMR 5.20.0 及更高版本的默认设置。在亚马逊 EMR 5.19.0 中,默认值为。false有关配置此值的信息,请参阅为亚马逊 5.19.0 启用 EMRFS S3 优化的提交器 EMR

    • 如果写入未分区的 Hive 元数据仓表,则仅支持 Parquet 和 Orc 文件格式。 spark.sql.hive.convertMetastoreParquettrue如果写入未分区 Parquet Hive 元数据仓表,则必须设置为。 spark.sql.hive.convertMetastoreOrctrue如果写入未分区的 Orc Hive 元数据仓表,则必须设置为。这些是默认设置。

    • spark.sql.parquet.output.committer.class 必须设置为 com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter。这是默认设置。

    • spark.sql.sources.commitProtocolClass必须设置为org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol是亚马逊 EMR 5.x 系列版本 5.30.0 及更高版本以及亚马逊 6. EMR x 系列版本 6.2.0 及更高版本的默认设置。 org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol是先前 Amazon EMR 版本的默认设置。

    • 如果 Spark 作业用动态分区列覆盖分区的 Parquet 数据集,则 partitionOverwriteMode 写入选项和 spark.sql.sources.partitionOverwriteMode 必须设置为 static。这是默认设置。

      注意

      Spark 2.4.0 中引入了 partitionOverwriteMode 写入选项。对于亚马逊EMR发行版 5.19.0 中包含的 Spark 版本 2.3.2,请设置该属性。spark.sql.sources.partitionOverwriteMode

不使用 EMRFS S3 优化的提交者的场合

通常,在EMRFS以下情况下不使用 S3 优化的提交器。

情况 为什么不使用提交程序
当你写信给 HDFS 提交者仅支持使用EMRFS写入 Amazon S3。
当您使用 S3A 文件系统时 提交者只支持EMRFS。
当你使用 MapReduce 或 Spark 时 RDD API 提交者仅支持使用 Spark SQL DataFrame、或数据集APIs。

以下 Scala 示例演示了一些其他情况,这些情况会阻止 EMRFS S3 优化的提交器全部使用(第一个示例)和部分使用(第二个示例)。

例 – 动态分区覆盖模式

以下 Scala 示例指示 Spark 使用不同的提交算法,这样可以完全阻止使用 EMRFS S3 优化的提交器。该代码将 partitionOverwriteMode 属性设置为 dynamic,以仅覆盖您的数据所写入到的分区。然后,通过 partitionBy 指定动态分区列,并将写入模式设置为 overwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

必须配置所有三个设置,以避免使用 EMRFS S3 优化的提交器。当您执行此操作时,Spark 会执行在 Spark 的提交协议中指定的不同提交算法。对于 EMR 5.30.0 之前的 Amazon 5.x 版本和 EMR 6.2.0 之前的 Amazon 6.x 版本,提交协议使用 Spark 的暂存目录,该目录是在以开头的输出位置下创建的临时目录。.spark-staging该算法按顺序对分区目录进行重命名,这可能会对性能产生负面影响。有关 Amazon 5.30.0 及更高EMR版本以及 6.2.0 及更高版本的更多信息,请参阅。使用 EMRFS S3 优化的提交协议

Spark 2.4.0 中的算法遵循以下步骤:

  1. 任务尝试将其输出写入 Spark 的暂存目录下的分区目录,例如 ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/

  2. 对于写入的每个分区,任务尝试都跟踪相对分区路径,例如 k1=v1/k2=v2

  3. 任务成功完成后,它会为驱动程序提供它跟踪的所有相对分区路径。

  4. 完成所有任务后,作业提交阶段将收集成功任务尝试在 Spark 的暂存目录下写入的所有分区目录。Spark 使用目录树重命名操作按顺序将这些目录的每一个都重命名为其最终输出位置。

  5. 暂存目录会在作业提交阶段完成之前删除。

例 – 自定义分区位置

在此示例中,Scala 代码插入到两个分区中。一个分区具有自定义分区位置。另一个分区使用默认分区位置。EMRFSS3 优化的提交器仅用于将任务输出写入使用默认分区位置的分区。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 代码创建以下 Amazon S3 对象:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

当写入到自定义位置的分区时,Spark 会使用类似于上一个示例的提交算法,如下所述。与前面的示例一样,该算法会导致顺序重命名,这可能会影响性能。

  1. 在将输出写入自定义位置的分区时,任务会写入到 Spark 的暂存目录下的文件中,该目录是在最终输出位置下创建的。文件名中包含一个随机名称UUID,以防文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。

  2. 在任务成功完成后,它会为驱动程序提供这些文件及其最终所需的输出路径。

  3. 所有任务都完成后,作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。

  4. 暂存目录会在作业提交阶段完成之前删除。