经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数预处理数据
注意
2023 年 9 月 12 日之后,如果您尚未使用 Kinesis Data Analytics for SQL,则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息,请参阅限制。
如果流中的数据需要转变格式、转换、扩充或筛选,您可以使用 Amazon Lambda 函数预处理数据。您可以在执行应用程序 SQL 代码或应用程序通过数据流创建架构之前执行此操作。
在以下情况下,Lambda 函数有助于预处理记录:
-
将其他格式 (例如 KPL 或 GZIP) 的记录转换为 Kinesis Data Analytics 可以分析的格式。Kinesis Data Analytics 目前支持 JSON 或 CSV 数据格式。
-
将数据扩展为聚合或异常检测等操作更易访问的格式。例如,如果多个数据值存储在同一字符串中,您可以将数据展开为多个分开的列。
-
利用其他 Amazon 服务进行数据扩充,例如外推或错误更正。
-
将复杂的字符串转换应用于记录字段。
-
用于整理数据的数据筛选。
使用 Lambda 函数预处理记录
在创建 Kinesis Data Analytics 应用程序时,您可在 连接到源 页面上启用 Lambda 预处理。
使用 Lambda 函数在 Kinesis Data Analytics 应用程序中预处理记录
登录 Amazon Web Services Management Console,然后打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
在应用程序的 连接到源 页面上,在 使用 预处理记录 Amazon Lambda部分选择 已启用。
-
如需使用已创建的 Lambda 函数,请在 Lambda 函数 下拉列表中选择该函数。
-
如需通过某个 Lambda 预处理模板创建新的 Lambda 函数,请从下拉列表中选择该模板。然后,选择 View <template name> in Lambda (在 Lambda 中查看 <模板名称>) 以编辑该函数。
-
如需新建 Lambda 函数,请选择 新建。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的创建 HelloWorld Lambda 函数和探索控制台。
-
选择要使用的 Lambda 函数的版本。要使用最新版本,请选择 $LATEST。
在选择或创建 Lambda 函数以预处理记录时,将在执行应用程序 SQL 代码或应用程序通过记录生成架构之前预处理记录。
Lambda 预处理权限
使用 Lambda 进行预处理,应用程序的 IAM 角色需要具有以下权限策略:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
Lambda 预处理指标
您可以使用 Amazon CloudWatch 监控 Lambda 调用次数、处理的字节数、成功和失败次数,等等。有关 Kinesis Data Analytics Lambda 预处理发送的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标。
将 Amazon Lambda 与 Kinesis 创建器库一起使用
Kinesis Producer Library(KPL)将较小的用户格式化记录聚合为较大的记录(最大为 1 MB),以更好地利用 Amazon Kinesis Data Streams 吞吐量。用于 Java 的 Kinesis 客户端库 (KCL) 支持取消聚合这些记录。但在将 Amazon Lambda 作为流使用者时,必须使用特殊模块取消聚合记录。
如需获取必要的项目代码和说明,请参阅 GitHub 上的 Amazon Lambda
数据预处理事件输入数据模型/记录响应模型
要预处理记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。
事件输入数据模型
Kinesis Data Analytics 持续从您的 Kinesis 数据流或 Firehose 传输流读取数据。对于检索的每一批记录,此服务管理如何将每个批次传送到您的 Lambda 函数。您的函数将接收到的记录列表作为输入。在您的函数中,您对列表进行迭代,并应用业务逻辑来完成您的预处理要求 (如数据格式转换或扩充)。
您的预处理函数的输入模型会稍有不同,具体取决于是从 Kinesis 数据流还是 Firehose 传输流接收数据。
如果源是 Firehose 传输流,则事件输入数据模型如下所示:
Kinesis Data Firehose 请求数据模型
字段 | 描述 | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 调用 ID (随机 GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN) | ||||||||||||
streamArn |
传输流 ARN | ||||||||||||
记录
|
以下示例显示来自 Firehose 传输流的输入:
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
如果源是 Kinesis 数据流,事件输入数据模型如下所示:
Kinesis 流请求数据模型
字段 | 描述 | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 调用 ID (随机 GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics 应用程序 ARN | ||||||||||||||||||
streamArn |
传输流 ARN | ||||||||||||||||||
记录
|
以下示例显示来自 Kinesis 数据流的输入:
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
记录响应模型
必须返回发送到 Lambda 函数,并从您的 Lambda 预处理函数返回的所有记录 (带记录 ID)。这些记录必须包含以下参数,否则,Kinesis Data Analytics 将拒绝这些记录,并将其视为失败的数据预处理。可对记录的数据负载部分进行转换,以满足预处理要求。
响应数据模型
记录
|
以下示例显示来自 Lambda 函数的输出:
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
常见的数据预处理失败情况
以下是预处理失败的常见原因。
-
并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都会返回到 Kinesis Data Analytics 服务。
-
响应中缺少记录 ID、状态或数据负载字段。对于
Dropped
或ProcessingFailed
记录,数据负载字段是可选的。 -
Lambda 函数的超时时间不足以预处理数据。
-
Lambda 函数的响应时间超出了 Amazon Lambda 服务施加的响应限制。
如果数据预处理失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。您可以监控以下 CloudWatch 指标,以了解详细的失败情况。
-
Kinesis Data Analytics 应用程序
MillisBehindLatest
:指示应用程序从流式传输源中读取时的滞后时间。 -
Kinesis Data Analytics 应用程序
InputPreprocessing
CloudWatch 指标:指示成功和失败次数以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标。 -
Amazon Lambda 函数 CloudWatch 指标和日志。