将传入记录路由到不同的 Iceberg 表 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将传入记录路由到不同的 Iceberg 表

Firehose 可以根据记录的内容将流中的传入记录路由到不同的 Iceberg 表。例如,考虑以下示例输入记录。

{ "deviceId": "Device1234", "timestamp": "2024-11-28T11:30:00Z", "data": { "temperature": 21.5, "location": { "latitude": 37.3324, "longitude": -122.0311 } }, "powerlevel": 84, "status": "online" }
{ "deviceId": "Device4567", "timestamp": "2023-11-28T10:40:00Z", "data": { "pressure": 1012.4, "location": { "zipcode": 24567 } }, "powerlevel": 82, "status": "online" }

在此示例中,deviceId 字段有两个可能的值 – Device1234Device4567。当传入记录的deviceId字段为Device1234,我们要将记录写入名为 Iceberg 表中Device1234,而当传入记录的deviceId字段为Device4567,我们想要将记录写入名为Device4567

请注意,带有 Device1234Device4567 的记录可能有一组不同的字段,这些字段映射到相应的 Iceberg 表中的不同列。传入的记录可能具有嵌套的 JSON 结构,其中 deviceId 可以嵌套在 JSON 记录中。在接下来的几节中,我们将讨论在这种情况下如何通过向 Firehose 提供适当的路由信息将记录路由到不同的表。

使用表达式向 Firehose 提供路由信息 JSONQuery

向 Firehose 提供记录路由信息的最简单、最具成本效益的方法是 JSONQuery 提供表达式。使用这种方法,您可以为三个参数提供 JSONQuery 表达式 — Database Name Table Name、和(可选)Operation。Firehose 使用您提供的表达式从传入的流记录中提取信息以路由记录。

Database Name参数指定目标数据库的名称。该Table Name参数指定目标表的名称。 Operation是一个可选参数,用于指示是将传入的流记录作为新记录插入目标表,还是修改或删除目标表中的现有记录。“操作”字段必须具有以下值之一 – insertupdatedelete

对于这三个参数中的每一个,您可以提供静态值或动态表达式,在其中从传入的记录中检索值。例如,如果要将所有传入的流记录传输到名为 IoTevents 的单个数据库,则数据库名称的静态值将为 “IoTevents”。如果必须从传入记录中的字段中获取目标表名,则表名是一个动态表达式,用于指定传入记录中需要从中检索目标表名的字段。

在以下示例中,我们为数据库名称使用静态值,为表名使用动态值,为操作使用静态值。请注意,指定操作是可选项。如果未指定任何操作,则 Firehose 会默认将传入的记录作为新记录插入到目标表中。

Database Name : "IoTevents" Table Name : .deviceId Operation : "insert"

如果deviceId字段嵌套在 JSON 记录中,则我们将具有嵌套字段信息的表名指定为.event.deviceId

注意
  • 当您将操作指定为updatedelete,则必须在设置 Firehose 流时为目标表指定唯一的键,或者在 Iceberg 中运行创建表或更改表操作时在 Iceberg 中运行创建表或更改表操作时identifier-field-ids在 Iceberg 中运行创建表或更改表操作时在 Iceberg 中设置。如果您未指定此项,则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

  • Database NameTable Name 值必须与目标数据库和表名完全匹配。如果它们不匹配,则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

使用 Amazon Lambda 函数提供路由信息

在某些情况下,您可能有复杂的规则来决定如何将传入记录路由到目标表。例如,您可能有一个规则来定义字段是否包含值 A、B 或 F,该规则应路由到名为 TableX 的目标表,或者您可能希望通过添加其他属性来增加传入的流记录。例如,如果一条记录包含的字段device_id为 1,则可能需要添加另一个 “modem” 字段,并将该附加字段写入目标表列。device_type在这种情况下,您可以使用 Firehose 中的 Amazon Lambda 函数转换源流,并提供路由信息作为 Lambda 转换函数输出的一部分。要了解如何使用 Firehose 中的 Amazon Lambda 函数转换源流,请参阅在 Amazon Data Firehose 中转换源数据

当您使用 Lambda 在 Firehose 中转换源流时,输出必须包含recordIdresult、和或参数。data KafkaRecordValue参数recordId包含输入流记录,result指示转换是否成功,并data包含 Lambda 函数的 Base64 编码的转换输出。有关更多信息,请参阅 数据转换所需的参数

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1" }

要向 Firehose 指定如何将流记录路由到目标表的路由信息,作为 Lambda 函数的一部分,Lambda 函数的输出必须包含 metadata 的附加部分。以下示例显示了如何将元数据部分添加到 Firehose 流的 Lambda 输出中,该流使用 Kinesis Data Streams 作为数据来源,指示 Firehose 必须将该记录作为新记录插入到数据库 IoTevents 的名为 Device1234 的表中。

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

同样,以下示例显示了如何将元数据部分添加到 Firehose 的 Lambda 输出中,该Firehose 会使用Apache Firehose 的 Managed Streaming 作为数据来源,指示 Firehose 必须将该记录作为新记录插入到数据库中命名的表中。Device1234 IoTevents

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

对于这个示例,

  • destinationDatabaseName 指的是目标数据库的名称,是必填字段。

  • destinationTableName 指的是目标表的名称,是必填字段。

  • operation 是一个可选字段,可能的值为 insertupdatedelete。如果您未指定任何值,则默认操作为 insert

注意
  • 当您将操作指定为updatedelete,则必须在设置 Firehose 流时为目标表指定唯一的键,或者在 Iceberg 中运行创建表或更改表操作时在 Iceberg 中运行创建表或更改表操作时identifier-field-ids在 Iceberg 中运行创建表或更改表操作时在 Iceberg 中设置。如果您未指定此项,则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

  • Database NameTable Name 值必须与目标数据库和表名完全匹配。如果它们不匹配,则 Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

  • 当您的 Firehose 流既有 Lambda 转换函数又有 Lambda 转换函数又有表达式时 JSONQuery ,Firehose 会首先检查 Lambda 输出中的元数据字段,以确定如何将记录路由到相应的目标表,然后查看表达式的输出中是否有缺失的字段。 JSONQuery

    如果 Lambda 或 JSONQuery 表达式未提供所需的路由信息,则 Firehose 会将其视为单表场景,并在唯一键配置中查找单表信息。

    有关更多信息,请参阅将传入记录路由到单个 Iceberg 表。如果 Firehose 无法确定路由信息并将记录与指定的目标表进行匹配,则会将数据传输到您指定的 S3 错误存储桶。

示例 Lambda 函数

此 Lambda 函数是示例 Python 代码,用于解析传入的流记录并添加必填字段,以指定应如何将数据写入特定表。您可以使用此示例代码添加用于路由信息的元数据部分。

import json import base64 def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']) firehose_records_output = {} firehose_records_output['records'] = [] for firehose_record_input in firehose_records_input['records']: # Get payload from Lambda input, it could be different with different sources if 'kafkaRecordValue' in firehose_record_input: payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8') else payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') # perform data processing on customer payload bytes here # Create output with proper record ID, output data (may be different with different sources), result, and metadata firehose_record_output = {} if 'kafkaRecordValue' in firehose_record_input: firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')) else firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')) firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' firehose_record_output['metadata'] = { 'otfMetadata': { 'destinationDatabaseName': 'your_destination_database', 'destinationTableName': 'your_destination_table', 'operation': 'insert' } } firehose_records_output['records'].append(firehose_record_output) return firehose_records_output