本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon Glue ETL 任务在 Data Catalog 中更新架构并添加新分区
您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 Amazon Glue 数据目录架构。Amazon GlueETL 任务现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果,而无需重新运行爬网程序。
新分区
如果要在 Amazon Glue Data Catalog 中查看新分区,可以执行以下操作之一:
-
在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。
-
一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用
enableUpdateCatalog
参数指示数据目录在任务运行期间将随着新分区的创建而更新。
- 方法 1:
-
在选项参数中传递
enableUpdateCatalog
和partitionKeys
。 - 方法 2:
-
在
getSink()
中传递enableUpdateCatalog
和partitionKeys
,并对DataSink
对象调用setCatalogInfo()
。
现在,您可以使用 Amazon Glue ETL 任务本身创建新的目录表、使用修改的架构更新现有表,并在数据目录中添加新的表分区,而无需重新运行爬网程序。
更新表架构
如果要覆盖数据目录表的架构,可以执行以下操作之一:
作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序。
一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将
enableUpdateCatalog
设置为 true,也将updateBehavior
设置为UPDATE_IN_DATABASE
,这表示在任务运行期间将覆盖架构并在数据目录中添加新分区。
如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior
值设置为 LOG
。updateBehavior
的默认值为 UPDATE_IN_DATABASE
,因此,如果您没有明确定义它,则表架构将被覆盖。
如果 enableUpdateCatalog
未设置为 true,无论为 updateBehavior
选择哪个选项,ETL 任务都不会更新数据目录中的表。
创建新表
还可以使用相同的选项在数据目录中创建新表。您可以使用 setCatalogInfo
指定数据库和新表名。
限制
请注意以下限制:
-
仅支持 Amazon Simple Storage Service(Amazon S3)目标。
-
受管辖的表不支持
enableUpdateCatalog
功能。 -
仅支持以下格式:
json
、csv
、avro
和parquet
。 -
要使用
parquet
分类创建或更新表,您必须使用 DynamicFrames 针对 Amazon Glue 优化的 Parquet 写入器。这可以通过以下一种方法实现:-
如果您要使用
parquet
分类更新目录中的现有表,则必须先将表的"useGlueParquetWriter"
表属性设置为true
,然后才能对其进行更新。您可以通过 Amazon Glue API/SDK、控制台或 Athena DDL 语句设置此属性。设置目录表属性后,您可以使用以下代码片段使用新数据更新目录表:
glueContext.write_dynamic_frame.from_catalog( frame=
frameToWrite
, database="dbName
", table_name="tableName
", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } ) -
如果目录中尚不存在该表,则可以在脚本中使用
getSink()
方法与connection_type="s3"
将表及其分区添加到目录中,同时将数据写入 Amazon S3。为您的工作流程提供适当的partitionKeys
和compression
。s3sink = glueContext.getSink( path="s3://
bucket/folder/
", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName
", catalogTableName="tableName
" ) s3sink.setFormat("parquet", useGlueParquetWriter=True) s3sink.writeFrame(frameToWrite
) -
glueparquet
格式值是启用 Amazon Glue Parquet 写入器的传统方法。
-
-
当
updateBehavior
设置为LOG
时,只有当DynamicFrame
架构等效于或包含在数据目录表的架构中定义的列子集时,才会添加新分区。 -
非分区表不支持架构更新(未使用“partitionKeys”选项)。
-
在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。
-
此功能目前尚不支持更新/创建嵌套更新架构的表(例如,结构内的数组)。
有关更多信息,请参阅 Spark 脚本编程。