使用 Lambda 函数预处理数据 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

使用 Lambda 函数预处理数据

如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 AWS Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

在以下情况下,使用 Lambda 函数预处理记录是非常有用的:

  • 将记录从其他格式(如 KPL 或 GZIP)转换为 Kinesis Data Analytics 可分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。

  • 将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。

  • 利用其他 AWS 服务进行数据扩充,例如外推或错误更正。

  • 将复杂的字符串转换应用于记录字段。

  • 用于整理数据的数据筛选。

使用 Lambda 函数预处理记录

在创建 Kinesis Data Analytics 应用程序时,您可以在 Connect to a Source (连接到源) 页面上启用 Lambda 预处理。

使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis Data Analytics 控制台: https://console.amazonaws.cn/kinesisanalytics

  2. 在应用程序的 Connect to a Source (连接到源) 页上,在 Record pre-processing with AWS Lambda (使用 AWS Lambda 预处理记录) 部分中选择 Enabled (已启用)。

  3. 要使用已创建的 Lambda 函数,请在 Lambda function (Lambda 函数) 下拉列表中选择该函数。

  4. 要通过某个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。

  5. 要创建新的 Lambda 函数,请选择 Create new (新建)。有关创建 Lambda 函数的信息,请参阅 https://docs.amazonaws.cn/lambda/latest/dg/getting-started-create-function.html 开发人员指南 中的AWS Lambda创建 HelloWorld Lambda 函数和探索控制台

  6. 选择要使用的 Lambda 函数版本。要使用最新版本,请选择 $LATEST

在选择或创建 Lambda 函数以预处理记录时,将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。

Lambda 预处理权限

要使用 Lambda 进行预处理,应用程序的 IAM 角色需要具有以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

有关添加权限策略的更多信息,请参阅Amazon Kinesis Data Analytics for SQL 应用程序的身份验证和访问控制

Lambda 预处理指标

您可以使用 Amazon CloudWatch 监控 Lambda 调用次数、处理的字节数、成功和失败次数,等等。有关 Kinesis Data Analytics Lambda 预处理发送的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标

将 AWS Lambda 与 Kinesis 创建者库一起使用

Kinesis 创建器库 (KPL) 将较小的用户格式化记录聚合为较大的记录(最大为 1 MB),以更好地利用 Amazon Kinesis Data Streams 吞吐量。用于 Java 的 Kinesis 客户端库 (KCL) 支持取消聚合这些记录。但在将 AWS Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。

要获取必要的项目代码和说明,请参阅 GitHub 上的 Kinesis Producer Library Deaggregation Modules for AWS Lambda。您可以使用此项目中的组件在 AWS Lambda 中通过 Java、Node.js 和 Python 处理 KPL 序列化数据。您也可以将这些组件用在多语言 KCL 应用程序中。

数据预处理事件输入数据模型/记录响应模型

要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 从 Kinesis 数据流或 Kinesis Data Firehose 传输流中持续读取数据。对于检索的每一批记录,此服务管理如何将每个批次传送到您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

您的预处理函数的输入模型略有不同,具体取决于是从 Kinesis 数据流还是 Kinesis Data Firehose 传输流中接收数据。

如果源是 Kinesis Data Firehose 传输流,则事件输入数据模型如下所示:

Kinesis Data Firehose 请求数据模型

字段 Description
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN)
streamArn 传输流 ARN
记录
字段 Description
recordId 记录 ID (随机 GUID)
kinesisFirehoseRecordMetadata
字段 Description
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Firehose 传输流的输入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果源是 Kinesis 数据流,则事件输入数据模型如下所示:

Kinesis 流请求数据模型

字段 Description
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 ARN
streamArn 传输流 ARN
记录
字段 Description
recordId 基于 Kinesis 记录序列号的记录 ID
kinesisStreamRecordMetadata
字段 Description
sequenceNumber 从 Kinesis 流记录中得到的序列号
partitionKey 从 Kinesis 流记录中得到的分区键
shardId 从 Kinesis 流记录中得到的 ShardId
approximateArrivalTimestamp 传输流记录大致到达时间
数据 Base64 编码的源记录负载

以下示例显示来自 Kinesis 数据流的输入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

记录响应模型

必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。这些记录必须包含以下参数,否则,Kinesis Data Analytics 将拒绝这些记录,并将其视为失败的数据预处理。可对记录的数据负载部分进行转换,以满足预处理要求。

响应数据模型

记录
字段 Description
recordId 在调用期间,记录 ID 从 Kinesis Data Analytics 传送到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据预处理失败。
result 记录的数据转换的状态。可能的值包括:
  • Ok: 记录已成功转换。 Kinesis Data Analytics 为SQL处理提供记录。

  • Dropped: 您的处理逻辑故意丢弃记录。 Kinesis Data Analytics 从SQL处理中删除记录。对于 Dropped 记录,数据负载字段是可选的。

  • ProcessingFailed: 记录无法转换。 Kinesis Data Analytics 将其未成功地用您的Lambda函数处理,并将错误写入错误流。有关错误流的更多信息,请参阅错误处理。对于 ProcessingFailed 记录,数据负载字段是可选的。

data 转换后的数据负载 (使用 base64 编码之后)。如果应用程序提取数据格式为 JSON,则每个数据负载可能包含多个 JSON 文档。或者,如果应用程序提取数据格式为 CSV,则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务将成功分析并处理同一数据负载中的多个 JSON 文档或 CSV 行数据。

以下示例显示来自 Lambda 函数的输出:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常见的数据预处理失败情况

以下是预处理失败的常见原因。

  • 并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都返回到 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID、状态或数据负载字段。对于 DroppedProcessingFailed 记录,数据负载字段是可选的。

  • 在 Lambda 函数的超时时间内,不足以预处理数据。

  • Lambda 函数的响应时间超出了 AWS Lambda 服务施加的响应限制。

如果数据预处理失败,Kinesis Data Analytics 继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,以了解详细的失败情况。

  • Kinesis Data Analytics 应用 MillisBehindLatest: 表示应用程序从流来源读取的距离。

  • Kinesis Data Analytics 应用 InputPreprocessing CloudWatch 度量: 表示成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • AWS Lambda 函数 CloudWatch 指标和日志。