Amazon EMR
Amazon EMR 版本指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

经 EMRFS S3 优化的提交程序的要求

满足以下条件时,将使用经 EMRFS S3 优化的提交程序:

  • 您可以运行使用 Spark SQL、DataFrame 或 Dataset 的 Spark 作业来写入 Parquet 文件。

  • 分段上传在 Amazon EMR 中已启用。这是默认值。有关更多信息,请参阅经 EMRFS S3 优化的提交程序和分段上传

  • 将会使用 Spark 的内置 Parquet 支持。内置 Parquet 支持用于以下情况:

    • spark.sql.hive.convertMetastoreParquet 设置为 true。这是默认设置。

    • 当作业写入 Parquet 数据源或表时。例如,目标表是使用 USING parquet 子句创建的。

    • 当作业写入未分区的 Hive 元存储 Parquet 表时。Spark 的内置 Parquet 支持不支持分区的 Hive 表,这是一个已知限制。有关更多信息,请参阅 Apache Spark SQL、DataFrame 和 Dataset 指南中的 Hive 元存储 Parquet 表转换

  • 写入默认分区位置的 Spark 作业操作(例如,${table_location}/k1=v1/k2=v2/)使用提交程序。如果作业操作写入自定义分区位置,则不使用提交程序。例如,如果自定义分区位置是使用 ALTER TABLE SQL 命令设置的。

  • 必须使用 Spark 的以下值:

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 属性必须设置为 true。这是 Amazon EMR 5.20.0 及更高版本的默认设置。对于 Amazon EMR 5.19.0,默认值是 false。有关配置此值的信息,请参阅为 Amazon EMR 5.19.0 启用经 EMRFS S3 优化的提交程序

    • 如果写入未分区的 Hive 元存储表,则 spark.sql.hive.convertMetastoreParquet 必须设置为 true 。这是默认设置。

    • spark.sql.parquet.output.committer.class 必须设置为 com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter。这是默认设置。

    • spark.sql.sources.commitProtocolClass 必须设置为 org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol。这是默认设置。

    • 如果 Spark 作业用动态分区列覆盖分区的 Parquet 数据集,则 partitionOverwriteMode 写入选项和 spark.sql.sources.partitionOverwriteMode 必须设置为 static。这是默认设置。

      注意

      Spark 2.4.0 中引入了 partitionOverwriteMode 写入选项。对于随附了 Amazon EMR 版本 5.19.0 的 Spark 版本 2.3.2,请设置 spark.sql.sources.partitionOverwriteMode 属性。

当不使用经 EMRFS S3 优化的提交程序时

在以下情况下不使用提交程序:

  • 当向 HDFS 写入时

  • 当使用 S3A 文件系统时

  • 当使用 Parquet 以外的输出格式(例如 ORC 或文本)时

  • 当使用 MapReduce 或 Spark 的 RDD API 时

以下示例演示了使用 Scala 编写的实现,这些实现不整个使用经 EMRFS S3 优化的提交程序(第一个示例),也不部分使用经 EMRFS S3 优化的提交程序(第二个示例)。

例 –动态分区覆盖模式

在以下 Scala 代码中,未使用提交程序,因为 partitionOverwriteMode 设置为 dynamic,动态分区列由 partitionBy 指定,并且写入模式设置为 overwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://bucket/output")

在此示例中,Spark 不执行使用经 EMRFS S3 优化的提交程序或任何已配置的输出提交程序,而是执行使用 Spark 的暂存目录的不同的提交算法,该目录是在以 .spark-staging 开头的输出位置下创建的临时目录。该算法导致分区目录的顺序重命名,这可能会对性能产生负面影响。

Spark 2.4.0 中的算法遵循以下步骤:

  1. 任务尝试将其输出写入 Spark 的暂存目录下的分区目录(例如,${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/)。

  2. 对于写入的每个分区,任务尝试都会跟踪相对分区路径(例如,k1=v1/k2=v2)。

  3. 任务成功完成后,它会为驱动程序提供它跟踪的所有相对分区路径。

  4. 完成所有任务后,作业提交阶段将收集成功任务尝试在 Spark 的暂存目录下写入的所有分区目录。Spark 使用目录树重命名操作按顺序将这些目录的每一个都重命名为其最终输出位置。

  5. 暂存目录会在作业提交阶段完成之前删除。

例 –自定义分区位置

在此示例中,Scala 代码插入到两个分区中。一个分区具有自定义分区位置。另一个分区使用默认分区位置。经 EMRFS S3 优化的提交程序仅用于将任务输出写入到使用默认分区位置的分区。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 代码创建以下 Amazon S3 对象:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

当写入到自定义位置的分区时,Spark 会使用类似于上一个示例的提交算法,如下所述。与前面的示例一样,该算法会导致顺序重命名,这可能会对性能产生负面影响:

  1. 在将输出写入自定义位置的分区时,任务会写入到 Spark 的暂存目录下的文件中,该目录是在最终输出位置下创建的。该文件的名称包含一个随机 UUID,以防止文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。

  2. 在任务成功完成后,它会为驱动程序提供这些文件及其最终所需的输出路径。

  3. 所有任务都完成后,作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。

  4. 暂存目录会在作业提交阶段完成之前删除。