

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将传入记录路由到不同的 Iceberg 表
<a name="apache-iceberg-format-input-record-different"></a>

Amazon Data Firehose 可以根据记录的内容将流中的传入记录路由到不同的 Iceberg 表。从 Amazon Data Firehose 传输记录时，记录不按顺序保存。请参阅以下示例输入记录。

```
{
  "deviceId": "Device1234",
  "timestamp": "2024-11-28T11:30:00Z",
  "data": {
    "temperature": 21.5,
    "location": {
      "latitude": 37.3324,
      "longitude": -122.0311
    }
  },
  "powerlevel": 84,
  "status": "online"
}
```

```
{
  "deviceId": "Device4567",
  "timestamp": "2023-11-28T10:40:00Z",
  "data": {
    "pressure": 1012.4,
    "location": {
      "zipcode": 24567
    }
  },
  "powerlevel": 82,
  "status": "online"
}
```

在此示例中，`deviceId` 字段有两个可能的值 – `Device1234` 和 `Device4567`。当传入记录的 `deviceId` 字段为 `Device1234` 时，我们要将记录写入名为 `Device1234` 的 Iceberg 表，而当传入记录的 `deviceId` 字段为 `Device4567`，我们想要将记录写入名为 `Device4567` 的表中。

请注意，带有 `Device1234` 和 `Device4567` 的记录可能有一组不同的字段，这些字段映射到相应的 Iceberg 表中的不同列。传入的记录可能具有嵌套的 JSON 结构，其中 **`deviceId`** 可以嵌套在 JSON 记录中。在接下来的几节中，我们将讨论在这种情况下如何通过向 Firehose 提供适当的路由信息将记录路由到不同的表。

## 使用表达式向 Firehose 提供路由信息 JSONQuery
<a name="apache-iceberg-route-jq"></a>

向 Firehose 提供记录路由信息的最简单、最具成本效益的方法是 JSONQuery 提供表达式。使用这种方法，您可以为三个参数提供 JSONQuery 表达式 — `Database Name` `Table Name`、和（可选）`Operation`。Firehose 使用您提供的表达式从传入的流记录中提取信息以路由记录。

`Database Name` 参数指定目标数据库的名称。`Table Name` 参数指定目标表的名称。`Operation` 是一个可选参数，用于指示是将传入的流记录作为新记录插入目标表，还是修改或删除目标表中的现有记录。“操作”字段必须具有以下值之一 – `insert`、`update` 或 `delete`。

对于这三个参数中的每一个，您可以提供静态值或动态表达式，在其中从传入的记录中检索值。例如，如果要将所有传入的流记录传输到名为 `IoTevents` 的单个数据库，则数据库名称的静态值将为 `“IoTevents”`。如果必须从传入记录中的字段中获取目标表名，则表名是一个动态表达式，用于指定传入记录中需要从中检索目标表名的字段。

在以下示例中，我们为数据库名称使用静态值，为表名使用动态值，为操作使用静态值。请注意，指定操作是可选项。如果未指定任何操作，则 Firehose 会默认将传入的记录作为新记录插入到目标表中。

```
Database Name : "IoTevents"
Table Name : .deviceId
Operation : "insert"
```

如果 `deviceId` 字段嵌套在 JSON 记录中，则我们将具有嵌套字段信息的表名指定为 `.event.deviceId`。

**注意**  
当您将操作指定为`update`或时`delete`，您必须在设置 Firehose 流时为目标表指定唯一键，或者在 Iceberg 中运行[创建表或[更改表](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields)](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table)操作时[identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)在 Iceberg 中设置唯一的键。如果您未指定此项，则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。
`Database Name` 和 `Table Name` 值必须与目标数据库和表名完全匹配。如果它们不匹配，则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

## 使用 Amazon Lambda 函数提供路由信息
<a name="apache-iceberg-route-lambda"></a>

在某些情况下，您可能有复杂的规则来决定如何将传入记录路由到目标表。例如，您可能有一个规则来定义字段是否包含值 A、B 或 F，该规则应路由到名为 `TableX` 的目标表，或者您可能希望通过添加其他属性来增加传入的流记录。例如，如果一条记录包含的字段 `device_id` 为 1，则可能需要添加另一个 `device_type` 为“modem”的字段，并将该附加字段写入目标表列。在这种情况下，您可以使用 Firehose 中的 Amazon Lambda 函数转换源流，并在 Lambda 转换函数的输出中提供路由信息。要了解如何使用 Firehose 中的 Amazon Lambda 函数转换源流，请参阅在 [Amazon Data Firehose 中转换源数据](data-transformation.md)。

当您使用 Lambda 在 Firehose 中转换源流时，输出必须包含 `recordId`、`result` 和 `data` 或 `KafkaRecordValue` 参数。参数 `recordId` 包含输入流记录，`result` 指示转换是否成功，`data` 包含 Lambda 函数的 Base64 编码的转换输出。有关更多信息，请参阅 [数据转换所需的参数](data-transformation-status-model.md)。

```
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1"
}
```

要向 Firehose 指定如何将流记录路由到目标表的路由信息，作为 Lambda 函数的一部分，Lambda 函数的输出必须包含 `metadata` 的附加部分。以下示例显示了如何将元数据部分添加到 Firehose 流的 Lambda 输出中，该流使用 Kinesis Data Streams 作为数据来源，指示 Firehose 必须将该记录作为新记录插入到数据库 `IoTevents` 的名为 `Device1234` 的表中。

```
 {
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

同样，以下示例显示了如何将元数据部分添加到 Firehose 的 Lambda 输出中，它使用 Amazon Managed Streaming for Apache Kafka 作为数据来源，指示 Firehose 必须将该记录作为新记录插入到数据库 `IoTevents` 中名为 `Device1234` 的表中。

```
{
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

对于这个示例，
+ `destinationDatabaseName` 指的是目标数据库的名称，是必填字段。
+ `destinationTableName` 指的是目标表的名称，是必填字段。
+ `operation` 是一个可选字段，可能的值为 `insert`、`update` 和 `delete`。如果您未指定任何值，则默认操作为 `insert`。

**注意**  
当您将操作指定为`update`或时`delete`，您必须在设置 Firehose 流时为目标表指定唯一键，或者在 Iceberg 中运行[创建表或[更改表](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields)](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table)操作时[identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)在 Iceberg 中设置唯一的键。如果您未指定此项，则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。
`Database Name` 和 `Table Name` 值必须与目标数据库和表名完全匹配。如果它们不匹配，则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。
当您的 Firehose 流同时具有 Lambda 转换函数和表达式时 JSONQuery ，Firehose 会首先检查 Lambda 输出中的元数据字段，以确定如何将记录路由到相应的目标表，然后查看表达式的输出中是否有缺失的字段。 JSONQuery  
如果 Lambda 或 JSONQuery 表达式未提供所需的路由信息，则 Firehose 会将其视为单表场景，并在唯一密钥配置中查找单表信息。  
有关更多信息，请参阅[将传入记录路由到单个 Iceberg 表](apache-iceberg-format-input-record.md)。如果 Firehose 无法确定路由信息并将记录与指定的目标表进行匹配，则会将数据传输到您指定的 S3 错误存储桶。

### 示例 Lambda 函数
<a name="format-record-lambda"></a>

此 Lambda 函数是示例 Python 代码，用于解析传入的流记录并添加必填字段，以指定应如何将数据写入特定表。您可以使用此示例代码添加用于路由信息的元数据部分。

```
import json
import base64
 
 
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'])
 
    firehose_records_output = {}
    firehose_records_output['records'] = []
 
 
    for firehose_record_input in firehose_records_input['records']:
 
        # Get payload from Lambda input, it could be different with different sources
        if 'kafkaRecordValue' in firehose_record_input:
            payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8')
        else
            payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8')
 
        # perform data processing on customer payload bytes here
 
        # Create output with proper record ID, output data (may be different with different sources), result, and metadata
        firehose_record_output = {}
 
        if 'kafkaRecordValue' in firehose_record_input:
            firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8'))
        else
            firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8'))
        
        firehose_record_output['recordId'] = firehose_record_input['recordId']
        firehose_record_output['result'] =  'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': 'your_destination_database',
                'destinationTableName': 'your_destination_table',
                'operation': 'insert'
                }
            }
        firehose_records_output['records'].append(firehose_record_output)
    return firehose_records_output
```