在 Amazon Data Firehose 上使用 Apache Iceberg 表 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Amazon Data Firehose 流传输到亚马逊 S3 中的 Apache Iceberg Tables 处于预览阶段,可能会发生变化。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Data Firehose 上使用 Apache Iceberg 表

注意

向亚马逊 S3 中的 Apache Iceberg Tables 传输 Firehose 直播处于预览阶段。请勿将此功能用于生产工作负载。

Apache Iceberg 是一种用于执行大数据分析的高性能开源表格格式。Apache Iceberg 为亚马逊 S3 数据湖带来了SQL表格的可靠性和简单性,并使 Spark、Flink、Trino、Hive 和 Impala 等开源分析引擎可以同时处理相同的数据。有关 Apache Iceberg 的更多信息,请参阅。https://iceberg.apache.org/

你可以使用 Firehose 将流数据直接传送到亚马逊 S3 中的 Apache Iceberg Tables。使用此功能,您可以将记录从单个流路由到不同的 Apache 冰山表,并自动对 Apache Iceberg Tables 中的记录应用插入、更新和删除操作。此功能需要使用 Amazon Glue Data Catalog。

支持的区域和数据类型

  • Apache Iceberg Tables 已在美国东部(弗吉尼亚北部)、美国西部(俄勒冈)、欧洲(爱尔兰)、亚太地区(东京)、加拿大(中部)和亚太地区(悉尼)上市, Amazon Web Services 区域 以供预览。

  • Firehose 支持 Apache Iceberg 支持的所有原始和复杂数据类型。有关更多信息,请参阅架构和数据类型

先决条件

在开始之前,请完成以下先决条件。

  • 创建 Amazon S3 存储桶 — 您必须在创建 Apache Iceberg 表之前创建一个 Amazon S3 存储桶以添加元数据文件路径。有关更多信息,请参阅创建您的第一个 S3 存储桶

  • 使用 Glue 创建 Apache 冰山表 — 当你使用 Firehose 向亚马逊 S3 中的 Apache Iceberg 表传输直播时,你必须在创建 Firehose 直播 Amazon Glue Data Catalog 之前在中创建目标表。有关更多信息,请参阅 Creating Apache Iceberg tables

    注意
    • 表格格式版本 — Firehose 仅支持 V2 表格格式。请勿创建 V1 格式的表,否则会出现错误,数据将改为传送到 S3 错误存储桶。

    • 数据存储格式 — Firehose 以 Parquet 格式将数据写入 Apache Iceberg Tables。

    • 行级操作 —Firehose 支持将数据写入 Apache Iceberg Tables 的读时合并 (MOR) 模式。

  • 创建具有所需权限的IAM角色 — Firehose 需要一个具有特定权限的IAM角色才能访问 Amazon Glue 表并将数据写入 Amazon S3。在创建 Firehose 直播期间,你需要这个IAM角色。有关更多信息,请参阅 向 Firehose 授予 Apache 冰山桌目标的访问权限

  • 授予表级别权限-您必须使用 Amazon Lake Formation 控制台向在上一步中创建的IAM角色授予表级权限。有关如何执行此操作的更多信息,请参阅授予 Lake Formation 对 Iceberg 表的权限

设置直播

要创建 Firehose 直播,您需要配置以下内容。

来源和目的地

要向 Apache Iceberg Tables 传送数据,请为您的直播选择来源。目前,我们不支持将亚马逊MSK作为向 Apache Iceberg Tables 传输直播的来源。

如果您选择 Amazon Kinesis Data Streams 作为来源,请在控制台的 “来源” 设置下,浏览或创建 Kinesis Data ARN Streams 或者按格式输入数据流。arn:aws:kinesis:[Region]:[AccountId]:stream/[StreamName]

接下来,选择 Apache Iceberg Table s 作为目标并提供 Firehose 直播名称。

数据转换

要对数据执行自定义转换,例如在传入流中添加或修改记录,您可以将 Lambda 函数添加到您的 Firehose 流中。有关在 Firehose 流中使用 Lambda 进行数据转换的更多信息,请参阅。在 Amazon Data Firehose 中转换数据

对于 Apache Iceberg Tables,您需要指定如何将传入流中的数据映射到目标中的特定表和列。您还需要指定如何将传入记录路由到不同的目标表。您可以通过以下方式之一执行此操作。

  • 在将数据采集到 Firehose 之前,请先在数据生成器中对记录进行格式化。

  • 使用 Lambda 函数对记录进行格式化。

有关更多信息,请参阅 格式化输入记录

数据目录

Apache Iceberg 需要数据目录来指定表和列的定义。Firehose 与 Apache Iceber Amazon Glue Data Catalog g Tables 集成。如果您没有现有的数据目录,请参阅 Glu e 数据 Amazon 目录入门

如果你使用 Hive Metastore,请参阅将数据目录连接到外部 Hive 元数据仓,了解如何连接 Hive Metastor Amazon Glue Data Catalog e。

您可以在与您的 Firehose 直播相同的账户和区域 Amazon Glue Data Catalog 中使用(默认),也可以从控制台的Amazon Web Services 区域下拉列表中选择不同的区域。

唯一密钥

唯一键是源记录中的一个或多个字段,用于唯一标识 Apache Iceberg Tables 中的一行。在源记录中使用唯一密钥是可选的。如果要对某些表执行更新和删除操作,则需要在源记录中具有进入这些表的唯一密钥,并且必须在 Firehose 流创建过程中配置这些唯一密钥。

Retry duration

您可以使用此配置来指定 Firehose 在写入 Amazon S3 中的 Apache Iceberg 表时遇到失败时应尝试重试的时长(以秒为单位)。您可以将 0 到 7200 秒之间的任何值设置为执行重试。默认情况下,Firehose 会重试 300 秒。

配送或处理失败

您必须将 Firehose 配置为将记录传送到 S3 备份存储桶,以防它在重试持续时间到期后处理或传输数据流时遇到故障。为此,请在控制台的 Backup 设置中配置 S3 备份存储桶和 S3 备份存储桶错误输出前缀

缓冲区提示

Firehose 会将内存中传入的流数据缓冲到一定大小(缓冲大小)和一段时间(缓冲间隔),然后再将其传送到 Apache Iceberg 表。您可以选择 1—128 的缓冲区大小 MiBs 和 0—900 秒的缓冲间隔。缓冲区提示越高,S3 写入次数越少,由于数据文件越大,压缩成本就会降低;查询执行速度更快,但延迟更高。较低的缓冲区提示值以较低的延迟提供数据。

高级设置

您可以为 Apache Iceberg Tables 配置服务器端加密、错误日志、权限和标签。有关更多信息,请参阅 配置高级设置。在 “高级设置” 中,在 “服务访问权限” 下,您需要添加作为其中一部分创建的IAM角色先决条件 。Firehose 将担任访问 Amazon Glue 表和写入亚马逊 S3 存储桶的角色。

Firehose 直播创建可能需要几分钟才能完成。有关任何与直播创建相关的错误,请参阅处理错误。成功创建 Firehose 流后,您可以开始向其中提取数据,并可以使用控制台查看 Apache Iceberg 表中的数据。 Amazon Glue

处理错误

直播错误

错误消息 描述
Firehose 无法扮演角色 < > roleArn。请检查提供的角色。 Firehose 无法担任所提供的角色。
角色 %s 无权GetTable 在 catalogId %s 上执行粘合:或者该角色不存在。 要么是 Glue 表不存在,要么提供的角色无权访问该表。要修复此问题,请检查您的权限。
不支持将 Iceberg MSK 作为目的地,使用消防水带作为源。 DeliveryStreamType 一定不是MSKasSource。
< UpdateDestination > DestinationTableConfigurationList 无法更新。 DestinationTableConfigurationList 不应在此期间更新 UpdateDestination
< UpdateDestination > 目录ARN无法更新。 GlueCatalogARN在此期间无法更新 UpdateDestination。

Apache Iceberg 传送错误

Firehose 将所有传送错误发送到 CloudWatch 日志和 S3 错误存储桶。

错误代码 错误消息和信息

冰山。 NoSuchTable

指定的表不存在。

Firehose 正在尝试写入一个不存在的表。

冰山。 InvalidTableName

表名无效:空/空或 Iceberg 表不是 v2。

根据 Iceberg 规范的默认验证,数据库/表名称无效。

S3。 AccessDenied

确保所提供IAM角色的信任策略允许 Firehose 代入该角色,并且访问策略允许访问 S3 存储桶。

确保在先决条件步骤中创建的IAM角色具有所需的权限和信任策略。
Glue。 AccessDenied

确保所提供IAM角色的信任策略允许 Firehose 代入该角色,并且访问策略允许访问 S3 存储桶

确保在先决条件步骤中创建的IAM角色具有所需的权限和信任策略。

指标

为了向 Apache Iceberg Tables 传输数据,Firehose 会在直播级别发布以下 CloudWatch 指标。

指标 描述
DeliveryToIceberg.Bytes

在指定时间段内传输到 Apache Iceberg Tables 的字节数。

单位:字节

DeliveryToIceberg.IncomingRowCount

Firehose 尝试向 Apache Iceberg Tables 传送的记录数量。

单位:计数

DeliveryToIceberg.SuccessfulRowCount

成功传送到 Apache Iceberg Tables 的行数。

单位:计数

DeliveryToIceberg.FailedRowCount

传送到 S3 备份存储桶的失败行数。

单位:计数

DeliveryToIceberg.DataFreshness Firehose 中最古老唱片的年龄(从进入 Firehose 到现在)。任何超过这个年龄的唱片都已发送到 Apache Iceberg Tables。

单位:秒

DeliveryToIceberg.Success

成功提交到 Apache Iceberg Tables 的总和。

考虑因素和限制

Firehose 对用于预览的 Apache Iceberg 表的支持有以下注意事项和限制。

  • 吞吐量 — 如果您使用 D irect PUT 作为数据源向 Apache Iceberg 表传送数据,则在美国东部(弗吉尼亚北部)、美国西部(俄勒冈)和欧洲(爱尔兰)区域,每个流的最大吞吐量为 5 MiB/秒,在亚太地区(东京)、加拿大(中部)和亚太地区(悉尼)区域,每个流的最大吞吐量为 1 MiB/秒。如果您只想在没有更新和删除的情况下向 Iceberg 表中插入数据,并且想要使用更高的流吞吐量进行测试,则可以使用 Firehose Limits 表单请求提高吞吐量限制

  • -对于列名和值,Firehose 仅采用多级嵌套中的第一级节点。JSON例如,Firehose 会选择第一级中可用的节点,包括位置字段。源数据的列名和数据类型应与目标表的列名和数据类型相匹配,Firehose 才能成功交付。在这种情况下,Firehose 希望你的 Iceberg 表中有结构或地图数据类型列来匹配位置字段。Firehose 仅支持四个等级的嵌套。以下是嵌套的示例JSON。

    { "version":"2016-04-01", "deviceId":"<solution_unique_device_id>", "sensorId":"<device_sensor_id>", "timestamp":"2024-01-11T20:42:45.000Z", "value":"<actual_value>", "position":{ "x":143.595901, "y":476.399628, "z":0.24234876 } }

    如果列名或数据类型不匹配,Firehose 会抛出错误并将数据传送到 S3 错误存储桶。如果 Apache Iceberg 表中的所有列名和数据类型都匹配,但源记录中还有一个新字段,Firehose 会跳过新字段。

  • 每条记录一个JSON对象 — 在一条 Firehose 记录中只能发送一个JSON对象。如果您在记录中聚合并发送多个JSON对象,Firehose 会抛出错误并将数据传输到 S3 错误存储桶。

  • 直播来源 — Firehose 目前不支持适用于 Apache Kafka 的亚马逊托管流媒体作为 Apache Iceberg Tables 的来源。

  • Apache Iceberg 隔离级别 — Firehose 会向 Apache Iceberg Tables 进行均等写入。目前,我们不支持可序列化和快照隔离级别。

  • 压缩 — 每次使用 Firehose 进行写作时,它都会生成数据文件。拥有成千上万的小数据文件会增加元数据开销并影响读取性能。为了获得最佳的查询性能,您可能需要考虑一种解决方案,即定期使用较小的数据文件并重写为较少的大数据文件。这个过程称为压实。 Amazon Glue Data Catalog 支持自动压缩 Apache 冰山表。有关更多信息,请参阅 Glue 用户指南中的Amazon 压缩管理。有关更多信息,请参阅自动压缩 Apache 冰山表。您还可以使用对 Apache Iceberg 表执行表维护的VACUUM语句来减少存储消耗,从而优化 Iceberg 表。

格式化输入记录

你可以使用 Dire PUT PutRecord ct PutRecordBatch APIs 和/或 Amazon Kinesis Data Streams 将数据提取到 Firehose 中。 PutRecords API目前,对于这两个源,你都需要将你的JSON对象作为 blob 添加到 Data 字段中,如下例所示。

示例JSON对象

{ "courseid": "5", "coursename": "Computer Science", "semester": "fall", "grade": "A" }

JSONas blob 已添加到输入记录的数据字段

{ "Data": blob // byte array of JSON string as shown in previous sample }

使用 Firehose 将流传输到 Apache Iceberg Tables 时,必须指定如何将传入的记录路由到不同的目标表以及要执行的操作。为此,Firehose 需要ADF_Metadat在您的直播记录中包含三个元数据字段,如以下示例所示。您还需要将您的添加JSON为的一部分ADF_Record,如以下示例所示。

{ "ADF_Record": "{ "courseid": "5", "coursename": "Computer Science", "semester": "fall", "grade": "A" }" "ADF_Metadata" : { "OTF_Metadata": { "DestinationTableName": "iceberg_testing", //Required "DestinationDatabaseName": "course",// Required "Operation": "INSERT" // Optional (Possible values-INSERT, UPDATE, DELETE) } } }

在上面的示例中,DestinationDatabaseName是该记录的目标数据库的名称,并且是必填字段。 DestinationTableName是此记录的 Apache Iceberg Table 的名称,也是必填字段。 Operation是一个可选字段,可能的值为、INSERT(默认)UPDATE、和DELETE。根据您要对此记录执行的操作指定值。要执行UPDATE和DELETE操作,您必须在创建 Firehose 直播时为所需表配置唯一键。

注意

DestinationTableNameDestinationDatabasename必须与您的 Iceberg 数据库和表名完全匹配。如果它们不匹配,Firehose 会抛出错误并将失败的传输数据传送到 S3 备份存储桶。

按照上一个示例所示格式化数据后,可以将此JSON对象的 blob 添加到 Data 字段,如下所示。

{ "Data": blob // byte array of JSON string shown in previous example }

您可以通过以下方式之一格式化流媒体记录。

您可以使用带有 Lambda 函数的 Firehose 必填字段来格式化已收录的直播记录。为此,您可以使用以下示例 python 代码。此 Lambda 函数解析传入的流记录,并添加必填字段以指定应如何将数据写入特定的表和列。如果您使用 Lambda 函数,还可以解析数据流中的嵌套JSON记录,并将要写入目标 Iceberg 表中的特定表和列的数据提取出来。

例如,如果格式化之前的传入流记录如下所示sample JSON,则在使用以下示例代码进行格式化后,记录(new_data在代码块中)如下所示sample blob

import base64 import json def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {} # Create result object. firehose_records_output['records'] = [] # Go through records and process them for firehose_record_input in firehose_records_input['records']: json_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') original_data = json.loads(json_bytes) new_data = {'ADF_Record' : original_data} adf_metadata = json.loads('{"OTF_Metadata": {"DestinationTableName":"course", "DestinationDatabaseName": "iceberg_testing","Operation":"insert"}}') new_data["ADF_Metadata"] = adf_metadata firehose_record_output = {} # Must set proper record ID firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' modified_json_bytes = json.dumps(new_data) firehose_record_output['data'] = base64.b64encode(modified_json_bytes.encode('utf-8')) firehose_records_output['records'].append(firehose_record_output) return firehose_records_output

如果要在将传入到 Firehose 之前使用数据生成器中的必填字段格式化传入的流记录,则可以使用以下示例 python 代码。

import boto3 import json delivery_stream_name = 'xxxxxx' print("Putting data to " + delivery_stream_name) firehose_client = boto3.client('firehose', region_name='eu-west-1') # Replace 'region_name' with your AWS region def load_from_local(): records = [] courseids = ["5"] coursenames = ["Computer Science"] for j in range(0, 1): # change here to any json you need to upload to iceberg raw_json = '{"courseid":"%s","coursename": "%s","semester":"fall","grade":"A"}' % (courseids[j], coursenames[j]) print(raw_json) original_data = json.loads(raw_json) metadata = '{"OTF_Metadata": {"DestinationTableName":"course", "DestinationDatabaseName": "iceberg_testing","Operation":"insert"}}' adf_metadata = json.loads(metadata) iceberg_data = {'ADF_Record' : original_data} iceberg_data["ADF_Metadata"] = adf_metadata modified_json_bytes = json.dumps(iceberg_data) record = {'Data': modified_json_bytes} records.append(record) response = firehose_client.put_record_batch(DeliveryStreamName=delivery_stream_name, Records=records) print(response) load_from_local()

例如,如果以下示例代码original_data中的传入直播记录在格式化sample JSON之前看起来像这样,那么在使用以下代码进行格式化之后,记录iceberg_data将如下所示sample code