Amazon Kinesis Data Firehose
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

Amazon Kinesis Data Firehose 数据转换

Kinesis Data Firehose 可以调用您的 Lambda 函数转换传入的源数据并将转换后的数据传输给目标。当您创建传输流时,可以启用 Kinesis Data Firehose 数据转换。

数据转换流

启用 Kinesis Data Firehose 数据转换后,Kinesis Data Firehose 默认情况下将缓冲最多 3 MB 的传入数据。(要调整缓冲大小,请将 ProcessingConfiguration API 与名为 BufferSizeInMBsProcessorParameter 一起使用。)然后,Kinesis Data Firehose 将使用 AWS Lambda 同步调用模式,对每个缓冲的批处理异步调用指定的 Lambda 函数。转换后的数据将从 Lambda 发送到 Kinesis Data Firehose。然后,当达到指定的目标缓冲大小或缓冲间隔时(以先达到者为准),Kinesis Data Firehose 会将这些数据发送到目的地。

重要

对于请求和响应,Lambda 同步调用模式的负载大小限制为 6 MB。确保用于向函数发送请求的缓冲大小小于或等于 6 MB,并且函数返回的响应也不超过 6 MB。

数据转换和状态模型

所有通过 Lambda 转换的记录均包含以下参数,否则 Kinesis Data Firehose 会拒绝它们并将其视为数据转换失败。

recordId

记录 ID 在调用期间从 Kinesis Data Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。

result

记录的数据转换的状态。可能的值为:Ok(记录成功转换)、Dropped(处理逻辑故意丢弃记录)和 ProcessingFailed(记录无法转换)。如果记录的状态为 OkDropped,Kinesis Data Firehose 会认为它已成功处理。否则,Kinesis Data Firehose 会认为它未被成功处理。

数据

转换后的数据负载 (使用 base64 编码之后)。

Lambda 蓝图

Kinesis Data Firehose 提供以下 Lambda 蓝图,可供您用来为数据转换创建 Lambda 函数。

  • General Firehose Processing (一般 Firehose 处理) — 包含上一部分中描述的数据转换和状态模型。可将此蓝图用于任何自定义转换逻辑。

  • Apache Log to JSON (Apache 日志到 JSON) — 解析 Apache 日志行并将其转换为 JSON 对象(使用预定义的 JSON 字段名称)。

  • Apache Log to CSV (Apache 日志到 CSV) — 解析 Apache 日志行并将其转换为 CSV 格式。

  • Syslog to JSON (Syslog 到 JSON) — 解析 Syslog 行并将其转换为 JSON 对象(使用预定义的 JSON 字段名称)。

  • Syslog to CSV (Syslog 到 CSV) — 解析 Syslog 行并将其转换为 CSV 格式。

  • Kinesis Data Firehose Process Record Streams as source (Kinesis Data Firehose 将记录流作为源进行处理) — 访问输入中的 Kinesis Data Streams 记录,并返回记录及处理状态。

  • Kinesis Data Firehose CloudWatch Logs Processor (Kinesis Data Firehose CloudWatch Logs 处理器) —从 CloudWatch Logs 订阅筛选器发送的记录中解析和提取各个日志事件。

Lambda 蓝图仅提供 Node.js 和 Python 语言版本。您可以采用其他支持的语言实现自己的函数。有关 AWS Lambda 支持的语言的信息,请参阅简介:构建 Lambda 函数

要查看 Kinesis Data Firehose 的所有 Lambda 蓝图以及 Python 和 Node.js 中的示例,请执行以下步骤

  1. 通过以下网址登录 AWS 管理控制台并打开 AWS Lambda 控制台:https://console.amazonaws.cn/lambda/

  2. 选择 Create function (创建函数),然后选择 Blueprints (蓝图)

  3. 搜索关键字“firehose”以查找 Kinesis Data Firehose Lambda 蓝图

数据转换失败处理

如果您的 Lambda 函数调用由于网络超时或因为您已经达到 Lambda 调用限制而失败,Kinesis Data Firehose 会默认重试调用三次。如果调用不成功,Kinesis Data Firehose 随后会跳过该批记录。跳过的记录会被视为未被成功处理的记录。您可以使用 CreateDeliveryStreamUpdateDestination API 指定或覆盖重试选项。对于这种失败,您可以将调用错误记录到 Amazon CloudWatch Logs 中。有关更多信息,请参阅 使用 CloudWatch Logs 监控 Kinesis Data Firehose

如果记录的数据转换的状态为 ProcessingFailed,Kinesis Data Firehose 会将记录视为未被成功处理。对于这种失败,您可以从 Lambda 函数将错误日志发送到 Amazon CloudWatch Logs。有关更多信息,请参阅 AWS Lambda Developer Guide 中的访问 AWS Lambda 的 Amazon CloudWatch Logs

如果数据转换失败,则未被成功处理的记录会传输到您在 processing-failed 文件夹中的 S3 存储桶。这些记录具有如下格式:

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

尝试的调用请求数。

arrivalTimestamp

Kinesis Data Firehose 收到记录的时间。

errorCode

Lambda 返回的 HTTP 错误代码。

errorMessage

Lambda 返回的错误消息。

attemptEndingTimestamp

Kinesis Data Firehose 停止尝试 Lambda 调用的时间。

rawData

经过 base64 编码的记录数据。

lambdaArn

Lambda 函数的 Amazon 资源名称 (ARN)。

源记录备份

Kinesis Data Firehose 可以在将转换的记录传输到目标的同时,将所有未转换的记录备份至 S3 存储桶。您可以在创建或更新传输流时,启用源记录备份。在启用源记录备份之后,便不能禁用它。