使用 Amazon Glue ETL 任务在 Data Catalog 中更新架构并添加新分区 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon Glue ETL 任务在 Data Catalog 中更新架构并添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 Amazon Glue 数据目录架构。Amazon GlueETL 任务现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 Amazon Glue Data Catalog 中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用 enableUpdateCatalog 参数指示数据目录在任务运行期间将随着新分区的创建而更新。

方法 1:

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方法 2:

getSink() 中传递 enableUpdateCatalogpartitionKeys,并对 DataSink 对象调用 setCatalogInfo()

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 Amazon Glue ETL 任务本身创建新的目录表、使用修改的架构更新现有表,并在数据目录中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖数据目录表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将 enableUpdateCatalog 设置为 true,也将 updateBehavior 设置为 UPDATE_IN_DATABASE,这表示在任务运行期间将覆盖架构并在数据目录中添加新分区。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果 enableUpdateCatalog 未设置为 true,无论为 updateBehavior 选择哪个选项,ETL 任务都不会更新数据目录中的表。

创建新表

还可以使用相同的选项在数据目录中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

限制

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service(Amazon S3)目标。

  • 受管辖的表不支持 enableUpdateCatalog 功能。

  • 仅支持以下格式:jsoncsvavroparquet

  • 要使用 parquet 分类创建或更新表,您必须使用 DynamicFrames 针对 Amazon Glue 优化的 Parquet 写入器。这可以通过以下一种方法实现:

    • 如果您要使用 parquet 分类更新目录中的现有表,则必须先将表的 "useGlueParquetWriter" 表属性设置为 true,然后才能对其进行更新。您可以通过 Amazon Glue API/SDK、控制台或 Athena DDL 语句设置此属性。

      Amazon Glue 控制台中的目录表属性编辑字段。

      设置目录表属性后,您可以使用以下代码片段使用新数据更新目录表:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • 如果目录中尚不存在该表,则可以在脚本中使用 getSink() 方法与 connection_type="s3" 将表及其分区添加到目录中,同时将数据写入 Amazon S3。为您的工作流程提供适当的 partitionKeyscompression

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • glueparquet 格式值是启用 Amazon Glue Parquet 写入器的传统方法。

  • updateBehavior 设置为 LOG 时,只有当 DynamicFrame 架构等效于或包含在数据目录表的架构中定义的列子集时,才会添加新分区。

  • 非分区表不支持架构更新(未使用“partitionKeys”选项)。

  • 在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。

  • 此功能目前尚不支持更新/创建嵌套更新架构的表(例如,结构内的数组)。

有关更多信息,请参阅 Spark 脚本编程