Lambda 事件源映射 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Lambda 事件源映射

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

事件源映射是一个从事件源读取并调用 Lambda 函数的 Lambda 资源。您可以使用事件源映射来处理未直接调用 Lambda 函数的服务中的流或队列中的项。本页介绍了 Lambda 提供事件源映射的服务以及如何微调批处理行为。

事件源映射使用函数执行角色中的权限来读取和管理事件源中的项。权限、事件结构、设置和轮询行为因事件源而异。有关更多信息,请参阅用作事件源的服务的链接主题。

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

警告

Lambda 事件源映射对每个事件至少处理一次,并且可能会发生批次的重复处理。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 Amazon 知识中心的如何使我的 Lambda 函数具有幂等性

创建事件源映射

要创建事件源与 Lambda 函数之间的映射,请在控制台中创建触发器或使用 create-event-source-mapping 命令。

要添加权限并创建触发器
  1. 将所需权限添加到您的执行角色。某些服务(例如 Amazon SQS)具有 Amazon 托管策略,其中包括 Lambda 从您的事件源读取所需的权限。

  2. 打开 Lamba 控制台的函数页面

  3. 选择一个函数的名称。

  4. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

    
            Lambda 控制台的函数概述部分
  5. 选择触发器类型。

  6. 配置必填选项,然后选择 Add(添加)。

创建事件源映射(Amazon CLI)

以下示例使用 Amazon CLI 将一个名为 my-function 的函数映射到由其 Amazon Resource Name(ARN)指定的 DynamoDB 流,其中批处理大小为 500。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525

您应看到以下输出:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新事件源映射

要更新事件源映射(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. 选择配置,然后选择触发器

  4. 选择触发器,然后选择编辑

要更新事件源映射(Amazon CLI)

使用 update-event-source-mapping 命令。以下示例为 Amazon SQS 事件源配置最大并发性

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

删除事件源映射

当您删除函数时,Lambda 不会删除关联的事件源映射。您可以在控制台中删除事件源映射,也可以使用 DeleteEventSourceMapping API 操作删除它们。

要删除事件源映射(控制台)
  1. 打开 Lambda 控制台的“事件源映射”页面

  2. 选择要删除的事件源映射。

  3. 删除事件源映射对话框中,输入 delete,然后选择删除

要删除事件源映射(Amazon CLI)

使用 delete-event-source-mapping 命令。

aws lambda delete-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-11111EXAMPLE
注意

在更新、禁用或删除 Amazon MQ、Amazon MSK、自行管理的 Apache Kafka 或 Amazon DocumentDB 的事件源映射后,更改最长可能需要 15 分钟才会生效。在此期间结束之前,您的事件源映射可能会继续处理事件并使用您之前的设置来调用您的函数。即使控制台中显示的事件源映射状态表明更改已经生效,也可能发生这种情况。

批处理行为

事件源映射从目标事件源读取项目。预设情况下,事件源映射会将记录合并为单个有效负载进行批处理,并由 Lambda 将其发送到您的函数。您可以配置批处理时段 (MaximumBatchingWindowInSeconds) 和批处理大小 (BatchSize) 来优化批处理行为。批处理时段是将记录收集到单个有效负载中的最长时间。批处理大小是单个批处理中的最大记录数。满足以下三个条件中的任意一个时,Lambda 会调用您的函数:

  • 批处理时段达到其最大值。默认的批处理时段行为因特定的事件源而异。

    • 对于 Kinesis、DynamoDB 和 Amazon SQS 事件源:原定设置的批处理时段是 0 秒。这意味着,Lambda 只有在满足批量大小或达到有效负载大小限制时才会向您的函数发送批次。要设置批处理时段,请配置 MaximumBatchingWindowInSeconds。您可以将此参数设置为介于 0 秒到 300 秒之间的任意值,以 1 秒为增量。如果您配置了批处理时段,则下一个时段将在上一个函数调用完成后立即开始计算。

    • 对于 Amazon MSK、自托管式 Apache Kafka、Amazon MQ 和 Amazon DocumentDB 事件源:默认批处理时段为 500 毫秒。您可以将 MaximumBatchingWindowInSeconds 配置为介于 0 秒到 300 秒之间的任意值,以秒的整数倍调整。第一条记录到达后,批处理时段将立即开始计算。

      注意

      由于您只能以秒的整数倍调整 MaximumBatchingWindowInSeconds,您无法在更改该值后恢复到 500 毫秒的原定设置批处理时段。要恢复原定设置的批处理时段,必须创建新的事件源映射。

  • 达到批处理大小。最小批处理大小为 1。原定设置和最大批处理大小取决于事件源。有关这些值的详细信息,请参阅 CreateEventSourceMapping API 操作的 BatchSize 规范。

  • 有效负载大小达到 6MB您不能修改此限制。

下图演示了这三个条件。假设批处理时段从 t = 7 秒开始。在第一种场景中,批处理时段累积 5 条记录后在 t = 47 秒达到 40 秒的最大值。在第二种场景中,批处理大小在批处理时段到期之前达到 10,因此批处理时段会提前结束。在第三种场景中,在批处理时段到期之前达到最大有效负载大小,因此批处理时段会提前结束。


        当满足以下三个条件中的任何一个时,批处理时段将到期:批处理时段达到最大值、达到批处理大小或有效负载大小达到 6MB。

以下示例显示了从 Kinesis 流读取的事件源映射。如果一批事件的所有处理尝试失败,则事件源映射将有关该批次的详细信息发送到 SQS 队列。


        从 Kinesis 流读取的事件源映射。它在本地将记录排队后才将记录发送给函数。

事件批次是 Lambda 发送到函数的事件。它是由事件源映射在当前批处理时段到期之前读取的项目组成的一批记录或消息。

对于 Kinesis 和 DynamoDB 流,事件源映射为流中的每个分片创建迭代器,并按顺序处理每个分片中的项。您可以将事件源映射配置为只读取流中显示的新项,或者从较旧的项开始。已处理的项目不会从流中删除,并且可以由其他函数或使用者处理。

Lambda 在发送下个批处理之前不会等待任何配置的 Lambda 扩展 完成。换句话说,扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何并发设置或限制,可能会导致节流问题。要检测这是否是潜在问题,请监控函数并检查所显示的并发指标是否高于事件源映射的预期。由于调用间隔时间较短,Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

预设情况下,如果函数返回错误,事件源映射会重新处理整个批处理,直到函数成功,或直到批处理中的项目到期。为确保按顺序处理,在错误得到解决之前,事件源映射会暂停处理受影响的分片。您可以将事件源映射配置为放弃旧事件或并行处理多个批次。如果并行处理多个批处理,仍然保证每个分区键按顺序处理,但事件源映射会同时处理同一分片中的多个分区键。

对于流源(DynamoDB 和 Kinesis),可以配置 Lambda 在函数返回错误时重试的最大次数。批次未到达您的函数时出现的服务错误或节流,不计入重试次数。

您还可以将事件源映射配置为在放弃某个事件批次时向其他服务发送调用记录。Lambda 支持以下事件源映射的目标

  • Amazon SQS – SQS 队列。

  • Amazon SNS – SNS 主题。

调用记录包含 JSON 格式的失败事件批次的详细信息。

以下示例显示了 Kinesis 流的调用记录。

例 调用记录
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Lambda 还支持 FIFO(先进先出)队列的有序处理,可纵向扩展到活动消息组的数量。对于标准队列,不一定按顺序处理项。Lambda 向上扩展以尽可能快地处理标准队列。出现错误时,Lambda 会将批处理作为单个项退回队列,并且可在与原始批处理不同的分组中处理。有时,即使没有发生任何函数错误,事件源映射也可能会从队列中接收相同的项两次。Lambda 在成功处理项后将其从队列中删除。如果 Lambda 无法处理项目,您可以配置源队列以将项目发送到死信队列或目标

有关直接调用 Lambda 函数的服务的信息,请参阅 将 Amazon Lambda 与其他服务一起使用

为事件源映射调用配置目标

要保留失败的事件源映射调用的记录,请在函数的事件源映射中添加一个目标。仅基于 Kinesis、DynamoDB 和 Kafka 的事件源支持为事件源映射调用配置目标。发送到目标的每条记录都是一个 JSON 文档,其中包含有关调用的详细信息。与错误处理设置一样,您可以在函数版本或别名上配置目标。

注意

对于事件源映射调用,只能保留失败调用的记录。对于其他异步调用,您可以同时保留成功调用和失败调用的记录。有关更多信息,请参阅配置异步调用目标

您可以将任何 Amazon SNS 主题或任何 Amazon SQS 队列配置为目标。对于这些目标类型,Lambda 会将记录元数据发送到目标。仅对于基于 Kafka 的事件源,您还可以选择 Amazon S3 存储桶作为目标。如果您指定 S3 存储桶,Lambda 会将整个调用记录以及元数据发送到目标。

下表总结了事件源映射调用支持的目标类型。要让 Lambda 成功将记录发送到您选择的目标,请确保函数的执行角色也包含相关权限。该表还描述了每种目标类型如何接收 JSON 调用记录。

目标类型 支持以下事件源 所需的权限 特定于目标的 JSON 格式

Amazon SQS 队列

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon SNS 主题

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon S3 存储桶

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录及其元数据存储在目标。

以下示例显示了 Lambda 在 Kinesis 事件源调用失败时向 SQS 队列或 SNS 主题发送的内容。由于 Lambda 仅为这些目标类型发送元数据,因此请使用 streamArnshardIdstartSequenceNumberendSequenceNumber 字段获取完整的原始记录。

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

有关 DynamoDB 事件源的示例,请参阅 错误处理。有关 Kafka 事件源的示例,请参阅自行管理的 Apache Kafka 的故障时目标Amazon MSK 的故障时目标