使用 Lambda 函数预处理数据 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数预处理数据

注意

2023 年 9 月 12 日之后,如果您尚未使用适用于 SQL 的 Kinesis Data Analytics,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制

如果您的流中的数据需要格式转换、转换、丰富或过滤,则可以使用 Amazon Lambda 函数对数据进行预处理。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

在以下情况下,Lambda 函数有助于预处理记录:

  • 将其他格式 (例如 KPL 或 GZIP) 的记录转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。

  • 将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。

  • 利用其他 Amazon 服务进行数据扩充,例如外推或错误更正。

  • 将复杂的字符串转换应用于记录字段。

  • 用于整理数据的数据筛选。

使用 Lambda 函数预处理记录

在创建 Kinesis Data Analytics 应用程序时,您可在 连接到源 页面上启用 Lambda 预处理。

使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录
  1. 登录 Amazon Web Services Management Console 并打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics。

  2. 在应用程序的 连接到源 页面上,在 使用 预处理记录 Amazon Lambda部分选择 已启用。

  3. 如需使用已创建的 Lambda 函数,请在 Lambda 函数 下拉列表中选择该函数。

  4. 如需通过某个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。

  5. 如需新建 Lambda 函数,请选择 新建有关创建 Lambda 函数的信息,请参阅开发人员指南中的创建 HelloWorld Lambda 函数和浏览控制台。Amazon Lambda

  6. 选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST

在选择或创建 Lambda 函数以预处理记录时,将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。

Lambda 预处理权限

使用 Lambda 进行预处理,应用程序的 IAM 角色需要具有以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 预处理指标

您可以使用 Amazon CloudWatch 来监控 Lambda 调用的次数、处理的字节数、成功和失败次数等。有关 Kinesis Data Analytics Lambda 预处理发出的 CloudWatch 指标的信息,请参阅亚马逊 Kinesis Analytics 指标。

Amazon Lambda 与 Kinesis 制作人库配合使用

Kinesis Producer Library(KPL)将较小的用户格式化记录聚合为较大的记录(最大为 1 MB),以更好地利用 Amazon Kinesis Data Streams 吞吐量。用于 Java 的 Kinesis 客户端库 (KCL) 支持取消聚合这些记录。但是,当你用作直播的使用者时,必须使用 Amazon Lambda 特殊模块来解聚记录。

要获取必要的项目代码和说明,请参阅 Kinesis Producer 库解聚模块以获取相关信息。 Amazon LambdaGitHub您可以使用此项目中的组件在 Java、Node.js 和 Python Amazon Lambda 中处理 KPL 序列化数据。您也可以将这些组件用在多语言 KCL 应用程序中。

数据预处理事件输入数据模型/记录响应模型

要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 会持续从你的 Kinesis 数据流或 Firehose 传输流中读取数据。对于检索的每一批记录,此服务管理如何将每个批次传送到您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

预处理函数的输入模型略有不同,具体取决于数据是从 Kinesis 数据流还是 Firehose 传输流接收。

如果源是 Firehose 传送流,则事件输入数据模型如下所示:

Kinesis Data Firehose 请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN)
streamArn 传输流 ARN
记录
字段 描述
recordId 记录 ID (随机 GUID)
kinesisFirehoseRecordMetadata
字段 描述
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Firehose 传输流的输入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果源是 Kinesis 数据流,事件输入数据模型如下所示:

Kinesis 流请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 ARN
streamArn 传输流 ARN
记录
字段 描述
recordId 基于 Kinesis 记录序列号的记录 ID
kinesisStreamRecordMetadata
字段 描述
sequenceNumber 从 Kinesis 流记录中得到的序列号
partitionKey 从 Kinesis 流记录中得到的分区键
shardId 从 Kinesis 流记录中得到的 ShardId
approximateArrivalTimestamp 传输流记录大致到达时间
数据 Base64 编码的源记录负载

以下示例显示来自 Kinesis 数据流的输入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

记录响应模型

必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。这些记录必须包含以下参数,否则,Kinesis Data Analytics 将拒绝这些记录,并将其视为失败的数据预处理。可对记录的数据负载部分进行转换,以满足预处理要求。

响应数据模型

记录
字段 描述
recordId 在调用期间,记录 ID 从 Kinesis Data Analytics 传送到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据预处理失败。
result 记录的数据转换的状态。可能的值包括:
  • Ok:记录已成功转换。Kinesis Data Analytics 会摄取 SQL 处理记录。

  • Dropped:您的处理逻辑有意丢弃了此记录。Kinesis Data Analytics 会丢弃 SQL 处理记录。对于 Dropped 记录,数据负载字段是可选的。

  • ProcessingFailed:无法转换记录。Kinesis Data Analytics 认为您的 Lambda 函数未成功处理它,并在错误流中写入一条错误。有关错误流的更多信息,请参阅错误处理。对于 ProcessingFailed 记录,数据负载字段是可选的。

data 转换后的数据负载(使用 base64 编码之后)。如果应用程序提取数据格式为 JSON,则每个数据负载可能包含多个 JSON 文档。或者,如果应用程序提取数据格式为 CSV,则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务将成功分析并处理同一数据负载中的多个 JSON 文档或 CSV 行数据。

以下示例显示来自 Lambda 函数的输出:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常见的数据预处理失败情况

以下是预处理失败的常见原因。

  • 并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都会返回到 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID、状态或数据负载字段。对于 DroppedProcessingFailed 记录,数据负载字段是可选的。

  • Lambda 函数的超时时间不足以预处理数据。

  • Lambda 函数的响应时间超出了 Amazon Lambda 服务施加的响应限制。

如果数据预处理失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标以深入了解故障。

  • Kinesis Data Analytics 应用程序 MillisBehindLatest:指示应用程序从流式传输源中读取时的滞后时间。

  • Kinesis Data Analytics InputPreprocessing CloudWatch 应用程序指标:表示成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • Amazon Lambda 函数 CloudWatch 指标和日志。