在 Amazon Glue 中使用 Parquet 格式 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Glue 中使用 Parquet 格式

Amazon Glue 从源中检索数据,并将数据写入以各种数据格式存储和传输的目标。如果您的数据以 Parquet 数据格式存储或传输,本文档将向您介绍供您使用 Amazon Glue 中的数据的可用功能。

Amazon Glue 支持使用 Parquet 格式。此格式是一种以性能为导向、基于列的数据格式。有关标准颁发机构对此格式的简介,请参阅 Apache Parquet Documentation Overview(Apache Parquet 文档概述)。

您可以使用 Amazon Glue 从 Amazon S3 和流式处理媒体源读取 Parquet 文件,以及将 Parquet 文件写入 Amazon S3。您可以读取并写入包含 S3 中的 Parquet 文件的 bzipgzip 存档。请在 S3 连接参数 上而非本页中讨论的配置中配置压缩行为。

下表显示了哪些常用 Amazon Glue 功能支持 Parquet 格式选项。

读取 写入 流式处理读取 对小文件进行分组 作业书签
支持 支持 支持 不支持 支持*

* 在 Amazon Glue 版本 1.0+ 中受支持

示例:从 S3 读取 Parquet 文件或文件夹

先决条件:您将需要至您想要读取的 Parquet 文件或文件夹的 S3 路径(s3path)。

配置:在函数选项中,请指定 format="parquet"。在您的 connection_options 中,请使用 paths 键指定 s3path

您可以在 connection_options 中配置读取器与 S3 的交互方式。有关详细信息,请参阅 Amazon Glue 中 ETL 的连接类型和选项:S3 连接参数

您可以配置读取器如何解释 format_options 中的 Parquet 文件。有关详细信息,请参阅 Parquet 配置参考

以下 Amazon Glue ETL 脚本显示了从 S3 读取 Parquet 文件或文件夹的过程:

Python

在本示例中,使用 create_dynamic_frame.from_options 方法。

# Example: Read Parquet from S3 from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "s3", connection_options = {"paths": ["s3://s3path/"]}, format = "parquet" )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

dataFrame = spark.read.parquet("s3://s3path/")
Scala

在本示例中,使用 getSourceWithFormat 方法。

// Example: Read Parquet from S3 import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dynamicFrame = glueContext.getSourceWithFormat( connectionType="s3", format="parquet", options=JsonOptions("""{"paths": ["s3://s3path"]}""") ).getDynamicFrame() } }

您还可以使用脚本(org.apache.spark.sql.DataFrame)中的 DataFrames。

spark.read.parquet("s3://s3path/")

示例:将 Parquet 文件和文件夹写入 S3

先决条件:您将需要一个初始化的 DataFrame(dataFrame)或 DynamicFrame(dynamicFrame)。您还需要预期 S3 输出路径 s3path

配置:在函数选项中,请指定 format="parquet"。在您的 connection_options 中,请使用 paths 键指定 s3path

您可以在 connection_options 中进一步修改编写器与 S3 的交互方式。有关详细信息,请参阅 Amazon Glue 中 ETL 的连接类型和选项:S3 连接参数。您可以配置自己的操作在 format_options 中写入文件的内容的方式。有关详细信息,请参阅 Parquet 配置参考

以下 Amazon Glue ETL 脚本显示了将 Parquet 文件和文件夹写入 S3 的过程。

我们通过 useGlueParquetWriter 配置键为自定义 Parquet 编写器提供 DynamicFrames 的性能优化。要确定此编写器是否适合您的工作负载,请参阅 Glue Parquet 编写器

Python

在本示例中,使用 write_dynamic_frame.from_options 方法。

# Example: Write Parquet to S3 # Consider whether useGlueParquetWriter is right for your workflow. from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) glueContext.write_dynamic_frame.from_options( frame=dynamicFrame, connection_type="s3", format="parquet", connection_options={ "path": "s3://s3path", }, format_options={ # "useGlueParquetWriter": True, }, )

您还可以使用脚本(pyspark.sql.DataFrame)中的 DataFrames。

df.write.parquet("s3://s3path/")
Scala

在本示例中,请使用 getSinkWithFormat 方法。

// Example: Write Parquet to S3 // Consider whether useGlueParquetWriter is right for your workflow. import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.{DynamicFrame, GlueContext} import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getSinkWithFormat( connectionType="s3", options=JsonOptions("""{"path": "s3://s3path"}"""), format="parquet" ).writeDynamicFrame(dynamicFrame) } }

您还可以使用脚本(org.apache.spark.sql.DataFrame)中的 DataFrames。

df.write.parquet("s3://s3path/")

Parquet 配置参考

您可以在 Amazon Glue 库指定 format="parquet" 的任何位置使用以下 format_options

  • useGlueParquetWriter – 指定使用具有 DynamicFrame 工作流性能优化的自定义 Parquet 编写器。有关使用情况的详细信息,请参阅 Glue Parquet 编写器

    • 类型:布尔值,默认值:false

  • compression – 指定使用的压缩编解码器。值与 org.apache.parquet.hadoop.metadata.CompressionCodecName 完全兼容。

    • 类型:枚举文本,默认值:"snappy"

    • 值:"uncompressed""snappy""gzip""lzo"

  • blockSize – 指定内存中缓冲的行组的字节大小。您可以用它来调整性能。大小应精确地划分为若干兆字节。

    • 类型:数值,默认值:134217728

    • 默认值等于 128MB。

  • pageSize – 指定页面的大小(以字节为单位)。您可以用它来调整性能。页面是必须完全读取以访问单个记录的最小单位。

    • 类型:数值,默认值:1048576

    • 默认值等于 1MB。

注意

此外,基础 SparkSQL 代码所接受的任何选项均可通过 connection_options 映射参数传递给此格式。例如,您可以为 Amazon Glue Spark 读取器设置 Spark 配置(如 mergeSchema),以合并所有文件的架构。

使用 Amazon Glue Parquet 编写器优化写入性能

注意

Amazon Glue Parquet 编写器以前一直通过 glueparquet 格式类型访问。这种访问模式已不再提倡。请改用启用了 useGlueParquetWriterparquet 类型。

Amazon Glue Parquet 编写器具有允许更快地写入 Parquet 文件的性能增强功能。传统编写器在写入之前计算架构。Parquet 格式不会以可快速检索的方式存储架构,因此可能需要一些时间。使用 Amazon Glue Parquet 编写器时,不需要预计算的架构。在数据传入时,编写器会动态计算和修改架构。

指定 useGlueParquetWriter 时请注意以下限制:

  • 编写器仅支持架构发展(例如添加或删除列)但不支持更改列类型,例如使用 ResolveChoice

  • 写入器不支持写入空 DataFrame,例如,写入纯架构文件。通过设置 enableUpdateCatalog=True 实现与 Amazon Glue Data Catalog 的集成时,尝试写入空 DataFrame 不会更新数据目录。这将导致在数据目录中创建一个没有架构的表。

如果您的转换不需要这些限制,则开启 Amazon Glue Parquet 编写器应该能提高性能。