使用 Lambda 函数预处理数据 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数预处理数据

注意

2023 年 9 月 12 日之后,如果您尚未使用 Kinesis Data Analytics for SQL,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制

如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 Amazon Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。

在以下情况下,Lambda 函数有助于预处理记录:

  • 将其他格式 (例如 KPL 或 GZIP) 的记录转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。

  • 将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。

  • 利用其他 Amazon 服务进行数据扩充,例如外推或错误更正。

  • 将复杂的字符串转换应用于记录字段。

  • 用于整理数据的数据筛选。

使用 Lambda 函数预处理记录

在创建 Kinesis Data Analytics 应用程序时,您可在 连接到源 页面上启用 Lambda 预处理。

使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录
  1. 登录 Amazon Web Services Management Console,然后打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在应用程序的 连接到源 页面上,在 使用 预处理记录 Amazon Lambda部分选择 已启用。

  3. 如需使用已创建的 Lambda 函数,请在 Lambda 函数 下拉列表中选择该函数。

  4. 如需通过某个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。

  5. 如需新建 Lambda 函数,请选择 新建。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的创建 HelloWorld Lambda 函数和探索控制台

  6. 选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST

在选择或创建 Lambda 函数以预处理记录时,将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。

Lambda 预处理权限

使用 Lambda 进行预处理,应用程序的 IAM 角色需要具有以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 预处理指标

您可以使用 Amazon CloudWatch 监控 Lambda 调用次数、处理的字节数、成功和失败次数,等等。有关 Kinesis Data Analytics Lambda 预处理发送的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标

将 Amazon Lambda 与 Kinesis 创建器库一起使用

Kinesis Producer Library(KPL)将较小的用户格式化记录聚合为较大的记录(最大为 1 MB),以更好地利用 Amazon Kinesis Data Streams 吞吐量。用于 Java 的 Kinesis 客户端库 (KCL) 支持取消聚合这些记录。但在将 Amazon Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。

如需获取必要的项目代码和说明,请参阅 GitHub 上的 Amazon Lambda Kinesis 创建器库取消聚合模块。您可以使用此项目中的组件在 Amazon Lambda 中通过 Java、Node.js 和 Python 处理 KPL 序列化数据。您也可以将这些组件用在多语言 KCL 应用程序中。

数据预处理事件输入数据模型/记录响应模型

要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 持续从您的 Kinesis 数据流或 Firehose 传输流读取数据。对于检索的每一批记录,此服务管理如何将每个批次传送到您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。

您的预处理函数的输入模型会稍有不同,具体取决于是从 Kinesis 数据流还是 Firehose 传输流接收数据。

如果源是 Firehose 传输流,则事件输入数据模型如下所示:

Kinesis Data Firehose 请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN)
streamArn 传输流 ARN
记录
字段 描述
recordId 记录 ID (随机 GUID)
kinesisFirehoseRecordMetadata
字段 描述
approximateArrivalTimestamp 传输流记录大致到达时间
data Base64 编码的源记录负载

以下示例显示来自 Firehose 传输流的输入:

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

如果源是 Kinesis 数据流,事件输入数据模型如下所示:

Kinesis 流请求数据模型

字段 描述
invocationId Lambda 调用 ID (随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 ARN
streamArn 传输流 ARN
记录
字段 描述
recordId 基于 Kinesis 记录序列号的记录 ID
kinesisStreamRecordMetadata
字段 描述
sequenceNumber 从 Kinesis 流记录中得到的序列号
partitionKey 从 Kinesis 流记录中得到的分区键
shardId 从 Kinesis 流记录中得到的 ShardId
approximateArrivalTimestamp 传输流记录大致到达时间
数据 Base64 编码的源记录负载

以下示例显示来自 Kinesis 数据流的输入:

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

记录响应模型

必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。这些记录必须包含以下参数,否则,Kinesis Data Analytics 将拒绝这些记录,并将其视为失败的数据预处理。可对记录的数据负载部分进行转换,以满足预处理要求。

响应数据模型

记录
字段 描述
recordId 在调用期间,记录 ID 从 Kinesis Data Analytics 传送到 Lambda。转换后的记录必须包含相同记录 ID。原始记录的 ID 和转换记录的 ID 之间如果有不匹配,将被视为数据预处理失败。
result 记录的数据转换的状态。可能的值包括:
  • Ok:记录已成功转换。Kinesis Data Analytics 会摄取 SQL 处理记录。

  • Dropped:您的处理逻辑有意丢弃了此记录。Kinesis Data Analytics 会丢弃 SQL 处理记录。对于 Dropped 记录,数据负载字段是可选的。

  • ProcessingFailed:无法转换记录。Kinesis Data Analytics 认为您的 Lambda 函数未成功处理它,并在错误流中写入一条错误。有关错误流的更多信息,请参阅错误处理。对于 ProcessingFailed 记录,数据负载字段是可选的。

data 转换后的数据负载(使用 base64 编码之后)。如果应用程序提取数据格式为 JSON,则每个数据负载可能包含多个 JSON 文档。或者,如果应用程序提取数据格式为 CSV,则每个数据负载可能包含多个 CSV 行 (在每一行中指定行分隔符)。Kinesis Data Analytics 服务将成功分析并处理同一数据负载中的多个 JSON 文档或 CSV 行数据。

以下示例显示来自 Lambda 函数的输出:

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

常见的数据预处理失败情况

以下是预处理失败的常见原因。

  • 并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都会返回到 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID、状态或数据负载字段。对于 DroppedProcessingFailed 记录,数据负载字段是可选的。

  • Lambda 函数的超时时间不足以预处理数据。

  • Lambda 函数的响应时间超出了 Amazon Lambda 服务施加的响应限制。

如果数据预处理失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,以了解详细的失败情况。

  • Kinesis Data Analytics 应用程序 MillisBehindLatest:指示应用程序从流式传输源中读取时的滞后时间。

  • Kinesis Data Analytics 应用程序 InputPreprocessing CloudWatch 指标:指示成功和失败次数以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • Amazon Lambda 函数 CloudWatch 指标和日志。