Amazon Glue 中的 ETL 输入和输出的格式选项 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Amazon Glue 中的 ETL 输入和输出的格式选项

各种 Amazon Glue PySpark 和 Scala 方法和转换使用 format 参数和 format_options 参数指定其输入和/或输出格式。这些参数可以采用以下值。

注意

目前,流式处理 ETL 任务支持的唯一格式是 JSON、CSV、Parquet、ORC、Avro 和 Grok。

对于访问 Amazon Lake Formation 受管表的任务,Amazon Glue 支持读取和写入 Lake Formation 受管表支持的所有格式。有关当前 Amazon Lake Formation 受管表支持的格式列表,请参阅 Amazon Lake Formation 开发人员指南中的受管表的注释和限制

注意

对于写入 Apache Parquet,Amazon Glue ETL 仅支持为针对动态帧进行优化的自定义 Parquet 编写器类型指定选项来写入受管表。使用 parquet 格式写入受管表时,应在表参数中添加值为 useGlueParquetWriter 的键 true

format="avro"

此值指定 Apache Avro 数据格式。

您可以将以下 format_options 值与 format="avro" 结合使用:

  • version – 指定要支持的 Apache Avro 读取器/写入器格式的版本。默认值为“1.7”。您可以指定 format_options={"version": “1.8”} 以启用 Avro 逻辑类型读取和写入。有关更多信息,请参阅 Apache Avro 1.7.7 规范Apache Avro 1.8.2 规范

    Apache Avro 1.8 连接器支持以下逻辑类型转换:

对于读取器:此表显示 Avro 数据类型(逻辑类型和 Avro 基元类型)与 Avro 阅读器 1.7 和 1.8 的 Amazon Glue DynamicFrame 数据类型之间的转换。

Avro 数据类型:

逻辑类型

Avro 数据类型:

Avro 基元类型

GlueDynamicFrame 数据类型:

Avro 读取器 1.7

GlueDynamicFrame 数据类型:

Avro 读取器 1.8

小数 bytes BINARY 小数
小数 固定 BINARY 小数
日期 int INT 日期
时间(毫秒) int INT INT
时间(微秒) long LONG LONG
时间戳(毫秒) long LONG 时间戳
时间戳(微秒) long LONG LONG
持续时间(不是逻辑类型) 固定为 12 BINARY BINARY

对于写入器:此表显示 Avro 写入器 1.7 和 1.8 在 Amazon Glue DynamicFrame 数据类型与 Avro 数据类型之间的转换。

Amazon Glue DynamicFrame 数据类型 Avro 数据类型:

Avro 写入器 1.7

Avro 数据类型:

Avro 写入器 1.8

小数 字符串 十进制
日期 字符串 date
时间戳 字符串 timestamp-micros

format="csv"

此值指定 comma-separated-values 作为数据格式 (例如,请参阅 RFC 4180RFC 7111)。

您可以将以下 format_options 值与 format="csv" 结合使用:

  • separator – 指定分隔符。默认值为逗号 ",",但也可以指定任何其他字符。

  • escaper – 指定要用于转义的字符。此选项仅在读取 CSV 文件时使用。默认值为 none。如果启用,则按原样使用紧跟其后的字符,一小组已知的转义符(\n\r\t\0)除外。

  • quoteChar – 指定要用于引用的字符。默认值为双引号:'"'。将这设置为 -1 可完全关闭引用。

  • multiLine – 指定单个记录能否跨越多行的布尔值。当字段包含带引号的换行符时,会出现此选项。如果有记录跨越多个行,您必须将此选项设置为 True。默认值为 False,它允许在分析过程中更积极地拆分文件。

  • withHeader – 指定是否将第一行视为标题的布尔值。默认值为 False。可以在 DynamicFrameReader 类中使用此选项。

  • writeHeader – 指定是否将标题写入输出的布尔值。默认值为 True。可以在 DynamicFrameWriter 类中使用此选项。

  • skipFirst – 指定是否跳过第一个数据行的布尔值。默认值为 False

  • optimizePerformance – 指定是否使用高级 SIMD CSV 读取器以及基于 Apache Arrow 的列式内存格式的布尔值。仅适用于 Amazon Glue 3.0。

以下示例演示了如何在 Amazon Glue ETL 任务脚本内指定格式选项。

glueContext.write_dynamic_frame.from_options( frame = datasource1, connection_type = "s3", connection_options = { "path": "s3://s3path" }, format = "csv", format_options={ "quoteChar": -1, "separator": "|" }, transformation_ctx = "datasink2")

结合使用矢量化 SIMD CSV 读取器与 Apache Arrow 列式格式

Amazon Glue 版本 3.0 支持将 Apache Arrow 用作内存列式格式,允许以列式方式批处理记录。批处理可减少与处理跨记录批处理更改模式相关的 CPU 瓶颈,并提高从列缓冲区检索数据的效率。Amazon Glue 使用 CPU SIMD 指令集和微并行算法矢量化 CSV 读取器。通过这些优化,与使用基于行的 Amazon Glue DynamicFrame 或 Spark DataFrames 相比,Amazon Glue 版本 3.0 实现显著的性能提升。

要使用 Arrow 格式的优化读取器,请在 format_options 或表属性中将“optimizePerformance”设为 true。

glueContext.create_dynamic_frame.from_options( frame = datasource1, connection_type = "s3", connection_options = {"paths": ["s3://s3path"]}, format = "csv", format_options={ "optimizePerformance": True, "separator": "," }, transformation_ctx = "datasink2")

矢量化 CSV 读取器的限制

请注意以下限制:

  • 它不支持 multiLineescaper 格式选项。它使用默认双引号字符“"”的 escaper。设置这些选项后,Amazon Glue 会自动回退使用基于行的 CSV 读取器。

  • 它不支持创建具有 ChoiceType 的 DynamicFrame。

  • 它不支持创建具有错误记录的 DynamicFrame。

  • 它不支持读取带多字节字符(如日语或中文字符)的 CSV 文件。

format="ion"

此值指定 Amazon Ion 作为数据格式。(有关更多信息,请参阅 Amazon Ion 规范。)

目前,Amazon Glue 不支持以 ion 格式输出。

没有适用于 format_optionsformat="ion" 值。

format="grokLog"

此值指定由一个或多个 Logstash grok 模式指定的日志数据格式(例如,请参阅 Logstash 参考 [6.2]:Grok 筛选器插件)。

目前,Amazon Glue 不支持以 groklog 格式输出。

您可以将以下 format_options 值与 format="grokLog" 结合使用:

  • logFormat – 指定与日志的格式匹配的 Grok 模式。

  • customPatterns – 指定在此处使用的其他 Grok 模式。

  • MISSING – 指定用于标识缺失值的信号。默认为 '-'

  • LineCount – 指定每个日志记录中的行数。默认值为 '1',并且目前仅支持单行记录。

  • StrictMode – 指定是否启用严格模式的布尔值。在严格模式下,读取器不会执行自动类型转换或恢复。默认值为 "false"

format="json"

此值指定 JSON (JavaScript 对象表示法) 数据格式。

您可以将以下 format_options 值与 format="json" 结合使用:

  • jsonPathJsonPath 表达式,标识要读取到记录中的对象。当文件包含嵌套在外部数组内的记录时,此表达式尤其有用。例如,以下 JsonPath 表达式面向 JSON 对象的 id 字段。

    format="json", format_options={"jsonPath": "$.id"}
  • multiLine – 指定单个记录能否跨越多行的布尔值。当字段包含带引号的换行符时,会出现此选项。如果有记录跨越多个行,您必须将此选项设置为 "true"。默认值为 "false",它允许在分析过程中更积极地拆分文件。

format="orc"

此值指定 Apache ORC 作为数据格式。(有关更多信息,请参阅 LanguageManual ORC。)

没有适用于 format_optionsformat="orc" 值。不过,基础 SparkSQL 代码所接受的任何选项均可通过 connection_options 映射参数传递给它。

format="parquet"

此值指定 Apache Parquet 作为数据格式,但也提供了选项来使用针对动态帧进行优化的自定义 Parquet 编写器类型。编写前不需要预先计算的架构。使用 useGlueParquetWriter 选项指定时,编写器会在数据传入时,动态计算和修改架构。

您可以将以下 format_options 值:

  • useGlueParquetWriter – 指定使用针对动态帧进行优化的自定义 Parquet 编写器类型。输出格式为 Parquet。

请参阅以下示例,其中 sinkglue_context.getSink() 返回的对象。

sink.setFormat("parquet", useGlueParquetWriter=True)

指定 useGlueParquetWriter 时的限制:

  • 编写器仅支持架构发展,例如添加或删除列,但不支持更改列类型,例如使用 ResolveChoice

  • 编写器无法存储仅架构文件。

  • 选项只能作为数据目标的格式传递。

此外,基础 SparkSQL 代码所接受的任何选项均可通过 connection_options 映射参数传递给此格式。例如,可以为 Amazon Glue Spark 读取器设置 Spark 配置(如 mergeSchema),以合并所有文件的架构。

  • compression – 指定在编写 Parquet 文件时使用的压缩编解码器。采用 glueparquet 格式的压缩编解码器完全兼容 org.apache.parquet.hadoop.metadata.CompressionCodecName *,支持 "uncompressed""snappy""gzip""lzo"。默认值为 "snappy"

  • blockSize – 指定内存中缓冲的行组的字节大小(以 MB 为单位)。默认值为 134217728 或 128MB。

  • pageSize – 指定必须完全读取以访问单个记录的最小单位的字节大小。默认值为 1048576 或 1MB。

format="xml"

此值指定 XML 作为数据格式,通过 Apache Spark 的 XML 数据源分析器分支对其进行分析。

目前,Amazon Glue 不支持以“xml”格式输出。

您可以将以下 format_options 值与 format="xml" 结合使用:

  • rowTag – 指定文件中要视为行的 XML 标签。行标签不能自结束。

  • encoding – 指定字符编码。默认值为 "UTF-8"

  • excludeAttribute – 指定是否要排除元素中的属性的布尔值。默认值为 "false"

  • treatEmptyValuesAsNulls – 指定是否将空格视为空值的布尔值。默认值为 "false"

  • attributePrefix – 用于将属性与元素区分开来的属性的前缀。此前缀用于字段名称。默认值为 "_"

  • valueTag – 在元素中具有没有子项的属性时用于值的标签。默认为 "_VALUE"

  • ignoreSurroundingSpaces – 指定是否应忽略值周围的空格的布尔值。默认值为 "false"

  • withSchema – 包含预期架构的字符串值。如果您不使用此选项,Amazon Glue 会推断 XML 数据中的架构。

示例

此示例使用 withSchema 格式选项来指定 XML 数据的架构。

schema = StructType([ Field("id", IntegerType()), Field("name", StringType()), Field("nested", StructType([ Field("x", IntegerType()), Field("y", StringType()), Field("z", ChoiceType([IntegerType(), StringType()])) ])) ]) datasource0 = create_dynamic_frame_from_options( connection_type, connection_options={"paths": ["s3://xml_bucket/someprefix"]}, format="xml", format_options={"withSchema": json.dumps(schema.jsonValue())}, transformation_ctx = "" )