

# 使用 Amazon Glue ETL 任务在 Data Catalog 中更新架构并添加新分区
<a name="update-from-job"></a>

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移，您的数据集架构可能演变和偏离 Amazon Glue 数据目录架构。Amazon GlueETL 任务现在提供了几个功能，您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果，而无需重新运行爬网程序。

## 新分区
<a name="update-from-job-partitions"></a>

如果要在 Amazon Glue Data Catalog 中查看新分区，可以执行以下操作之一：
+ 在作业完成后，重新运行爬网程序，并在爬网程序完成后，在控制台上查看新分区。
+ 一旦作业完成，即可在控制台上查看新分区，而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能，如以下示例中所示。代码使用 `enableUpdateCatalog` 参数指示数据目录在任务运行期间将随着新分区的创建而更新。

**方法 1：**  
在选项参数中传递 `enableUpdateCatalog` 和 `partitionKeys`。  

```
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["region", "year", "month", "day"]


sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>,
                                                    table_name=<target_table_name>, transformation_ctx="write_sink",
                                                    additional_options=additionalOptions)
```

```
val options = JsonOptions(Map(
    "path" -> <S3_output_path>, 
    "partitionKeys" -> Seq("region", "year", "month", "day"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(
    database = <target_db_name>, 
    tableName = <target_table_name>, 
    additionalOptions = options)sink.writeDynamicFrame(df)
```

**方法 2：**  
在 `getSink()` 中传递 `enableUpdateCatalog` 和 `partitionKeys`，并对 `DataSink` 对象调用 `setCatalogInfo()`。  

```
sink = glueContext.getSink(
    connection_type="s3", 
    path="<S3_output_path>",
    enableUpdateCatalog=True,
    partitionKeys=["region", "year", "month", "day"])
sink.setFormat("json")
sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>)
sink.writeFrame(last_transform)
```

```
val options = JsonOptions(
   Map("path" -> <S3_output_path>, 
       "partitionKeys" -> Seq("region", "year", "month", "day"), 
       "enableUpdateCatalog" -> true))
val sink = glueContext.getSink("s3", options).withFormat("json")
sink.setCatalogInfo(<target_db_name>, <target_table_name>)
sink.writeDynamicFrame(df)
```

现在，您可以使用 Amazon Glue ETL 任务本身创建新的目录表、使用修改的架构更新现有表，并在数据目录中添加新的表分区，而无需重新运行爬网程序。

## 更新表架构
<a name="update-from-job-updating-table-schema"></a>

如果要覆盖数据目录表的架构，可以执行以下操作之一：
+ 作业完成后，重新运行爬网程序，并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时，在查控制台上看新分区以及任何架构更新。有关更多信息，请参阅[使用 API 配置爬网程序](https://docs.amazonaws.cn/glue/latest/dg/crawler-configuration.html#crawler-configure-changes-api)。
+ 一旦作业完成，即可在控制台上查看修改的架构，而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能，如以下示例中所示。代码将 `enableUpdateCatalog` 设置为 true，也将 `updateBehavior` 设置为 `UPDATE_IN_DATABASE`，这表示在任务运行期间将覆盖架构并在数据目录中添加新分区。

------
#### [ Python ]

```
additionalOptions = {
    "enableUpdateCatalog": True, 
    "updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"]

sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>,
    table_name=<dst_tbl_name>, transformation_ctx="write_sink",
    additional_options=additionalOptions)
job.commit()
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("partition_0", "partition_1"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options)
sink.writeDynamicFrame(df)
```

------

如果您希望防止表架构被覆盖，但仍希望添加新分区，也可以将该 `updateBehavior` 值设置为 `LOG`。`updateBehavior` 的默认值为 `UPDATE_IN_DATABASE`，因此，如果您没有明确定义它，则表架构将被覆盖。

如果 `enableUpdateCatalog` 未设置为 true，无论为 `updateBehavior` 选择哪个选项，ETL 任务都不会更新数据目录中的表。

## 创建新表
<a name="update-from-job-creating-new-tables"></a>

还可以使用相同的选项在数据目录中创建新表。您可以使用 `setCatalogInfo` 指定数据库和新表名。

------
#### [ Python ]

```
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
    enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), 
    "enableUpdateCatalog" -> true, 
    "updateBehavior" -> "UPDATE_IN_DATABASE"))
val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>")
sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”)
sink.writeDynamicFrame(df)
```

------

## 限制
<a name="update-from-job-restrictions"></a>

请注意以下限制：
+ 仅支持 Amazon Simple Storage Service（Amazon S3）目标。
+ 受管辖的表不支持 `enableUpdateCatalog` 功能。
+ 仅支持以下格式：`json`、`csv`、`avro` 和 `parquet`。
+ 要使用 `parquet` 分类创建或更新表，您必须使用 DynamicFrames 针对 Amazon Glue 优化的 Parquet 写入器。这可以通过以下一种方法实现：
  + 如果您要使用 `parquet` 分类更新目录中的现有表，则必须先将表的 `"useGlueParquetWriter"` 表属性设置为 `true`，然后才能对其进行更新。您可以通过 Amazon Glue API/SDK、控制台或 Athena DDL 语句设置此属性。  
![\[Amazon Glue 控制台中的目录表属性编辑字段。\]](http://docs.amazonaws.cn/glue/latest/dg/images/edit-table-property.png)

    设置目录表属性后，您可以使用以下代码片段使用新数据更新目录表：

    ```
    glueContext.write_dynamic_frame.from_catalog(
        frame=frameToWrite,
        database="dbName",
        table_name="tableName",
        additional_options={
            "enableUpdateCatalog": True,
            "updateBehavior": "UPDATE_IN_DATABASE"
        }
    )
    ```
  + 如果目录中尚不存在该表，则可以在脚本中使用 `getSink()` 方法与 `connection_type="s3"` 将表及其分区添加到目录中，同时将数据写入 Amazon S3。为您的工作流程提供适当的 `partitionKeys` 和 `compression`。

    ```
    s3sink = glueContext.getSink(
        path="s3://bucket/folder/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        compression="snappy",
        enableUpdateCatalog=True
    )
        
    s3sink.setCatalogInfo(
        catalogDatabase="dbName", catalogTableName="tableName"
    )
        
    s3sink.setFormat("parquet", useGlueParquetWriter=True)
    s3sink.writeFrame(frameToWrite)
    ```
  + `glueparquet` 格式值是启用 Amazon Glue Parquet 写入器的传统方法。
+ 当 `updateBehavior` 设置为 `LOG` 时，只有当 `DynamicFrame` 架构等效于或包含在数据目录表的架构中定义的列子集时，才会添加新分区。
+ 非分区表不支持架构更新（未使用“partitionKeys”选项）。
+ 在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间，您的 partitionKeys 必须是等效的且顺序相同。
+ 此功能目前尚不支持更新/创建嵌套更新架构的表（例如，结构内的数组）。

有关更多信息，请参阅 [Spark 脚本编程](aws-glue-programming.md)。