使用 Lambda 函数预处理数据 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数预处理数据

如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 Amazon Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

使用 Lambda 函数对记录进行预处理在以下情景中十分有用:

  • 将记录从其他格式(如 KPL 或 GZIP)转换为 Kinesis Data Analytics 分析可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。

  • 将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。

  • 利用其他 Amazon 服务进行数据扩充,例如外推或错误更正。

  • 将复杂的字符串转换应用于记录字段。

  • 用于整理数据的数据筛选。

使用 Lambda 函数预处理记录

创建 Kinesis Data Analytics 应用程序时,您可以在Connect 到源页.

使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录

  1. 登录到Amazon Web Services Management Console并打开 Kinesis Data Analytics 控制台,网址为https://console.aws.amazon.com/kinesisanalytics

  2. 在存储库的Connect 到源页面,选择Enabled (已启用)中的使用记录预处理Amazon Lambda部分。

  3. 要使用已创建的 Lambda 函数,请从Lambda 函数下拉列表中)。

  4. 要通过某个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。

  5. 要创建新的 Lambda 函数,请选择创建新的。有关创建 Lambda 函数的信息,请参阅 Lambda 函数。创建 HelloWorld Lambda 函数和探索控制台中的Amazon Lambda开发人员指南

  6. 选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST

在选择或创建 Lambda 函数以预处理记录时,将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。

Lambda 预处理权限

要使用 Lambda 进行预处理,应用程序的 IAM 角色需要具有以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

有关添加权限策略的更多信息,请参阅用于 SQL 应用程序的 Amazon Kinesis Data Analytics 的身份验证和访问控制

Lambda 预处理指标

您可以使用 Amazon CloudWatch 监控 Lambda 调用次数、处理的字节数、成功和失败次数,等等。有关 Kinesis Data Analytics Lambda 预处理发送的 CloudWatch 指标的信息,请参阅Amazon Kinesis Analytics 指标

使用Amazon Lambda与 Kinesis 创建者库

这些区域有:Kinesis 创建者库(KPL) 将较小的用户格式化记录聚合为较大的记录 (最大为 1 MB),以更好地利用 Amazon Kinesis Data Streams 吞吐量。适用于 Java 的 Kinesis 客户端库 (KCL) 支持解聚合这些记录。但在将 Amazon Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。

要获取必要的项目代码和说明,请参阅Kinesis 生产者库解聚模块Amazon Lambda(位于 GitHub 上)。您可以使用此项目中的组件在 Amazon Lambda 中通过 Java、Node.js 和 Python 处理 KPL 序列化数据。您也可以将这些组件用在多语言 KCL 应用程序中。

数据预处理事件输入数据模型/记录响应模型

要预处理记录,您的 Lambda 函数必须符合所需事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 会从您的 Kinesis 数据流或 Kinesis Data Firehose 传输流中持续读取数据。对于此服务检索到的每一批记录,均会管理每个批次传递到您的 Lambda 函数的方式。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

您的预处理函数的输入模型略有不同,具体取决于是从 Kinesis 数据流还是 Kinesis Data Firehose ams 传输流接收数据。

如果源是 Kinesis Data Firehose 传输流,事件输入数据模型如下所示:

Kinesis Data Firehose 请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN)
streamArn 传输流 ARN
记录
字段 描述
recordId 记录 ID (随机 GUID)
kinesisFirehoseRecordMetadata
字段 描述
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Firehose 传输流的输入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果源是 Kinesis 数据流,则事件输入数据模型如下所示:

Kinesis 流请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 ARN
streamArn 传输流 ARN
记录
字段 描述
recordId 基于 Kinesis 记录序列号的记录 ID
kinesisStreamRecordMetadata
字段 描述
sequenceNumber 从 Kinesis 流记录中得到的序列号
partitionKey 从 Kinesis 流记录中得到的分区键
shardId 从 Kinesis 流记录中得到的 ShardId
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Kinesis 数据流的输入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

记录响应模型

必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。它们必须包含以下参数,否则 Kinesis Data Analytics 会拒绝它们,并视作失败的数据预处理。可对记录的数据负载部分进行转换,以满足预处理要求。

响应数据模型

记录
字段 描述
recordId 记录 ID 在调用期间从 Kinesis Data Analytics 传送到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据预处理失败。
result 记录的数据转换的状态。可能的值包括:
  • Ok:记录已成功转换。Kinesis Data Analytics 提取记录以进行 SQL 处理。

  • Dropped:您的处理逻辑有意丢弃了此记录。Kinesis Data Analytics 将丢弃记录,不对其进行 SQL 处理。对于 Dropped 记录,数据负载字段是可选的。

  • ProcessingFailed:无法转换记录。Kinesis Data Analytics 认为您的 Lambda 函数未成功处理它,并在错误流中写入一条错误。有关错误流的更多信息,请参阅错误处理。对于 ProcessingFailed 记录,数据负载字段是可选的。

data 转换后的数据负载 (使用 base64 编码之后)。如果应用程序提取数据格式为 JSON,则每个数据负载可能包含多个 JSON 文档。或者,如果应用程序提取数据格式为 CSV,则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务可以成功解析并处理同一数据负载中有多个 JSON 文档或 CSV 行的数据。

以下示例显示来自 Lambda 函数的输出:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常见的数据预处理失败情况

以下是预处理失败的常见原因。

  • 并非批次中发送到 Lambda 函数的所有记录 (带有记录 ID) 都会返回 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID、状态或数据负载字段。对于 DroppedProcessingFailed 记录,数据负载字段是可选的。

  • Lambda 函数的超时间不足以预处理数据。

  • Lambda 函数的响应时间超出了由Amazon Lambda服务。

如果数据预处理失败,Kinesis Data Analytics 继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,深入了解失败情况。

  • Kinesis Data Analytics 应用程序MillisBehindLatest:指示应用程序从流式传输源中读取时的滞后时间。

  • Kinesis Data Analytics 应用程序InputPreprocessingCloudWatch 指标:指示成功和失败次数以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • Amazon Lambda功能 CloudWatch 指标和日志。