将 Lambda 与 Amazon SQS 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Lambda 与 Amazon SQS 结合使用

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service(Amazon SQS)队列中的消息。Lambda 事件源映射支持标准队列先进先出 (FIFO) 队列。在 Amazon SQS 中,您可以通过将来自一个应用程序组件的任务发送到一个队列中并异步处理它们来进行分载。

Lambda 轮询队列并同步调用您的 Lambda 函数,其中有包含队列消息的事件。Lambda 按批次读取消息,并为每个批次调用一次函数。当您的函数成功处理一个批次后,Lambda 就会将其消息从队列中删除。

当 Lambda 读取某个批处理时,消息将保留在队列中,但会根据队列的可见性超时长度隐藏。如果您的函数成功处理一个批次,Lambda 会将其消息从队列中删除。预设情况下,如果您的函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见。因此,函数代码必须能够多次处理同一条消息,而不会产生意外的副作用。

要防止 Lambda 多次处理消息,您可以将事件源映射配置为在函数响应中包含批处理项目失败,也可以在 Lambda 函数成功处理消息后使用 Amazon SQS API 操作 DeleteMessage 将消息从队列中删除。有关使用 Amazon SQS API 的更多信息,请参阅 Amazon Simple Queue Service API Reference(Amazon Simple Email Service API 参考)。

示例标准队列消息事件

例 Amazon SQS 消息事件(标准队列)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

默认情况下,Lambda 将一次性轮询队列中最多 10 条消息,并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多五分钟的记录。在调用函数之前,Lambda 将继续轮询 SQS 标准队列中的消息,直到批处理时段到期、达到调用有效负载大小配额或达到配置的最大批处理大小为止。

注意

如果您使用的是批处理窗口,并且 SQS 队列包含的流量非常低,Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒,情况依然如此。

示例 FIFO 队列消息事件

对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

例 Amazon SQS 消息事件(FIFO 队列)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

配置队列以便用于 Lambda

创建一个 SQS 队列,用作您的 Lambda 函数事件源。然后将队列配置为可使您的 Lambda 函数有时间处理每批事件,并使 Lambda 在扩展时出现限制错误时能够重试。

为使函数有时间处理每批记录,请将源队列的可见性超时至少设置为您在函数上配置的超时的六倍。这一额外的时间有利于 Lambda 在您的函数处理之前的批处理期间遇到限流时进行重试。

如果函数多次都未能处理某条消息,则 Amazon SQS 可以将其发送到某个死信队列。如果函数返回错误,则批处理中的所有项目都将返回到队列中。在发生可见性超时之后,Lambda 会重新接收消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在 Lambda 函数上配置。您在函数上配置的死信队列用于函数的异步调用队列,而不是用于事件源队列。

如果您的函数返回错误,或者由于处于最大并发而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更好的处理机会,请将源队列重新驱动策略的 maxReceiveCount 设置为至少 5

执行角色权限

Lambda 需要以下权限来管理 Amazon SQS 队列中的消息。将这些权限添加到您的函数的执行角色中。

将队列配置为事件源

创建事件源映射以指示 Lambda 将队列中的项目发送到 Lambda 函数。您可以创建多个事件源映射,以使用单个函数处理来自多个队列的项目。当 Lambda 调用目标函数时,事件可以包含多个项目(最多为可配置的最大批处理大小)。

要在 Lambda 控制台中将您的函数配置为从 Amazon SQS 读取,请创建 SQS 触发器。

创建触发器
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. 选择 SQS 触发器类型。

  5. 配置必填选项,然后选择 Add(添加)。

Lambda 支持 Amazon SQS 事件源的以下选项:

SQS 队列

要从中读取记录的 Amazon SQS 队列。

启用触发器

事件源映射的状态。Enable trigger(启用触发器)默认处于选中状态。

Batch 大小

每个批次中要发送给函数的记录数。对于标准队列,这最高可为 10,000 条记录。对于 FIFO 队列,最大值为 10。对于超过 10 的批处理大小,还必须将批处理时间(MaximumBatchingWindowInSeconds)设置为至少 1 秒。

配置函数超时,以允许有足够的时间来处理整个批次的项目。如果项目处理需要很长时间,请选择一个较小的批处理大小。大批量处理可以提高非常快速或拥有大量开销的工作负载的效率。如果您在函数上配置了预留并发,请将最小并发执行数设置为 5,以降低在 Lambda 调用函数时出现节流错误的几率。

Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的调用有效负载大小配额(6 MB)。Lambda 和 Amazon SQS 都会为每条记录生成元数据。这一额外的元数据将会计入总有效负载大小,并且可能导致批处理中发送的记录总数低于配置的批处理大小。Amazon SQS 发送的元数据字段的长度是可变的。有关 Amazon SQS 元数据字段的更多信息,请参阅《Amazon Simple Queue Service API 参考》中的 ReceiveMessage API 操作文档。

Batch 时间

在调用函数之前收集记录的最长时间(以秒为单位)。它仅适用于标准队列。

如果您使用的批处理时间大于 0 秒,则必须考虑队列可见性超时中增加的处理时间。我们建议将队列可见性超时设置为函数超时的六倍,加上 MaximumBatchingWindowInSeconds 的值。这使 Lambda 函数有时间处理每个批次的事件,并在出现节流错误时重试。

注意

如果您的批处理时段大于 0,并且 (batch window) + (function timeout) > (queue visibility timeout),则您的有效队列可见性超时为 (batch window) + (function timeout) + 30s

当消息可用时,Lambda 开始批量处理消息。Lambda 通过五次并发调用您的函数开始一次处理五个批处理。如果仍有消息可用,则 Lambda 最多每分钟添加 60 个函数实例,最多为 1000 个函数实例。要了解函数扩展和并发的更多信息,请参阅 Lambda 函数扩展

要处理更多消息,您可以优化 Lambda 函数以提高吞吐量。请参阅了解 Amazon Lambda 如何使用 Amazon SQS 标准队列进行扩展

最大并发数量

事件源可调用的最大并发函数数量。有关更多信息,请参阅为 Amazon SQS 事件源配置最大并发

筛选条件

添加筛选条件以控制 Lambda 将哪些事件发送给函数进行处理。有关更多信息,请参阅正确筛选 Amazon SQS 消息

扩展和处理

对于标准队列,Lambda 使用长轮询来轮询一个队列,直到它变为活动状态。当消息可用时,Lambda 最多可读取五个批处理并将其发送到您的函数。如果仍有消息可用,则 Lambda 增加批量读取的进程数,最多每分钟增加 60 个实例。事件源映射可以同时处理的最大批处理数量为 1,000。

对于 FIFO 队列,Lambda 按照接收消息的顺序向函数发送消息。向 FIFO 队列发送消息时,需要指定消息组 ID。Amazon SQS 确保同一组中的消息按顺序传递到 Lambda。Lambda 将消息分组,并且每次只为一组发送一个批次。如果函数返回错误,函数会对受影响的消息尝试所有重试,然后 Lambda 才会接收来自同一个组的其他消息。

您的函数可以在并发范围内横向缩减到活动消息组的数量。有关更多信息,请参阅 Amazon 计算博客上的作为事件源的 SQS FIFO

为 Amazon SQS 事件源配置最大并发

最大并发设置限制了 Amazon SQS 事件源可以调用的函数的并发实例数。最大并发属于事件源级别的设置。如果您将多个 Amazon SQS 事件源映射到一个函数,则每个事件源均可进行单独的最大并发设置。您可以使用最大并发,从而防止某个队列使用函数的所有预留并发账户的其余并发限额。您可以同时使用最大并发和预留并发,也可以单独使用。

注意

不能将最大并发设置为高于函数的预留并发。配置最大并发后,请确保您不会将函数的预留并发减少至低于该函数上所有 Amazon SQS 事件源的最大总并发数。否则,Lambda 可能会限制您的消息。对于 FIFO 队列,最大并发以消息组数为上限。

在 Amazon SQS 事件源上配置最大并发不收取任何费用。

您可以在新的和现有的 Amazon SQS 事件源映射上配置最大并发。

使用 Lambda 控制台配置最大并发
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 SQS。此操作将打开 Configuration(配置)选项卡。

  4. 选择 Amazon SQS 触发器,然后选择 Edit(编辑)。

  5. 对于 Maximum concurrency(最大并发),输入 2 到 1,000 之间的数字。要关闭最大并发,将该框保留为空。

  6. 选择 Save(保存)。

使用 Amazon Command Line Interface(Amazon CLI)配置最大并发

使用带 --scaling-config 选项的 update-event-source-mapping 命令。示例:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

要关闭最大并发,请为 --scaling-config 输入一个空值:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
使用 Lambda API 配置最大并发

CreateEventSourceMappingUpdateEventSourceMapping 操作与 ScalingConfig 对象结合使用。

事件源映射 API

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

以下示例使用 Amazon CLI 将名为 my-function 的函数映射到由 Amazon Resource Name (ARN) 指定的 Amazon SQS 队列,批处理大小为 5,批处理窗口为 60 秒。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

您应看到以下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

失败调用的回退策略

如果调用失败,Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同,具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。

  • 如果错误是函数代码造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。如果调用仍然失败,Lambda 最终会删除该消息,并且不会重试。

  • 如果调用失败是节流造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息,直到消息的时间戳超过队列的可见性超时,此时 Lambda 会删除该消息。

实施部分批处理响应

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。

对于处理失败的批处理,要避免重新处理其中已经成功处理的消息,您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应,请在配置事件源映射时为 FunctionResponseTypes 操作指定 ReportBatchItemFailures。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。

激活 ReportBatchItemFailures 后,当函数调用失败时,Lambda 不会 缩减消息轮询范围。如果您预计某些消息会失败,并且不希望这些失败影响消息处理速率,请使用 ReportBatchItemFailures

注意

在使用部分批处理响应时,请记住以下几点:

  • 如果您函数发现了一个异常,则整个批处理将被视为完全失败。

  • 如果您将此功能与 FIFO 队列结合使用,则您的函数应在第一次失败后停止处理消息,并返回 batchItemFailures 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

激活部分批处理报告
  1. 查看 实施部分批处理响应的最佳实践

  2. 运行以下命令来为您的函数激活 ReportBatchItemFailures。要检索事件源映射的 UUID,请运行 list-event-source-mappings Amazon CLI 命令。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 更新您的函数代码以捕获所有异常并在 batchItemFailures JSON 响应中返回处理失败的消息。batchItemFailures 响应必须包含消息 ID 列表,以作为 itemIdentifier JSON 值。

    例如,假设一个批处理有五条消息,消息 ID 分别为 id1id2id3id4id5。您的函数成功处理了 id1id3id5。要使消息 id2id4 在队列中重新可见,您的函数应运行以下响应:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    以下函数代码示例将返回批处理中处理失败消息的 ID 列表:

    Python
    例 – batchItemFailures 的 Python 函数代码
    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Java
    例 – batchItemFailures 的 Java 函数代码
    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }

如果处理失败的事件没有返回到队列,请参阅 Amazon 知识中心中的 如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题?

成功和失败的条件

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailures 列表

  • Null batchItemFailures 列表

  • 空的 EventResponse

  • Null EventResponse

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:

  • JSON 响应无效

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

  • 具有某个消息 ID 的 itemIdentifier 值不存在

CloudWatch metrics(CloudWatch 指标)

要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 指标。

  • NumberOfMessagesDeleted 会跟踪从队列中删除的消息数量。如果该值降至 0,则表明您的函数响应没有正确返回失败的消息。

  • ApproximateAgeOfOldestMessage 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加,则可能表明您的函数没有正确返回失败的消息。

Amazon SQS 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于 Amazon SQS。

适用于 Amazon SQS 的事件源参数
参数 必需 默认值 备注

BatchSize

10

对于标准队列,最大值为 10000。对于 FIFO 队列,最大值为 10。

启用

真实

EventSourceArn

数据流或流使用者的 ARN

FunctionName

FilterCriteria

Lambda 事件筛选

FunctionResponseTypes

要使您的函数报告某个批处理中的特定失败,请在 FunctionResponseTypes 中包含值 ReportBatchItemFailures。有关更多信息,请参阅实施部分批处理响应

MaximumBatchingWindowInSeconds

0

ScalingConfig

为 Amazon SQS 事件源配置最大并发