在 Data Catalog 中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

在 Data Catalog 中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 AWS Glue Data Catalog 架构。AWS Glue ETL 作业现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在 Data Catalog 中更新架构和分区。这些功能允许您在 Data Catalog 中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 AWS Glue 数据目录 中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用 enableUpdateCatalog 参数指示 Data Catalog 在作业运行期间将作为新分区进行更新。

方法 1

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options) sink.writeDynamicFrame(df)
方法 2

getSink() 中传递 enableUpdateCatalogpartitionKeys,并对 DataSink 对象调用 setCatalogInfo()

Python
sink = glueContext.getSink(connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 AWS Glue ETL 作业本身创建新的目录表、使用修改的架构更新现有表,并在 Data Catalog 中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖 Data Catalog 表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将 enableUpdateCatalog 设置为 true,也将 updateBehavior 设置为 UPDATE_IN_DATABASE,这表示在作业运行期间将覆盖架构并在 Data Catalog 中添加新分区。

Python
additionalOptions = {"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果 enableUpdateCatalog 未设置为 true,无论为 updateBehavior 选择哪个选项,ETL 作业都不会更新 Data Catalog 中的表。

创建新表

还可以使用相同的选项在 Data Catalog 中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", options = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

限制

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service (Amazon S3) 目标。

  • 仅支持以下格式:jsoncsvavroglueparquet

  • updateBehavior 设置为 LOG 时,只有当 DynamicFrame 架构等效于或包含在 Data Catalog 表的架构中定义的列子集时,才会添加新分区。

  • 在 ETL 脚本中传递的参数与 Data Catalog 表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。

有关更多信息,请参阅 ETL 脚本编程