使用 Lambda 函数作为输出 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

使用 Lambda 函数作为输出

通过将 AWS Lambda 作为目标,您可以更轻松地执行 SQL 结果后处理,然后再将其发送到最终目标。常见的后处理任务包括:

  • 将多行聚合为一条记录

  • 将当前结果与过去的结果相结合以解决迟到数据的问题

  • 根据信息类型传输到不同的目标

  • 记录格式转换 (如转换为 Protobuf)

  • 字符串操作或转换

  • 分析处理后的数据扩充

  • 地理空间使用案例的自定义处理

  • 数据加密

Lambda 函数可以将分析信息传输到各种不同的 AWS 服务和其他目标,其中包括:

有关创建 Lambda 应用程序的更多信息,请参阅 AWS Lambda 入门

Lambda 作为输出权限

要将 Lambda 作为输出,应用程序的 Lambda 输出 IAM 角色需要使用以下权限策略:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }

将 Lambda 作为输出指标

您可以使用 Amazon CloudWatch 监控发送的字节数、成功和失败次数,等等。有关将 Lambda 作为输出时 Kinesis Data Analytics 发出的 CloudWatch 指标的信息,请参阅 Amazon Kinesis Analytics 指标

Lambda 作为输出事件输入数据模型和记录响应模型

要发送 Kinesis Data Analytics 输出记录,您的 Lambda 函数必须符合所需的事件输入数据和记录响应模型。

事件输入数据模型

Kinesis Data Analytics 使用以下请求模型持续将输出记录从应用程序发送到 Lambda 以作为输出函数。在您的函数中,遍历列表并应用业务逻辑来完成输出要求 (例如,在将数据发送到最终目标之前先进行数据转换)。

字段 Description
invocationId Lambda 调用 ID(随机 GUID)。
applicationArn Kinesis Data Analytics 应用程序 Amazon 资源名称 (ARN)。
记录
字段 Description
recordId 记录 ID (随机 GUID)
lambdaDeliveryRecordMetadata
字段 Description
retryHint 传输重试次数
数据 Base64 编码的输出记录负载
注意

retryHint 是一个每次传输失败时都会增加的值。该值不会持久不变,并在应用程序中断时重置。

记录响应模型

作为输出函数发送到您的 Lambda 的每个记录(具有记录 ID)必须使用 OkDeliveryFailed 进行确认,并且必须包含以下参数。否则,Kinesis Data Analytics 将其视为传输失败。

记录
字段 Description
recordId 记录 ID 在调用期间从 Kinesis Data Analytics 传递到 Lambda。如果原始记录的 ID 和确认记录的 ID 之间不匹配,就会被视为传输失败。
result 记录的传输状态。以下是可能的值:
  • Ok: 记录已成功转换并发送至最终目标。 Kinesis Data Analytics 为SQL处理提供记录。

  • DeliveryFailed: 记录未成功交付至最终目的地 Lambda 作为输出功能。 Kinesis Data Analytics 持续重试将失败的记录发送到 Lambda 作为输出功能。

Lambda 输出调用频率

Kinesis Data Analytics 应用程序缓存输出记录,并频繁调用 AWS Lambda 目标函数。

  • 如果在数据分析应用程序中使用滚动窗口将记录发送到目标应用程序内部流,每次触发滚动窗口时,都会调用 AWS Lambda 目标函数。例如,如果使用 60 秒的滚动窗口将记录发送到目标应用程序内部流,则每 60 秒调用一次 Lambda 函数。

  • 如果在该应用程序中使用持续查询或滑动窗口将记录发送到目标应用程序内部流,则大约每秒调用一次 Lambda 目标函数。

注意

每个 Lambda 函数调用请求负载大小限制适用。超出这些限制将导致拆分输出记录,并在多个 Lambda 函数调用中进行发送。

添加 Lambda 函数以作为输出

以下过程说明了如何添加 Lambda 函数以作为 Kinesis Data Analytics 应用程序输出。

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis Data Analytics 控制台: https://console.amazonaws.cn/kinesisanalytics

  2. 选择列表中的应用程序,然后选择 Application details

  3. Destination 部分,选择 Connect new destination

  4. 对于 Destination (目标) 项,请选择 AWS Lambda function (AWS Lambda 函数)

  5. Deliver records to AWS Lambda (将记录传输到 AWS Lambda) 部分中,选择现有的 Lambda 函数和版本或选择 Create new (新建)

  6. 如果正在创建新的 Lambda 函数,请执行以下操作:

    1. 选择提供的模板之一。有关更多信息,请参阅 为应用程序目标创建 Lambda 函数

    2. 将在新的浏览器选项卡中打开 Create Function (创建函数) 页。在 Name (名称) 框中,为函数指定一个有意义的名称(例如,myLambdaFunction)。

    3. 针对您的应用程序用后处理功能更新模板。有关创建 Lambda 函数的信息,请参阅 AWS Lambda 开发人员指南 中的入门。

    4. 在 Kinesis Data Analytics 控制台上的 Lambda function (Lambda 函数) 列表中,选择刚创建的 Lambda 函数。为 Lambda 函数版本选择 $LATEST

  7. In-application stream 部分,选择 Choose an existing in-application stream。对于 In-application stream name,选择应用程序的输出流。选定输出流的结果将发送到 Lambda 输出函数。

  8. 保持表单其余部分为默认值,然后选择 Save and continue

您的应用程序现在将记录从应用程序内部流发送到 Lambda 函数。您可以在 Amazon CloudWatch 控制台中查看默认模板的结果。监控 AWS/KinesisAnalytics/LambdaDelivery.OkRecords 指标,以查看传输到 Lambda 函数的记录数。

将 Lambda 作为输出的常见故障

传输到 Lambda 函数可能失败的常见原因如下所示。

  • 并非批次中发送到 Lambda 函数的所有记录(具有记录 ID)都会返回到 Kinesis Data Analytics 服务。

  • 响应中缺少记录 ID 或状态字段。

  • 在 Lambda 函数的超时时间内,不足以完成 Lambda 函数中的业务逻辑。

  • Lambda 函数中的业务逻辑不会捕获所有错误,导致因未处理的异常而产生超时和反向压力。这些消息通常称为“毒丸”消息。

如果数据传输失败,Kinesis Data Analytics 继续对同一组记录重试 Lambda 调用,直到成功为止。要了解详细的失败情况,您可以监控以下 CloudWatch 指标:

  • Kinesis Data Analytics 应用 Lambda 作为输出 CloudWatch 度量: 表示成功和失败的数量以及其他统计数据。有关更多信息,请参阅 Amazon Kinesis Analytics 指标

  • AWS Lambda 函数 CloudWatch 指标和日志。