在数据目录中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在数据目录中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 AWS Glue 数据目录架构。AWS Glue ETL 作业现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 AWS Glue 数据目录中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。该代码使用enableUpdateCatalog参数指示在作业运行期间将作为新分区进行更新。

方法 1

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options) sink.writeDynamicFrame(df)
方法 2

PassenableUpdateCatalogpartitionKeysingetSink(),然后调用setCatalogInfo()DataSink对象。

Python
sink = glueContext.getSink(connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 AWS Glue ETL 作业本身创建新的目录表、使用修改的架构更新现有表,并在数据目录中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖数据目录表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。该代码使用enableUpdateCatalog设置为 true,并且updateBehavior设置为UPDATE_IN_DATABASE,这表示在作业运行期间将覆盖架构并在数据目录中添加新分区。

Python
additionalOptions = {"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果enableUpdateCatalog未设置为 true,无论为updateBehavior,ETL 作业将不会更新数据目录中的表。

创建新表

还可以使用相同的选项在数据目录中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", options = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restrictions

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service (Amazon S3) 目标。

  • 仅支持以下格式:jsoncsvavro, 和parquet

  • 要创建或更新使用parquet分类时,您必须使用针对 DynamicFrames 的 AWS Glue 优化拼花编写器。这可以通过以下三种方式之一实现:

    • Callwrite_dynamic_frame_from_catalog(),然后设置useGlueParquetWriter表属性设置为 true。

    • CallgetSink()在你的脚本中使用连接类型 =”s3”,然后将您的格式设置为glueparquet

    • CallgetSink()在你的脚本中使用连接类型 =”s3”,然后将您的格式设置为parquet并传递一个useGlueParquetWriter属性设置为 trueformat_options,这对于创建新的镶木地板表格尤其有用。

  • updateBehavior设置为LOG,则只有在DynamicFrame架构等效于或包含在数据目录表的架构中定义的列子集。

  • 在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。

  • 此功能目前还不支持更新/创建嵌套更新模式的表(例如,结构体内的数组)。

有关更多信息,请参阅 ETL 脚本编程