Lambda 事件源映射 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Lambda 事件源映射

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

事件源映射是一个从事件源读取并调用 Lambda 函数的 Lambda 资源。您可以使用事件源映射来处理未直接调用 Lambda 函数的服务中的流或队列中的项。本页介绍了 Lambda 提供事件源映射的服务以及如何微调批处理行为。

事件源映射使用函数执行角色中的权限来读取和管理事件源中的项。权限、事件结构、设置和轮询行为因事件源而异。有关更多信息,请参阅用作事件源的服务的链接主题。

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

注意

在更新、禁用或删除 Amazon MQ、Amazon MSK、自行管理的 Apache Kafka 或 Amazon DocumentDB 的事件源映射后,更改最长可能需要 15 分钟才会生效。在此期间结束之前,您的事件源映射可能会继续处理事件并使用您之前的设置来调用您的函数。即使控制台中显示的事件源映射状态表明更改已经生效,也可能发生这种情况。

以下示例使用 Amazon CLI 将一个名为 my-function 的函数映射到由其 Amazon Resource Name(ARN)指定的 DynamoDB 流,其中批处理大小为 500。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

您应看到以下输出:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

由于轮询器的分布式特性,Lambda 事件源映射至少处理一次事件。因此,在极少数情况下,Lambda 函数可能会收到重复的事件。遵循 使用 Amazon Lambda 函数的最佳实践 并构建幂等函数,以避免与重复事件相关的问题。

批处理行为

事件源映射从目标事件源读取项目。预设情况下,事件源映射会将记录合并为单个有效负载进行批处理,并由 Lambda 将其发送到您的函数。您可以配置批处理时段 (MaximumBatchingWindowInSeconds) 和批处理大小 (BatchSize) 来优化批处理行为。批处理时段是将记录收集到单个有效负载中的最长时间。批处理大小是单个批处理中的最大记录数。满足以下三个条件中的任意一个时,Lambda 会调用您的函数:

  • 批处理时段达到其最大值。批处理时段行为因特定的事件源而异。

    • 对于 Kinesis、DynamoDB 和 Amazon SQS 事件源:原定设置的批处理时段是 0 秒。这意味着 Lambda 会尽快向您的函数发送批处理。如果您配置了 MaximumBatchingWindowInSeconds,则下一个批处理时段将在上一个函数调用完成后立即开始计算。

    • 对于 Amazon MSK、自托管式 Apache Kafka、Amazon MQ 和 Amazon DocumentDB 事件源:默认批处理时段为 500 毫秒。您可以将 MaximumBatchingWindowInSeconds 配置为介于 0 秒到 300 秒之间的任意值,以秒的整数倍调整。第一条记录到达后,批处理时段将立即开始计算。

      注意

      由于您只能以秒的整数倍调整 MaximumBatchingWindowInSeconds,您无法在更改该值后恢复到 500 毫秒的原定设置批处理时段。要恢复原定设置的批处理时段,必须创建新的事件源映射。

  • 达到批处理大小。最小批处理大小为 1。原定设置和最大批处理大小取决于事件源。有关这些值的详细信息,请参阅 CreateEventSourceMapping API 操作的 BatchSize 规范。

  • 有效负载大小达到 6MB您不能修改此限制。

下图演示了这三个条件。假设批处理时段从 t = 7 秒开始。在第一种场景中,批处理时段累积 5 条记录后在 t = 47 秒达到 40 秒的最大值。在第二种场景中,批处理大小在批处理时段到期之前达到 10,因此批处理时段会提前结束。在第三种场景中,在批处理时段到期之前达到最大有效负载大小,因此批处理时段会提前结束。


        当满足以下三个条件中的任何一个时,批处理时段将到期:批处理时段达到最大值、达到批处理大小或有效负载大小达到 6MB。

以下示例显示了从 Kinesis 流读取的事件源映射。如果一批事件的所有处理尝试失败,则事件源映射将有关该批次的详细信息发送到 SQS 队列。


        从 Kinesis 流读取的事件源映射。它在本地将记录排队后才将记录发送给函数。

事件批次是 Lambda 发送到函数的事件。它是由事件源映射在当前批处理时段到期之前读取的项目组成的一批记录或消息。

对于 Kinesis 和 DynamoDB 流,事件源映射为流中的每个分片创建迭代器,并按顺序处理每个分片中的项。您可以将事件源映射配置为只读取流中显示的新项,或者从较旧的项开始。已处理的项目不会从流中删除,并且可以由其他函数或使用者处理。

Lambda 在发送下个批处理之前不会等待任何配置的 Lambda 扩展 完成。换句话说,扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何并发设置或限制,可能会导致节流问题。要检测这是否是潜在问题,请监控函数并检查所显示的并发指标是否高于事件源映射的预期。

预设情况下,如果函数返回错误,事件源映射会重新处理整个批处理,直到函数成功,或直到批处理中的项目到期。为确保按顺序处理,在错误得到解决之前,事件源映射会暂停处理受影响的分片。您可以将事件源映射配置为放弃旧事件或并行处理多个批次。如果并行处理多个批处理,仍然保证每个分区键按顺序处理,但事件源映射会同时处理同一分片中的多个分区键。

对于流源(DynamoDB 和 Kinesis),可以配置 Lambda 在函数返回错误时重试的最大次数。批次未到达您的函数时出现的服务错误或节流,不计入重试次数。

您还可以将事件源映射配置为在放弃某个事件批次时向其他服务发送调用记录。Lambda 支持以下事件源映射的目标

  • Amazon SQS – SQS 队列。

  • Amazon SNS – SNS 主题。

调用记录包含 JSON 格式的失败事件批次的详细信息。

以下示例显示了 Kinesis 流的调用记录。

例 调用记录
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Lambda 还支持 FIFO(先进先出)队列的有序处理,可纵向扩展到活动消息组的数量。对于标准队列,不一定按顺序处理项。Lambda 向上扩展以尽可能快地处理标准队列。出现错误时,Lambda 会将批处理作为单个项退回队列,并且可在与原始批处理不同的分组中处理。有时,即使没有发生任何函数错误,事件源映射也可能会从队列中接收相同的项两次。Lambda 在成功处理项后将其从队列中删除。如果 Lambda 无法处理项目,您可以配置源队列以将项目发送到死信队列。

有关直接调用 Lambda 函数的服务的信息,请参阅 将 Amazon Lambda 与其他服务一起使用