亚马逊数据 Firehose 数据转换 - Amazon Data Firehose
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊数据 Firehose 数据转换

Amazon Data Firehose 可以调用您的 Lambda 函数来转换传入的源数据并将转换后的数据传送到目的地。您可以在创建 Firehose 直播时启用亚马逊数据 Firehose 数据转换。

数据转换流

当你启用 Firehose 数据转换时,Firehose 会缓冲传入的数据。缓冲大小提示介于 0.2 MB 和 3MB 之间。除了 Splunk 和 Snowflake 之外,所有目标的默认 Lambda 缓冲大小提示均为 1 MB。对于 Splunk 和 Snowflake,默认的缓冲提示为 256 KB。Lambda 缓冲间隔提示的范围在 0 到 900 秒之间。除了 Snowflake 以外的所有目的地,默认 Lambda 缓冲间隔提示为六十秒。对于 Snowflake,默认的缓冲提示间隔为 30 秒。要调整缓冲区大小,请使用ProcessorParameter调用的BufferSizeInMBsIntervalInSeconds设置CreateDeliveryStreamUpdateDestinationAPI 的ProcessingConfiguration参数。然后,Firehose 使用同步调用模式对每个缓冲批次异步调用指定的 Lambda 函数。 Amazon Lambda 转换后的数据将从 Lambda 发送到 Firehose。然后,当达到指定的目标缓冲大小或缓冲间隔(以先发生者为准)时,Firehose 将其发送到目标。

重要

Lambda 同步调用模式对请求和响应的负载大小限制均为 6MB。确保用于向函数发送请求的缓冲大小小于或等于 6 MB,并且函数返回的响应也不超过 6 MB。

数据转换和状态模型

所有来自 Lambda 的转换记录都必须包含以下参数,否则 Amazon Data Firehose 会拒绝这些参数,并将其视为数据转换失败。

对于 Kinesis Data Streams 和 Direct PUT:

recordId

在调用期间,记录 ID 将从 Amazon Data Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。

result

记录的数据转换的状态。可能的值为:Ok(记录成功转换)、Dropped(处理逻辑故意丢弃记录)和 ProcessingFailed(记录无法转换)。如果记录的状态为OkDropped,则 Amazon Data Firehose 会认为该记录已成功处理。否则,Amazon Data Firehose 会认为处理失败。

数据

转换后的数据负载(使用 base64 编码之后)。

以下是 Lambda 结果输出示例:

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "data": "<Base64 encoded Transformed data>" }

对于 Amazon MSK

recordId

在调用期间,记录 ID 会从 Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。

result

记录的数据转换的状态。可能的值为:Ok(记录成功转换)、Dropped(处理逻辑故意丢弃记录)和 ProcessingFailed(记录无法转换)。如果记录的状态为OkDropped,Firehose 会认为该记录已成功处理。否则,Firehose 会认为其处理失败。

KafkaRecordValue

转换后的数据负载(使用 base64 编码之后)。

以下是 Lambda 结果输出示例:

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "kafkaRecordValue": "<Base64 encoded Transformed data>" }

Lambda 蓝图

这些蓝图演示了如何创建和使用 Lamb Amazon da 函数来转换您的 Amazon Data Firehose 数据流中的数据。

查看控制台中可用的蓝图 Amazon Lambda
  1. 登录 Amazon Web Services Management Console 并打开 Amazon Lambda 控制台,网址为 https://console.aws.amazon.com/lambda/

  2. 选择创建函数,然后选择使用蓝图

  3. 在 “蓝图” 字段中,搜索关键词firehose以查找 Amazon Data Firehose Lambda 蓝图。

蓝图列表:

  • 处理发送到亚马逊 Data Firehose 流(Node.js、Python)的记录

    此蓝图展示了如何使用 Lambda Amazon 处理您的 Firehose 数据流中的数据的基本示例。

    最新发行日期:2016 年 11 月。

    发行说明:无。

  • 发送到 Firehose 的处理 CloudWatch 日志

    此蓝图已被弃用。有关处理发送到 Firehose 的 CloudWatch 日志的信息,请参阅使用日志写入 Firehose。 CloudWatch

  • 将 syslog 格式的 Amazon Data Firehose 直播记录转换为 JSON (Node.js)

    此蓝图展示了如何将 RFC3164 系统日志格式的输入记录转换为 JSON。

    最新发行日期:2016 年 11 月。

    发行说明:无。

要查看中可用的蓝图 Amazon Serverless Application Repository
  1. 转到 Amazon Serverless Application Repository

  2. 选择浏览所有应用程序

  3. 应用程序 字段中,搜索关键字 firehose

您也可以在不使用蓝图的情况下创建 Lambda 函数。请参阅 Amazon Lambda 入门

数据转换失败处理

如果您的 Lambda 函数调用由于网络超时或已达到 Lambda 调用限制而失败,Amazon Data Firehose 默认会重试三次调用。如果调用失败,Amazon Data Firehose 将跳过该批记录。跳过的记录会被视为未被成功处理的记录。您可以使用或 UpdateDestination API 指定或覆盖重试选项。CreateDeliveryStream对于此类故障,您可以将调用错误记录到 Amazon Lo CloudWatch gs 中。有关更多信息,请参阅使用日志监控亚马逊数据 Firehose CloudWatch

如果记录的数据转换状态为,则 Amazon Data Firehose 会将该记录视为处理失败。ProcessingFailed对于此类故障,您可以通过 Lambda 函数向 Amazon CloudWatch 日志发送错误日志。有关更多信息,请参阅Amazon Lambda 开发者指南 Amazon Lambda中的访问 Amazon CloudWatch 日志

如果数据转换失败,则未被成功处理的记录会传输到您在 processing-failed 文件夹中的 S3 存储桶。这些记录具有如下格式:

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

尝试的调用请求数。

arrivalTimestamp

亚马逊 Data Firehose 收到记录的时间。

errorCode

Lambda 返回的 HTTP 错误代码。

errorMessage

Lambda 返回的错误消息。

attemptEndingTimestamp

Amazon Data Firehose 停止尝试 Lambda 调用的时间。

rawData

经过 base64 编码的记录数据。

lambdaArn

Lambda 函数的 Amazon 资源名称(ARN)。

Lambda 调用的持续时间

Amazon Data Firehose 支持长达 5 分钟的 Lambda 调用时间。如果您的 Lambda 函数需要超过 5 分钟才能完成,则会收到以下错误:Firehose 在调用 Lambda 时遇到超时错误。 Amazon 支持的最大函数超时为 5 分钟。

有关发生此类错误时 Amazon Data Firehose 会如何处理的信息,请参阅。数据转换失败处理

源记录备份

Amazon Data Firehose 可以将所有未转换的记录同时备份到您的 S3 存储桶,同时将转换后的记录传送到目标。您可以在创建或更新 Firehose 直播时启用源记录备份。在启用源记录备份之后,便不能禁用它。