对 EMRFS S3 优化的提交协议的要求 - Amazon EMR
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

对 EMRFS S3 优化的提交协议的要求

满足以下条件时使用 EMRFS S3 优化的提交协议:

  • 你运行使用 Spark SQL DataFrames、或数据集来覆盖分区表的 Spark 作业。

  • 您可以运行分区覆盖模式为 dynamic 的 Spark 任务。

  • 在 Amazon 中,分段上传已启用。EMR这是默认模式。有关更多信息,请参阅 EMRFSS3 优化的提交协议和分段上传

  • 的文件系统缓存已启EMRFS用。这是默认模式。检查设置 fs.s3.impl.disable.cache 是否设置为 false

  • 使用 Spark 的内置数据来源支持。内置数据来源支持用于以下情况:

    • 当任务写入内置数据来源或表时。

    • 当任务写入 Hive 元存储 Parquet 表时。当 spark.sql.hive.convertInsertingPartitionedTablespark.sql.hive.convertMetastoreParquet 都设置为 true 时,就会发生这种情况。这些是默认设置。

    • 当作业写入 Hive 元数据仓表ORC时。当 spark.sql.hive.convertInsertingPartitionedTablespark.sql.hive.convertMetastoreOrc 都设置为 true 时,就会发生这种情况。这些是默认设置。

  • 写入默认分区位置的 Spark 任务操作,例如 ${table_location}/k1=v1/k2=v2/,使用提交协议。如果任务操作写入自定义分区位置,则不使用协议,例如,如果使用 ALTER TABLE SQL 命令设置自定义分区位置。

  • 必须使用 Spark 的以下值:

    • spark.sql.sources.commitProtocolClass 必须设置为 org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol。这是亚马逊EMR版本 5.30.0 及更高版本以及 6.2.0 及更高版本的默认设置。

    • partitionOverwriteMode 写入选项或 spark.sql.sources.partitionOverwriteMode 必须设置为 dynamic。默认设置为 static

      注意

      Spark 2.4.0 中引入了 partitionOverwriteMode 写入选项。对于亚马逊EMR发行版 5.19.0 中包含的 Spark 版本 2.3.2,请设置该属性。spark.sql.sources.partitionOverwriteMode

    • 如果 Spark 任务覆盖了 Hive 元存储 Parquet 表,则 spark.sql.hive.convertMetastoreParquetspark.sql.hive.convertInsertingPartitionedTablespark.sql.hive.convertMetastore.partitionOverwriteMode 必须设置为 true。这些是默认设置。

    • 如果 Spark 作业覆盖到 Hive 元数据仓ORC表,则spark.sql.hive.convertMetastore.partitionOverwriteMode必须将和spark.sql.hive.convertMetastoreOrcspark.sql.hive.convertInsertingPartitionedTable、和设置为。true这些是默认设置。

例 – 动态分区覆盖模式

在此 Scala 示例中已触发优化。首先,将 partitionOverwriteMode 属性设置为 dynamic。这只会覆盖您正在写入数据的那些分区。然后,通过 partitionBy 指定动态分区列,并将写入模式设置为 overwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") // "overwrite" instead of "insert" .option("partitionOverwriteMode", "dynamic") // "dynamic" instead of "static" .partitionBy("dt") // partitioned data instead of unpartitioned data .parquet("s3://EXAMPLE-DOC-BUCKET/output") // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"

不使用 EMRFS S3 优化的提交协议时

通常,EMRFSS3 优化的提交协议与开源默认 Spark SQL 提交协议的工作原理相同。org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol以下情况下不会进行优化。

情况 为什么不使用提交协议
当你写信给 HDFS 提交协议仅支持使用写入 Amazon S3 EMRFS。
当您使用 S3A 文件系统时 仅支持提交协议EMRFS。
当你使用 MapReduce 或 Spark 时 RDD API 提交协议仅支持使用 Spark SQL DataFrame、或数据集APIs。
当没有触发动态分区覆盖时 提交协议仅优化动态分区覆盖情况。有关其他情况,请参阅 使用 EMRFS S3 优化的提交器

以下 Scala 示例演示了 EMRFS S3 优化的提交协议委托的其他一些情况。SQLHadoopMapReduceCommitProtocol

例 – 具有自定义分区位置的动态分区覆盖模式

在此示例中,Scala 程序以动态分区覆盖模式覆盖两个分区。一个分区具有自定义分区位置。另一个分区使用默认分区位置。EMRFSS3 优化的提交协议只会改进使用默认分区位置的分区。

val table = "dataset" val inputView = "tempView" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .createTempView(inputView) // Set partition overwrite mode to 'dynamic' spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic") spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")

Scala 代码创建以下 Amazon S3 对象:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
注意

在早期 Spark 版本中写入自定义分区位置可能会导致数据丢失。在此示例中,分区 dt='2019-01-28' 将丢失。有关更多详细信息,请参阅 SPARK-35106。此问题已在亚马逊EMR版本 5.33.0 及更高版本中修复,不包括 6.0.x 和 6.1.x。

当写入到自定义位置的分区时,Spark 会使用类似于上一个示例的提交算法,如下所述。与前面的示例一样,该算法会导致顺序重命名,这可能会影响性能。

Spark 2.4.0 中的算法遵循以下步骤:

  1. 在将输出写入自定义位置的分区时,任务会写入到 Spark 的暂存目录下的文件中,该目录是在最终输出位置下创建的。文件名中包含一个随机名称UUID,以防文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。

  2. 在任务成功完成后,它会为驱动程序提供这些文件及其最终所需的输出路径。

  3. 所有任务都完成后,作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。

  4. 暂存目录会在作业提交阶段完成之前删除。