将 Amazon Lambda 与 Amazon Kinesis 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Amazon Lambda 与 Amazon Kinesis 结合使用

注意

如果您想将数据发送到 Lambda 函数以外的目标或在发送数据之前对其进行扩充,请参阅 A ma EventBridge zon Pipes。

您可以使用 Amazon Lambda 函数来处理 Amazon Kinesis 数据流中的记录。

Kinesis 数据流是一组分区。每个分片包含一系列数据记录。使用者 是一种处理 Kinesis 数据流中的数据的应用程序。您可以将 Lambda 函数映射到共享吞吐量使用者(标准迭代器)或具有增强扇出功能的专用吞吐量使用者。

对于标准迭代器,Lambda 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。事件源映射与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,您可以创建具有增强扇出功能的数据流使用者。流使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据,或者您正在重新处理具有大记录的流,则专用吞吐量可以提供帮助。Kinesis 通过 HTTP/2 向 Lambda 推送记录。

有关 Kinesis 数据流的详细信息,请参阅读取 Amazon Kinesis Data Streams 中的数据

示例事件

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

轮询和批处理流

Lambda 从数据流中读取记录并同步调用您的函数,带有一个包含流记录的事件。Lambda 分批读取记录并调用您的函数来处理批处理中的记录。每个批处理包含来自单个分区/数据流的记录。

默认情况下,Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录,则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置 batching window(批处理时段),让事件源缓冲最多五分钟的记录。调用函数前,Lambda 会持续从事件源中读取记录,直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息,请参阅批处理行为

警告

Lambda 事件源映射对每个事件至少处理一次,并且可能会出现批次的重复处理。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为等性。要了解更多信息,请参阅知识中心中的如何使我的 Lambda 函数具有等性。 Amazon

如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。为了避免分区停止,您可以将事件源映射配置为使用较小批次重试、限制重试次数或丢弃过旧的记录。要保留已丢弃的事件,您可以将事件源映射配置为将有关失败批处理的详细信息发送到 SQS 队列或标准 SNS 主题。

要增加并发性,您还可以将来自各个分片的多个批次并行处理。Lambda 可以同时处理各个分区中的多达 10 个批次。如果您增加每个分区的并发批次数,Lambda 仍然可以确保分区密钥级别的顺序处理。

配置 ParallelizationFactor 设置来同时处理一个 Kinesis 或 DynamoDB 数据流的分区,并使用多个 Lambda 调用。您可以指定 Lambda 通过从 1(默认值)到 10 的并行化因子从分区中轮询的并发批次数。例如,假设您将 ParallelizationFactor 设置为 2,则最多可以有 200 个并发 Lambda 调用来处理 100 个 Kinesis 数据分片(但您可能实际上会看到不同的 ConcurrentExecutions 指标值)。这有助于在数据量不稳定并且 IteratorAge 较高时纵向扩展处理吞吐量。

您也可以将ParallelizationFactor与 Kinesis 聚合一起使用。事件源映射的行为取决于你是否使用增强的扇出

  • 如果没有增强的扇出:聚合事件中的所有事件都必须具有相同的分区键。分区键还必须与聚合事件的分区键相匹配。如果聚合事件中的事件具有不同的分区键,则 Lambda 无法保证按分区键按顺序处理事件。

  • 借助增强的扇出功能:首先,Lambda 将聚合的事件解码为其单个事件。聚合事件的分区键可以与其包含的事件不同。但是,与分区键不对应的事件会被删除并丢失。Lambda 不处理这些事件,也不会将它们发送到配置的故障目的地。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

配置数据流和函数

您的 Lambda 函数是数据流的用户应用程序。对于每个分片,它一次处理一批记录。您可以将 Lambda 函数映射到数据流(标准迭代器),或映射到流的使用者(增强型扇出功能)。

对于标准迭代器,Lambda 将针对记录轮询 Kinesis 流中的每个分区(按照每秒一次的基本频率)。当有更多记录可用时,Lambda 会继续进行批处理,直到函数赶上流的速度。事件源映射与分区的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,请创建具有增强扇出功能的数据流使用者。增强扇出功能使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。流使用者使用 HTTP/2 通过长期连接将记录推送到 Lambda 并压缩请求头来减少延迟。你可以使用 Kinesis RegisterStreamConsumerAPI 创建直播使用者。

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

您应看到以下输出:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

要提高函数处理记录的速度,请将分区添加到数据流中。Lambda 按顺序处理各个分区中的记录。如果您的函数返回错误,它会停止处理分片中的其他记录。使用更多分片,可以同时处理更多批次,从而降低错误对并发性的影响。

如果您的函数无法扩展以处理并发批处理的总数,请为您的函数请求提高配额预留并发

执行角色权限

Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源。AWSLambdaKinesisExecutionRole 托管策略包含这些权限。您可以将此托管策略添加到函数的执行角色中。

如果 Kinesis 数据流和 Lambda 函数位于不同账户中,请确保流资源向 Lambda 函数的执行角色或账户授予 kinesis:DescribeStream 权限。

此外,从控制台创建事件源映射时,必须拥有 k inesis: ListStreams 和 kin esis: ListStreamConsumers 权限

要将失败批处理的记录发送到某个标准 SQS 队列或 SNS 主题,您的函数需要额外的权限。每项目标服务均需要不同的权限,如下所示:

添加权限并创建事件源映射

创建事件源映射以指示 Lambda 将数据流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。处理来自多个数据流的项目时,每个批处理将只包含来自单个分区/流的记录。

要将您的函数配置为从 Kinesis 数据流中读取,请将AWSLambdaKinesisExecutionRoleAmazon托管策略添加到您的执行角色并创建 Kinesis 触发器。

添加权限并创建触发器
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. 选择 Configuration(配置)选项卡,然后选择 Permissions(权限)。

  4. 在 “角色名称” 下,选择指向您的执行角色的链接。此链接在 IAM 控制台中打开该角色。

    
              指向执行角色的链接
  5. 选择添加权限,然后选择附加策略

    
              在 IAM 控制台中附加策略
  6. 在搜索字段中输入 AWSLambdaKinesisExecutionRole。将此策略添加到您的执行角色中。这是一个Amazon托管策略,包含您的函数从 Kinesis 流中读取所需的权限。有关此策略的更多信息,请参阅AWSLambdaKinesisExecutionRoleAmazon托管策略参考》。

  7. 在 Lambda 控制台中返回您的函数。在 Function overview(函数概览)下,选择 Add trigger(添加触发器)。

    
              Lambda 控制台的函数概述部分
  8. 选择触发器类型。

  9. 配置必填选项,然后选择 Add(添加)。

Lambda 为 Kinesis 事件源支持以下选项:

事件源选项
  • Kinesis 流(Kinesis 流)– 从中读取记录的 Kinesis 流。

  • Consumer(使用者)(可选)– 使用流使用者通过专用连接从流中读取。

  • Batch size(批处理大小)– 每个批次中要发送到函数的记录数,最高为 10000。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的有效负载限制 (6 MB)。

  • Batch window(批处理时段)– 指定在调用函数之前收集记录的最长时间(以秒为单位)。

  • Starting position(开始位置)– 仅处理新记录、所有现有记录或在特定日期之后创建的记录。

    • Latest(最新)– 处理已添加到流中的新记录。

    • Trim horizon(时间范围)– 处理流中的所有记录。

    • At timestamp(时间戳)– 处理从特定时间开始的记录。

    在处理任何现有记录后,函数将继续处理新记录。

  • 失败时的目标 – 无法处理的记录的标准 SQS 队列或标准 SNS 主题。当 Lambda 因为某批记录太旧或已用尽所有重试而将其丢弃时,Lambda 会将有关该批处理的详细信息发送到该队列或主题。

  • Retry attempts(重试次数)– 函数返回错误时 Lambda 重试的最大次数。这不适用于批处理未到达函数的服务错误或限制。

  • Maximum age of record(记录的最长时限)– Lambda 发送到您的函数的记录的最长期限。

  • Split batch on error(出现错误时拆分批)– 当函数返回错误时,在重试之前将批次拆分为两批。原始批量大小设置会保持不变。

  • Concurrent batches per shard(每个分片的并发批处理数)– 同时处理来自同一个分片的多个批处理。

  • Enabled(已启用)– 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用后从停止位置重新开始处理。

注意

Kinesis 按每个分区收费;对于增强型扇出功能,从流中读取数据。有关定价详细信息,请参阅 Amazon Kinesis 定价

之后,要管理事件源配置,请在设计器中选择触发器。

筛选 Kinesis 事件

将 Kinesis 配置为 Lambda 的事件源时,您可以使用事件筛选来控制 Lambda 将流中的哪些记录发送到函数以进行处理。要了解有关将 Lambda 事件筛选与 Kinesis 结合使用的更多信息,请参阅使用 Kinesis 筛选

事件源映射 API

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

要使用 Amazon CLI 创建事件源映射,请使用 create-event-source-mapping 命令。以下示例使用 Amazon CLI 将名为 my-function 的函数映射到 Kinesis 数据流。数据流由 Amazon 资源名称(ARN)指定,批处理大小为 500,从以 Unix 时间表示的时间戳开始。

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

您应看到以下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

要使用一个使用者,请指定使用者的 ARN 而非数据流的 ARN。

配置其他选项以自定义处理批次的方式,以及指定何时丢弃无法处理的记录。以下示例更新事件源映射,以便在两次重试尝试后或记录已超过一小时的情况下向标准 SQS 队列发送失败记录。

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

您应该看到此输出内容:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping 命令查看当前状态。

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

您应该看到此输出内容:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

要同时处理多个批次,请使用 --parallelization-factor 选项。

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

错误处理

从 Kinesis 流读取记录的事件源映射会同步调用函数并在出现错误时重试。如果 Lambda 对该函数实施节流或返回错误而不调用该函数,Lambda 将会持续重试,直到记录过期或超过您在事件源映射上配置的最大使用期限为止。

如果函数收到记录但返回错误,Lambda 将重试,直到批处理中的记录过期、超过最长使用期限或达到配置的重试配额。对于函数错误,您还可以将事件源映射配置为将失败的批处理拆分为两批。使用较小的批次重试会隔离错误的记录并解决超时问题。拆分批次不计入重试配额。

如果错误处理措施失败,Lambda 将丢弃记录并继续处理数据流中的批次。使用默认设置时,这意味着错误的记录可能会阻止受影响的分区上的处理,时间最长为一周。为了避免这种情况,请配置函数的事件源映射,使用合理的重试次数和适合您的使用案例的最长记录期限。

要保留已丢弃批次的记录,请配置失败事件目标。Lambda 将包含有关批次详细信息的文档发送到目标队列或主题。

为失败事件记录配置目标
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. Function overview(函数概览)下,选择 Add destination(添加目标)。

  4. 对于 Source(源),选择 Stream invocation(流调用)。

  5. 对于 Stream(流),选择映射到函数的流。

  6. 对于 Destination type(目标类型),请选择接收调用记录的资源类型。

  7. 对于 Destination (目标),请选择一个资源。

  8. 选择 Save(保存)。

以下示例显示了 Kinesis 流的调用记录。

例 调用记录
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

您可以使用此信息从流中检索受影响的记录以进行故障排除。实际记录不包括在内,因此您必须处理此记录并在记录过期和丢失之前从流中检索它们。

亚马逊 CloudWatch 指标

在您的函数处理完一批记录后,Lambda 将发出 IteratorAge 指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。

迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅 使用 Lambda 函数指标

时间窗口

Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息,可以使用按时间定义的窗口来限制包含的记录。

滚动窗口是定期打开和关闭的不同窗口。预设情况下,Lambda 调用是无状态的,在没有外部数据库的情况下,无法使用它们跨多次连续调用处理数据。但是,有了滚动窗口后,您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小,Lambda 将提前终止窗口。

流中的每条记录都属于特定窗口。Lambda 将至少处理每条记录一次,但不保证每条记录只处理一次。在极少数情况下(例如错误处理),某些记录可能会被多次处理。第一次处理记录时始终按顺序处理。如果多次处理记录,则可能会不按顺序处理。

聚合和处理

系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口中接收的所有记录。您可以分多个批次接收这些记录,每个批次都作为单独的调用。每次调用都会收到一个状态。因此,当使用滚动窗口时,Lambda 函数响应必须包含 state 属性。如果响应不包含 state 属性,Lambda 会将其视作失败的调用。为了满足该条件,您的函数可以返回一个具有以下 JSON 形状的 TimeWindowEventResponse 对象:

TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意

对于 Java 函数,我们建议使用 Map<String, String> 来表示状态。

在窗口末尾,标志 isFinalInvokeForWindow 被设置 true,以表示这是最终状态,并且已准备好进行处理。处理完成后,窗口完成,最终调用完成,然后状态将被删除。

在窗口结束时,Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后,函数会检查序列号并继续进行流处理。如果调用失败,则您的 Lambda 函数将暂停进一步处理,直到成功调用为止。

例 KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

配置

您可以在创建或更新事件源映射时配置滚动窗口。要配置滚动窗口,请以秒为单位进行指定。以下示例 Amazon Command Line Interface (Amazon CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 tumbling-window-example-function

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳,供 Lambda 在确定边界时使用。

滚动窗口聚合不支持重新分片。当分区结束时,Lambda 会认为窗口已关闭,子分区将以全新的状态启动自己的窗口。

滚动窗口完全支持现有的重试策略 maxRetryAttemptsmaxRecordAge

例 Handler.py – 聚合和处理

以下 Python 函数演示了如何聚合然后处理您的最终状态:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

报告批处理项目失败

在使用和处理来自事件源的流式数据时,默认情况下,Lambda 仅在批处理完全成功时,才会在批次的最高序列号处设置检查点。Lambda 会将所有其他结果视为完全失败并重试批处理,直至达到重试次数上限。要允许在处理来自流的批次时部分成功,请开启 ReportBatchItemFailures。允许部分成功有助于减少对记录重试的次数,尽管这并不能完全阻止在成功记录中重试的可能性。

要开启 ReportBatchItemFailures,请在 ReportBatchItemFailures 列表 FunctionResponseTypes 中包含枚举值。此列表指示为函数启用了哪些响应类型。您可以在创建或更新事件源映射时配置此列表。

报告语法

配置批处理项目失败的报告时,将返回 StreamsEventResponse 类,其中包含批处理项目失败列表。您可以使用 StreamsEventResponse 对象返回批处理中第一个失败记录的序列号。您还可以使用正确的响应语法来创建自己的自定义类。以下 JSON 结构显示了所需的响应语法:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注意

如果 batchItemFailures 数组包含多个项目,Lambda 会使用序列号最小的记录作为检查点。然后,Lambda 会重试从该检查点开始的所有记录。

成功和失败的条件

如果返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果返回以下任何一项,则 Lambda 会将批处理视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

Lambda 会根据您的重试策略在失败时重试。

将批次一分为二

如果调用失败并且已开启 BisectBatchOnFunctionError,则无论您的 ReportBatchItemFailures 设置如何,批次都将一分为二。

当收到批处理部分成功响应且同时开启 BisectBatchOnFunctionErrorReportBatchItemFailures 时,批次将在返回的序列号处一分为二,并且 Lambda 将仅重试剩余记录。

以下函数代码示例将返回批处理中处理失败消息的 ID 列表:

.NET
Amazon SDK for .NET
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 .NET 进行 Lambda Kinesis 批处理项目失败。

using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }
Go
适用于 Go V2 的 SDK
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Go 报告 Lambda 的 Kinesis 批处理项目失败。

package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }
Java
SDK for Java 2.x
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Java 进行 Lambda Kinesis 批处理项目失败。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
JavaScript
适用于 JavaScript (v2) 的软件开发工具包
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Javascript 进行 Lambda Kinesis 批处理项目失败。

exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

使用 Lambda 报告 Kinesis 批处理项目失败。 TypeScript

import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
适用于 PHP 的 SDK
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 PHP 使用 Lambda 报告 Kinesis 批处理项目失败。

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $kinesisEvent = new KinesisEvent($event); $this->logger->info("Processing records"); $records = $kinesisEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK for Python (Boto3)
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Python 进行 Lambda Kinesis 批处理项目失败。

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Rust
适用于 Rust 的 SDK
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告通过 Rust 进行 Lambda Kinesis 批处理项目失败。

use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Amazon Kinesis 配置参数

所有 Lambda 事件源类型都共享相同的 UpdateEventSourceMappingAPI CreateEventSourceMapping操作。但是,只有部分参数适用于 Kinesis。

适用于 Kinesis 的事件源参数
参数 必需 默认值 注意事项

BatchSize

100

最大值:10000

BisectBatchOnFunctionError

false

DestinationConfig

丢弃的记录的标准 Amazon SQS 队列或标准 Amazon SNS 主题目标。

启用

真实

EventSourceArn

Y

数据流或流使用者的 ARN

FunctionName

Y

MaximumBatchingWindowInSeconds

0

MaximumRecordAgeInSeconds

–1

-1 表示无限:Lambda 不会丢弃记录(Kinesis Data Streams 数据保留设置仍然适用

最小值:-1

最大值:604800

MaximumRetryAttempts

–1

-1 表示无限:会一直重试失败的记录,直到记录过期。

最小值:-1

最大值:10000

ParallelizationFactor

1

最大值:10

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

StartingPositionTimestamp

仅当设置 StartingPosition 为 AT_TIMESTAMP 时才有效。开始读取的时间(以 Unix 时间秒为单位)

TumblingWindowInSeconds

最小值:0

最大值:900