在数据目录中通过 Amazon Glue ETL 任务创建表、更新架构和添加新分区 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

在数据目录中通过 Amazon Glue ETL 任务创建表、更新架构和添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 Amazon Glue 数据目录架构。Amazon GlueETL 任务现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 Amazon Glue Data Catalog 中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用 enableUpdateCatalog 参数指示数据目录在任务运行期间将随着新分区的创建而更新。

方法 1

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方法 2

getSink() 中传递 enableUpdateCatalogpartitionKeys,并对 DataSink 对象调用 setCatalogInfo()

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 Amazon Glue ETL 任务本身创建新的目录表、使用修改的架构更新现有表,并在数据目录中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖数据目录表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将 enableUpdateCatalog 设置为 true,也将 updateBehavior 设置为 UPDATE_IN_DATABASE,这表示在任务运行期间将覆盖架构并在数据目录中添加新分区。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果 enableUpdateCatalog 未设置为 true,无论为 updateBehavior 选择哪个选项,ETL 任务都不会更新数据目录中的表。

创建新表

还可以使用相同的选项在数据目录中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

限制

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service(Amazon S3)目标。

  • 仅支持以下格式:jsoncsvavroparquet

  • 要使用 parquet 分类创建或更新表,您必须使用 DynamicFrames 针对 Amazon Glue 优化的 Parquet 写入器。这可以通过以下三种方式之一实现:

    • 调用 write_dynamic_frame_from_catalog(),然后在您要更新的表中将 useGlueParquetWriter 表属性设置为 true.

    • 使用 connection_type="s3" 在脚本中调用 getSink(),然后将格式设置为 glueparquet

    • 使用 connection_type="s3" 在脚本中调用 getSink(),然后将格式设置为 parquet,在您的 format_options 中传递 useGlueParquetWriter 属性为 true,这对于创建新的 Parquet 表尤其有用。

  • updateBehavior 设置为 LOG 时,只有当 DynamicFrame 架构等效于或包含在数据目录表的架构中定义的列子集时,才会添加新分区。

  • 在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。

  • 此功能目前尚不支持更新/创建嵌套更新架构的表(例如,结构内的数组)。

有关更多信息,请参阅 ETL 脚本编程