在 Data Catalog 中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Data Catalog 中通过 AWS Glue ETL 作业创建表、更新架构和添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。您的数据集框架可以从 AWS Glue Data Catalog 框架。 AWS GlueETL作业现在提供了几个功能,您可以在ETL脚本中使用这些功能来更新您的方案和分区 Data Catalog. 这些功能允许您在 Data Catalog 中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 AWS Glue 数据目录 中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用 enableUpdateCatalog 参数指示 Data Catalog 在作业运行期间将作为新分区进行更新。

方法 1

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options) sink.writeDynamicFrame(df)
方法 2

合格 enableUpdateCatalogpartitionKeys 英寸 getSink()、和呼叫 setCatalogInfo()DataSink 对象。

Python
sink = glueContext.getSink(connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 AWS Glue ETL 作业本身创建新的目录表、使用修改的架构更新现有表,并在 Data Catalog 中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖 Data Catalog 表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将 enableUpdateCatalog 设置为 true,也将 updateBehavior 设置为 UPDATE_IN_DATABASE,这表示在作业运行期间将覆盖架构并在 Data Catalog 中添加新分区。

Python
additionalOptions = {"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果 enableUpdateCatalog 未设置为 true,无论为 updateBehavior 选择哪个选项,ETL 作业都不会更新 Data Catalog 中的表。

创建新表

还可以使用相同的选项在 Data Catalog 中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map("path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", options = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restrictions

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service (Amazon S3) 目标。

  • 仅支持以下格式: json, csv, avro,和 glueparquet.

  • 使用创建的或更新的表 glueparquet 分类不能用作其他作业的数据源。

  • updateBehavior 设置为 LOG 时,只有当 DynamicFrame 架构等效于或包含在 Data Catalog 表的架构中定义的列子集时,才会添加新分区。

  • 您的 partitionKeys 在ETL脚本中传递的参数和 partitionKeys 您的 Data Catalog 表框架。

  • 此功能目前尚不支持更新/创建将更新框架嵌套到其中的表(例如,结构内部的数组)。

有关更多信息,请参阅ETL 脚本编程