

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 设置 Firehose 流
<a name="apache-iceberg-stream"></a>

要创建以 Apache Iceberg 表作为目的地的 Firehose 流，您必须配置以下内容。

**注意**  
用于传输到 S3 表存储桶中的表的 Firehose 流的设置与 Amazon S3 中的 Apache Iceberg Tables 相同。

## 配置源位置和目的地
<a name="apache-iceberg-stream-source"></a>

要向 Apache Iceberg 表传输数据，请为您的流选择源。

要为流配置源，请参阅[配置源设置](configure-source.md)。

接下来，选择 **Apache Iceberg 表**作为目的地并提供 Firehose 流名称。

## 配置数据转换
<a name="apache-iceberg-stream-data-transform"></a>

要对数据执行自定义转换（例如，在传入流中添加或修改记录），您可以将 Lambda 函数添加到您的 Firehose 流中。有关在 Firehose 流中使用 Lambda 进行数据转换的更多信息，请参阅[在 Amazon Data Firehose 中转换源数据](data-transformation.md)。

对于 Apache Iceberg 表，您必须指定如何将传入记录路由到不同的目标表以及要执行的操作。向 Firehose 提供所需路由信息的其中一种方法是使用 Lambda 函数。

有关更多信息，请参阅[将记录路由到不同的 Iceberg 表](apache-iceberg-format-input-record.md)。

## 连接数据目录
<a name="apache-iceberg-stream-data-catalog"></a>

Apache Iceberg 需要数据目录才能写入 Apache Iceberg 表。Firehose 与 Apache Iceber Amazon Glue Data Catalog g Tables 集成。

你可以在与 Firehose 直播相同的账号 Amazon Glue Data Catalog 中使用，也可以在跨账号中使用，也可以在与 Firehose 直播相同的区域（默认）中使用，也可以在不同的区域中使用。

如果您要传输到 Amazon S3 表类数据存储服务，并且正在使用控制台设置 Firehose 流，请选择与您的 Amazon S3 表类数据存储服务目录相对应的目录。如果您使用 CLI 来设置 Firehose 流，那么在 `CatalogConfiguration` 输入中，请按以下格式使用 `CatalogARN`：`arn:aws:glue:<region>:<account-id>:catalog/s3tablescatalog/<s3 table bucket name>`。有关更多信息，请参阅[设置 Firehose 流到 Amazon S3 表类数据存储服务](https://docs.amazonaws.cn/AmazonS3/latest/userguide/s3-tables-integrating-firehose.html#firehose-stream-tables)。

**注意**  
Firehose 支持对 Iceberg 表执行三种操作：插入、更新和删除。如果没有指定操作，Firehose 默认为插入，将每条传入的记录添加为新行，并保留重复的记录。要改为修改现有记录，请指定“更新”操作，该操作使用主键来定位和更改现有行。  
示例：  
默认（插入）：多个相同的客户记录会创建重复的行。
指定更新：新的客户地址会更新现有记录。

## 配置 JQ 表达式
<a name="apache-iceberg-stream-jq-exp"></a>

对于 Apache Iceberg 表，您必须指定如何将传入记录路由到不同的目标表以及要执行的插入、更新和删除等操作。为此，您可以为 Firehose 配置 JQ 表达式，以解析并获取所需信息。有关更多信息，请参阅 [使用表达式向 Firehose 提供路由信息 JSONQuery](apache-iceberg-format-input-record-different.md#apache-iceberg-route-jq)。

## 配置唯一键
<a name="apache-iceberg-stream-unique-key"></a>

**使用多个表进行更新和删除**：唯一键是源记录中的一个或多个字段，用于唯一标识 Apache Iceberg 表中的一行。如果您有包含多个表的仅插入场景，则不必配置唯一键。如果您要对某些表进行更新和删除，则必须为这些必需的表配置唯一键。请注意，如果表中缺少行，则更新将自动插入该行。如果您只有一个表，则可以配置唯一键。对于更新操作，Firehose 会先放置一个删除文件，然后再执行插入操作。

[您可以在 Firehose 直播创建过程中为每个表配置唯一密钥，也可以在创建表或更改[表](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table)操作期间在 Iceberg 中进行[identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)本地设置。](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields)在创建流期间为每个表配置唯一键是可选项。如果您在流创建期间没有为每个表配置唯一键，则 Firehose 会检查 `identifier-field-ids` 有无所需的表并将其用作唯一键。如果两者都未配置，则通过更新和删除操作传输数据将失败。

要配置此部分，请为要更新或删除数据的表提供数据库名称、表名称和唯一键。在配置中，每个表只能有一个条目。对于仅限附加的场景，您无需配置此部分。或者，如果表中的数据如以下示例所示无法传输，则您也可以选择提供错误存储桶前缀。

```
[
  {
    "DestinationDatabaseName": "MySampleDatabase",
    "DestinationTableName": "MySampleTable",
    "UniqueKeys": [
      "COLUMN_PLACEHOLDER"
    ],
    "S3ErrorOutputPrefix": "OPTIONAL_PREFIX_PLACEHOLDER"
  }
]
```

如果提供的列名在整个表中是唯一的，Firehose 支持配置唯一键。但是，它不支持将完全限定的列名作为唯一键。例如，如果列名 `_id` 也出现在顶层，则名为 `top._id` 的键不被视为唯一键。如果 `_id` 在整个表中是唯一的，那么无论它在表格结构中的位置如何，都会使用它。无论是顶级列还是嵌套列都是如此。在以下示例中，`_id` 是架构的有效唯一键，因为列名在架构中是唯一的。

```
[
 "schema": {
  "type": "struct",
  "fields": [
    {
      "name": "top",
      "type": {
        "type": "struct",
        "fields": [
          { "name": "_id", "type": "string" },
          { "name": "name", "type": "string" }
        ]
      }
    },
    { "name": "user", "type": "string" }
  ]
}
]
```

在以下示例中，`_id` 不是架构的有效唯一键，因为它既用于顶级列，也用于嵌套结构。

```
[
"schema": {
  "type": "struct",
  "fields": [
    {
      "name": "top",
      "type": {
        "type": "struct",
        "fields": [
          { "name": "_id", "type": "string" },
          { "name": "name", "type": "string" }
        ]
      }
    },
    { "name": "_id", "type": "string" }
  ]
}
]
```

## 指定重试持续时间
<a name="apache-iceberg-stream-retry-duration"></a>

您可以使用此配置来指定 Firehose 在写入 Amazon S3 中的 Apache Iceberg 表中遇到失败时应尝试重试的持续时间（以秒为单位）。您可以设置 0 到 7200 秒之间的任何值，以执行重试。默认情况下，Firehose 的重试时间为 300 秒。

## 处理失败的传输或处理
<a name="apache-iceberg-stream-failed-delivery"></a>

您必须将 Firehose 配置为将记录传输到 S3 备份存储桶，以防它在重试持续时间到期后遇到处理或传输数据流失败问题。为此，请在控制台的**备份设置**中配置 **S3 备份存储桶**和 **S3 备份存储桶错误输出前缀**。

## 处理错误
<a name="apache-iceberg-handle-errors"></a>

Firehose 将所有传送错误发送到 CloudWatch 日志和 Amazon S3 错误存储桶。

错误列表：


|  **错误消息**  |  ****描述****  | 
| --- | --- | 
|  `Iceberg.NoSuchTable`  |  Firehose 正在写入一个不存在的表，或者该表不是 V2 格式。Firehose 不支持 V1 格式的表。  | 
|  `Iceberg.InvalidTableName`  |  传递的表名为 null 或为空，或者表的格式不是 V2。Firehose 不支持 V1 格式的表。  | 
|  `S3.AccessDenied`  |  确保在先决条件步骤中创建的 IAM 角色具有所需的权限和信任策略。  | 
|  `Glue.AccessDenied`  |  确保在先决条件步骤中创建的 IAM 角色具有所需的权限和信任策略。  | 

## 配置缓冲区提示
<a name="apache-iceberg-stream-buffer"></a>

Firehose 将内存中传入的流数据缓冲到一定大小（**缓冲大小**），或缓冲一定时间（**缓冲时间间隔**）后再将其传输到 Apache Iceberg 表。您可以选择 1—128 的缓冲区大小 MiBs 和 0—900 秒的缓冲间隔。缓冲区提示越高，S3 写入次数越少，压缩成本越低（由于越大的数据文件），查询运行时速度更快，但延迟更高。较低的缓冲区提示值以较低的延迟传输数据。

## 配置高级设置
<a name="apache-iceberg-stream-advance-settings"></a>

您可以为 Apache Iceberg 表配置服务器端加密、错误日志记录、权限和标签。有关更多信息，请参阅 [配置高级设置](create-configure-advanced.md)。您必须添加作为 [使用 Apache Iceberg 表作为目的地的先决条件](apache-iceberg-prereq.md) 一部分而创建的 IAM 角色。Firehose 将担任访问 Amazon Glue 表和写入 Amazon S3 存储桶的角色。

可能需要几分钟的时间才能完成 Firehose 流创建。成功创建 Firehose 流后，您可以开始向其中摄取数据，并可以查看 Apache Iceberg 表中的数据。