经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数作为输出
通过将 Amazon Lambda 作为目标,您可以更轻松地执行 SQL 结果后处理,然后再将其发送到最终目标。常见的后处理任务包括:
-
将多行聚合为一条记录
-
将当前结果与过去的结果相结合以解决迟到数据的问题
-
根据信息类型传输到不同的目标
-
记录格式转换 (如转换为 Protobuf)
-
字符串操作或转换
-
分析处理后的数据扩充
-
地理空间使用案例的自定义处理
-
数据加密
Lambda 函数可以将分析信息传输到各种不同的 Amazon 服务和其他目标,其中包括:
有关创建 Lambda 应用程序的更多信息,请参阅Amazon Lambda 入门。
主题
Lambda 作为输出权限
要使用 Lambda 作为输出,应用程序的 Lambda 输出 IAM 角色需要以下权限策略:
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "
FunctionARN
" }
Lambda 作为输出指标
您可以使用 Amazon CloudWatch 监控发送的字节数、成功和失败次数,等等。有关 Kinesis Data Analytics 将 Lambda 函数作为输出发送的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标。
Lambda 作为输出事件输入数据模型和记录响应模型
要发送 Kinesis Data Analytics 输出记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。
事件输入数据模型
使用以下请求模型持续将输出记录从应用程序发送到 以作为输出函数。在您的函数中,遍历列表并应用业务逻辑来完成输出要求 (例如,在将数据发送到最终目标之前先进行数据转换)。
字段 | 描述 | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
&LAM; 调用 ID (随机 GUID)。 | ||||||||||||
applicationArn |
数据分析应用程序 Amazon 资源名称 (ARN)。 | ||||||||||||
记录
|
注意
retryHint
是一个每次传输失败时都会增加的值。该值不会持久不变,并在应用程序中断时重置。
记录响应模型
作为输出函数发送到您的 Lambda 的每个记录(具有记录 ID)必须使用 Ok
或 DeliveryFailed
进行确认,并且必须包含以下参数。否则,Kinesis Data Analytics 将其视为传输失败。
记录
|
Lambda 输出调用频率
Kinesis Data Analytics 数据分析应用程序缓存输出记录,并频繁调用 Amazon Lambda 目标函数。
-
如果在数据分析应用程序中使用滚动窗口将记录发送到目标应用程序内部流,每次触发滚动窗口时,都会调用 Amazon Lambda 目标函数。例如,如果使用 60 秒的滚动窗口将记录发送到目标应用程序内部流,则每 60 秒调用一次 函数。
-
如果在该应用程序中使用持续查询或滑动窗口将记录发送到目标应用程序内部流,则大约每秒调用一次 目标函数。
注意
每 Lambda 函数调用请求负载大小限制适用。超出这些限制将导致拆分输出记录,并在多个 函数调用中进行发送。
添加 Lambda 函数作为输出
以下过程说明了如何添加 Lambda 函数以作为 Kinesis Data Analytics 应用程序输出。
登录 Amazon Web Services Management Console,然后打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择列表中的应用程序,然后选择 Application details。
-
在 Destination 部分,选择 Connect new destination。
-
对于 Destination 项,选择 Amazon Lambda function。
-
在向 Amazon Lambda 传递记录部分中,选择现有Lambda 函数或创建新函数。
-
如果您正在创建一个新的 Lambda 函数,请执行以下操作:
-
选择提供的模板之一。有关更多信息,请参阅 为应用程序目标创建 Lambda 函数。
-
将在新的浏览器选项卡中打开 Create Function (创建函数) 页。在 Name (名称) 框中,为函数指定一个有意义的名称(例如,
myLambdaFunction
)。 -
针对您的应用程序用后处理功能更新模板。有关创建 Lambda 函数的信息,请参阅Amazon Lambda开发人员指南中的入门。
-
在 Kinesis Data Analytics 控制台上的 Lambda 函数列表中,选择刚创建的 Lambda 函数。Lambda 函数版本选择最新。
-
-
在 In-application stream 部分,选择 Choose an existing in-application stream。对于 In-application stream name,选择应用程序的输出流。选定输出流的结果将发送到 Lambda 输出函数。
-
保持表单其余部分为默认值,然后选择 Save and continue。
您的应用程序现在将记录从应用程序内部流发送到 Lambda 函数。您可以在 Amazon CloudWatch 控制台中查看默认模板的结果。监控 AWS/KinesisAnalytics/LambdaDelivery.OkRecords
指标,查看传输给 Lambda 函数的记录数。
常见的 Lambda 作为输出的故障
传输到 Lambda 函数失败的常见原因如下。
-
并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都会返回到 服务。
-
响应中缺少记录 ID 或状态字段。
-
Lambda 函数超时不足以在 Lambda 函数内完成业务逻辑。
-
Lambda 函数中的业务逻辑不会捕获所有错误,导致因未处理的异常而产生超时和反向压力。这些消息通常称为“毒丸”消息。
如果数据传输失败,Kinesis Data Analytics 会继续对同一组记录重试 Lambda 调用,直到成功为止。要了解详细的失败情况,您可以监控以下 CloudWatch 指标:
-
Kinesis Data Analytics 应用程序 Lambda as Output CloudWatch 指标:指示成功和失败次数以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标。
-
Amazon Lambda 函数 CloudWatch 指标和日志。