亚马逊 Data Firehose 以前被称为亚马逊 Kinesis Data Firehose
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
亚马逊数据 Firehose 数据转换
Amazon Data Firehose 可以调用您的 Lambda 函数来转换传入的源数据并将转换后的数据传送到目的地。您可以在创建 Firehose 直播时启用亚马逊数据 Firehose 数据转换。
数据转换流
当你启用 Firehose 数据转换时,Firehose 会缓冲传入的数据。缓冲大小提示介于 0.2 MB 和 3MB 之间。除了 Splunk 和 Snowflake 之外,所有目标的默认 Lambda 缓冲大小提示均为 1 MB。对于 Splunk 和 Snowflake,默认的缓冲提示为 256 KB。Lambda 缓冲间隔提示的范围在 0 到 900 秒之间。除了 Snowflake 以外的所有目的地,默认 Lambda 缓冲间隔提示为六十秒。对于 Snowflake,默认的缓冲提示间隔为 30 秒。要调整缓冲区大小,请使用ProcessorParameter调用的BufferSizeInMBs
和IntervalInSeconds
设置CreateDeliveryStream或 UpdateDestinationAPI 的ProcessingConfiguration参数。然后,Firehose 使用同步调用模式对每个缓冲批次异步调用指定的 Lambda 函数。 Amazon Lambda 转换后的数据将从 Lambda 发送到 Firehose。然后,当达到指定的目标缓冲大小或缓冲间隔(以先发生者为准)时,Firehose 将其发送到目标。
重要
Lambda 同步调用模式对请求和响应的负载大小限制均为 6MB。确保用于向函数发送请求的缓冲大小小于或等于 6 MB,并且函数返回的响应也不超过 6 MB。
数据转换和状态模型
所有来自 Lambda 的转换记录都必须包含以下参数,否则 Amazon Data Firehose 会拒绝这些参数,并将其视为数据转换失败。
对于 Kinesis Data Streams 和 Direct PUT:
- recordId
-
在调用期间,记录 ID 将从 Amazon Data Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。
- result
-
记录的数据转换的状态。可能的值为:
Ok
(记录成功转换)、Dropped
(处理逻辑故意丢弃记录)和ProcessingFailed
(记录无法转换)。如果记录的状态为Ok
或Dropped
,则 Amazon Data Firehose 会认为该记录已成功处理。否则,Amazon Data Firehose 会认为处理失败。 - 数据
-
转换后的数据负载(使用 base64 编码之后)。
以下是 Lambda 结果输出示例:
{ "recordId":
"<recordId from the Lambda input>"
, "result": "Ok", "data":"<Base64 encoded Transformed data>"
}
对于 Amazon MSK
- recordId
-
在调用期间,记录 ID 会从 Firehose 传递到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据转换失败。
- result
-
记录的数据转换的状态。可能的值为:
Ok
(记录成功转换)、Dropped
(处理逻辑故意丢弃记录)和ProcessingFailed
(记录无法转换)。如果记录的状态为Ok
或Dropped
,Firehose 会认为该记录已成功处理。否则,Firehose 会认为其处理失败。 - KafkaRecordValue
-
转换后的数据负载(使用 base64 编码之后)。
以下是 Lambda 结果输出示例:
{ "recordId":
"<recordId from the Lambda input>"
, "result": "Ok", "kafkaRecordValue":"<Base64 encoded Transformed data>"
}
Lambda 蓝图
这些蓝图演示了如何创建和使用 Lamb Amazon da 函数来转换您的 Amazon Data Firehose 数据流中的数据。
查看控制台中可用的蓝图 Amazon Lambda
登录 Amazon Web Services Management Console 并打开 Amazon Lambda 控制台,网址为 https://console.aws.amazon.com/lambda/
。 -
选择创建函数,然后选择使用蓝图。
-
在 “蓝图” 字段中,搜索关键词
firehose
以查找 Amazon Data Firehose Lambda 蓝图。
蓝图列表:
-
处理发送到亚马逊 Data Firehose 流(Node.js、Python)的记录
此蓝图展示了如何使用 Lambda Amazon 处理您的 Firehose 数据流中的数据的基本示例。
最新发行日期:2016 年 11 月。
发行说明:无。
-
发送到 Firehose 的处理 CloudWatch 日志
此蓝图已被弃用。有关处理发送到 Firehose 的 CloudWatch 日志的信息,请参阅使用日志写入 Firehose。 CloudWatch
-
将 syslog 格式的 Amazon Data Firehose 直播记录转换为 JSON (Node.js)
此蓝图展示了如何将 RFC3164 系统日志格式的输入记录转换为 JSON。
最新发行日期:2016 年 11 月。
发行说明:无。
要查看中可用的蓝图 Amazon Serverless Application Repository
-
选择浏览所有应用程序。
-
在 应用程序 字段中,搜索关键字
firehose
。
您也可以在不使用蓝图的情况下创建 Lambda 函数。请参阅 Amazon Lambda 入门。
数据转换失败处理
如果您的 Lambda 函数调用由于网络超时或已达到 Lambda 调用限制而失败,Amazon Data Firehose 默认会重试三次调用。如果调用失败,Amazon Data Firehose 将跳过该批记录。跳过的记录会被视为未被成功处理的记录。您可以使用或 UpdateDestination
API 指定或覆盖重试选项。CreateDeliveryStream对于此类故障,您可以将调用错误记录到 Amazon Lo CloudWatch gs 中。有关更多信息,请参阅使用日志监控亚马逊数据 Firehose CloudWatch 。
如果记录的数据转换状态为,则 Amazon Data Firehose 会将该记录视为处理失败。ProcessingFailed
对于此类故障,您可以通过 Lambda 函数向 Amazon CloudWatch 日志发送错误日志。有关更多信息,请参阅Amazon Lambda 开发者指南 Amazon Lambda中的访问 Amazon CloudWatch 日志。
如果数据转换失败,则未被成功处理的记录会传输到您在 processing-failed
文件夹中的 S3 存储桶。这些记录具有如下格式:
{ "attemptsMade": "
count
", "arrivalTimestamp": "timestamp
", "errorCode": "code
", "errorMessage": "message
", "attemptEndingTimestamp": "timestamp
", "rawData": "data
", "lambdaArn": "arn
" }
attemptsMade
-
尝试的调用请求数。
arrivalTimestamp
-
亚马逊 Data Firehose 收到记录的时间。
errorCode
-
Lambda 返回的 HTTP 错误代码。
errorMessage
-
Lambda 返回的错误消息。
attemptEndingTimestamp
-
Amazon Data Firehose 停止尝试 Lambda 调用的时间。
rawData
-
经过 base64 编码的记录数据。
lambdaArn
-
Lambda 函数的 Amazon 资源名称(ARN)。
Lambda 调用的持续时间
Amazon Data Firehose 支持长达 5 分钟的 Lambda 调用时间。如果您的 Lambda 函数需要超过 5 分钟才能完成,则会收到以下错误:Firehose 在调用 Lambda 时遇到超时错误。 Amazon 支持的最大函数超时为 5 分钟。
有关发生此类错误时 Amazon Data Firehose 会如何处理的信息,请参阅。数据转换失败处理
源记录备份
Amazon Data Firehose 可以将所有未转换的记录同时备份到您的 S3 存储桶,同时将转换后的记录传送到目标。您可以在创建或更新 Firehose 直播时启用源记录备份。在启用源记录备份之后,便不能禁用它。