将 Lambda 与 Amazon SQS 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Lambda 与 Amazon SQS 结合使用

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service(Amazon SQS)队列中的消息。Lambda 事件源映射支持标准队列先进先出 (FIFO) 队列。在 Amazon SQS 中,您可以通过将来自一个应用程序组件的任务发送到一个队列中并异步处理它们来进行分载。

Lambda 轮询队列并同步调用您的 Lambda 函数,其中有包含队列消息的事件。Lambda 按批次读取消息,并为每个批次调用一次函数。当您的函数成功处理一个批次后,Lambda 就会将其消息从队列中删除。

当 Lambda 读取某个批处理时,消息将保留在队列中,但会根据队列的可见性超时长度隐藏。如果您的函数成功处理一个批次,Lambda 会将其消息从队列中删除。预设情况下,如果您的函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在可见性超时过期后在队列中重新可见。因此,函数代码必须能够多次处理同一条消息,而不会产生意外的副作用。

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 Amazon 知识中心的如何使我的 Lambda 函数具有幂等性

要防止 Lambda 多次处理消息,您可以将事件源映射配置为在函数响应中包含批处理项目失败,也可以在 Lambda 函数成功处理消息后使用 Amazon SQS API 操作 DeleteMessage 将消息从队列中删除。有关使用 Amazon SQS API 的更多信息,请参阅《Amazon Simple Queue Service API 参考》https://docs.amazonaws.cn/AWSSimpleQueueService/latest/APIReference/

示例标准队列消息事件

例 Amazon SQS 消息事件(标准队列)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

默认情况下,Lambda 将一次性轮询队列中最多 10 条消息,并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多五分钟的记录。在调用函数之前,Lambda 将继续轮询 SQS 标准队列中的消息,直到批处理时段到期、达到调用有效负载大小配额或达到配置的最大批处理大小为止。

如果您使用的是批处理窗口,并且 SQS 队列包含的流量非常低,Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒,情况依然如此。

注意

在 Java 中,反序列化 JSON 时可能会遇到空指针错误。这可能要归因于 JSON 对象映射器转换“Records”和“eventSourceARN”大小写的方式。

示例 FIFO 队列消息事件

对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

例 Amazon SQS 消息事件(FIFO 队列)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

配置队列以便用于 Lambda

创建一个 SQS 队列,用作您的 Lambda 函数事件源。然后将队列配置为可使您的 Lambda 函数有时间处理每批事件,并使 Lambda 在扩展时出现限制错误时能够重试。

为使函数有时间处理每批记录,请将源队列的可见性超时至少设置为您在函数上配置的超时的六倍。这一额外的时间有利于 Lambda 在您的函数处理之前的批处理期间遇到限流时进行重试。

如果函数多次都未能处理某条消息,则 Amazon SQS 可以将其发送到某个死信队列。如果函数返回错误,则批处理中的所有项目都将返回到队列中。在发生可见性超时之后,Lambda 会重新接收消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在 Lambda 函数上配置。您在函数上配置的死信队列用于函数的异步调用队列,而不是用于事件源队列。

如果您的函数返回错误,或者由于处于最大并发而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更好的处理机会,请将源队列重新驱动策略的 maxReceiveCount 设置为至少 5

执行角色权限

AWSLambdaSQSQueueExecutionRole Amazon 托管策略 Lambda 从您的 Amazon SQS 队列中读取所需的权限。将此托管策略添加到您的函数的执行角色。

或者,如果您使用的是加密队列,则还需要为执行角色添加以下权限:

添加权限并创建事件源映射

创建事件源映射以指示 Lambda 将队列中的项目发送到 Lambda 函数。您可以创建多个事件源映射,以使用单个函数处理来自多个队列的项目。当 Lambda 调用目标函数时,事件可以包含多个项目(最多为可配置的最大批处理大小)。

要将您的函数配置为从 Amazon SQS 中读取,请将 AWSLambdaSQSQueueExecutionRole Amazon 托管策略附加到您的执行角色,然后创建 SQS 触发器。

要添加权限并创建触发器
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. 选择 Configuration(配置)选项卡,然后选择 Permissions(权限)。

  4. 角色名称下,选择至执行角色的链接。此角色将在 IAM 控制台中打开角色。

    至执行角色的链接
  5. 选择添加权限,然后选择附加策略

    在 IAM 控制台中附加策略
  6. 在搜索字段中输入 AWSLambdaSQSQueueExecutionRole。向执行角色添加此策略。这是一项 Amazon 托管策略,其中包含您的函数从 Amazon SQS 队列中读取所需的权限。有关此策略的更多信息,请参阅《Amazon 托管策略参考》中的 AWSLambdaSQSQueueExecutionRole

  7. 在 Lambda 控制台中返回您的函数。在 Function overview(函数概览)下,选择 Add trigger(添加触发器)。

    Lambda 控制台的函数概述部分
  8. 选择触发器类型。

  9. 配置必填选项,然后选择 Add(添加)。

Lambda 支持 Amazon SQS 事件源的以下选项:

SQS 队列

要从中读取记录的 Amazon SQS 队列。

启用触发器

事件源映射的状态。Enable trigger(启用触发器)默认处于选中状态。

批次大小

每个批次中要发送给函数的最大记录数。对于标准队列,这最高可为 10,000 条记录。对于 FIFO 队列,最大值为 10。对于超过 10 的批处理大小,还必须将批处理时间(MaximumBatchingWindowInSeconds)设置为至少 1 秒。

配置函数超时,以允许有足够的时间来处理整个批次的项目。如果项目处理需要很长时间,请选择一个较小的批处理大小。大批量处理可以提高非常快速或拥有大量开销的工作负载的效率。如果您在函数上配置了预留并发,请将最小并发执行数设置为 5,以降低在 Lambda 调用函数时出现节流错误的几率。

Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的调用有效负载大小配额(6 MB)。Lambda 和 Amazon SQS 都会为每条记录生成元数据。这一额外的元数据将会计入总有效负载大小,并且可能导致批处理中发送的记录总数低于配置的批处理大小。Amazon SQS 发送的元数据字段的长度是可变的。有关 Amazon SQS 元数据字段的更多信息,请参阅《Amazon Simple Queue Service API 参考》中的 ReceiveMessage API 操作文档。

Batch 时间

在调用函数之前收集记录的最长时间(以秒为单位)。它仅适用于标准队列。

如果您使用的批处理时间大于 0 秒,则必须考虑队列可见性超时中增加的处理时间。我们建议将队列可见性超时设置为函数超时的六倍,加上 MaximumBatchingWindowInSeconds 的值。这使 Lambda 函数有时间处理每个批次的事件,并在出现节流错误时重试。

当消息可用时,Lambda 开始批量处理消息。Lambda 通过五次并发调用您的函数开始一次处理五个批处理。如果仍有消息可用,则 Lambda 最多每分钟添加 300 个函数实例,最多为 1000 个函数实例。要了解函数扩展和并发的更多信息,请参阅 Lambda 函数扩展

要处理更多消息,您可以优化 Lambda 函数以提高吞吐量。请参阅了解 Amazon Lambda 如何使用 Amazon SQS 标准队列进行扩展

最大并发数量

事件源可调用的最大并发函数数量。有关更多信息,请参阅 为 Amazon SQS 事件源配置最大并发

筛选条件

添加筛选条件以控制 Lambda 将哪些事件发送给函数进行处理。有关更多信息,请参阅 Lambda 事件筛选

扩展和处理

对于标准队列,Lambda 使用长轮询来轮询一个队列,直到它变为活动状态。当消息可用时,Lambda 会通过函数的五次并发调用开始一次处理五个批次。如果仍有消息可用,则 Lambda 增加批量读取的进程数,最多每分钟增加 300 个实例。事件源映射可以同时处理的最大批处理数量为 1,000。

对于 FIFO 队列,Lambda 按照接收消息的顺序向函数发送消息。向 FIFO 队列发送消息时,需要指定消息组 ID。Amazon SQS 确保同一组中的消息按顺序传递到 Lambda。当 Lambda 按批次读取消息时,每个批次可能包含来自多个消息组的消息,但消息的顺序保持不变。如果函数返回错误,函数会对受影响的消息尝试所有重试,然后 Lambda 才会接收来自同一个组的其他消息。

您的函数可以在并发范围内横向缩减到活动消息组的数量。有关更多信息,请参阅 Amazon 计算博客上的作为事件源的 SQS FIFO

为 Amazon SQS 事件源配置最大并发

最大并发设置限制了 Amazon SQS 事件源可以调用的函数的并发实例数。最大并发属于事件源级别的设置。如果您将多个 Amazon SQS 事件源映射到一个函数,则每个事件源均可进行单独的最大并发设置。您可以使用最大并发,从而防止某个队列使用函数的所有预留并发账户的其余并发限额。在 Amazon SQS 事件源上配置最大并发不收取任何费用。

重要的是,最大并发和预留并发为两个独立的设置。不要将最大并发设置为高于函数的预留并发。配置最大并发后,请确保您的函数的预留并发大于或等于该函数上所有 Amazon SQS 事件源的最大总并发数。否则,Lambda 可能会限制您的消息。

如果未设置最大并发,Lambda 可以将您的 Amazon SQS 事件源纵向扩展到您账户的总并发配额(默认为 1000)。

注意

对于 FIFO 队列,并发调用的上限为消息组 ID 的数量 (messageGroupId) 或最大并发设置,以较低者为准。例如,如果您有六个消息组 ID 并且最大并发设置为 10,则函数最多可以进行六次并发调用。

您可以在新的和现有的 Amazon SQS 事件源映射上配置最大并发。

使用 Lambda 控制台配置最大并发
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 SQS。此操作将打开 Configuration(配置)选项卡。

  4. 选择 Amazon SQS 触发器,然后选择 Edit(编辑)。

  5. 对于 Maximum concurrency(最大并发),输入 2 到 1,000 之间的数字。要关闭最大并发,将该框保留为空。

  6. 选择保存

使用 Amazon Command Line Interface(Amazon CLI)配置最大并发

使用带 --scaling-config 选项的 update-event-source-mapping 命令。例如:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

要关闭最大并发,请为 --scaling-config 输入一个空值:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
使用 Lambda API 配置最大并发

CreateEventSourceMappingUpdateEventSourceMapping 操作与 ScalingConfig 对象结合使用。

事件源映射 API

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

以下示例使用 Amazon CLI 将名为 my-function 的函数映射到由 Amazon Resource Name (ARN) 指定的 Amazon SQS 队列,批处理大小为 5,批处理窗口为 60 秒。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

您应看到以下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

失败调用的回退策略

如果调用失败,Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同,具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。

  • 如果您的函数代码导致了该错误,Lambda 将停止处理并重试调用。同时,Lambda 会逐渐退出,以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后,消息将再次显示在队列中。

  • 如果调用失败是节流造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息,直到消息的时间戳超过队列的可见性超时,此时 Lambda 会删除该消息。

实施部分批处理响应

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。

对于处理失败的批处理,要避免重新处理其中已经成功处理的消息,您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应,请在配置事件源映射时为 FunctionResponseTypes 操作指定 ReportBatchItemFailures。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。

激活 ReportBatchItemFailures 后,当函数调用失败时,Lambda 不会 缩减消息轮询范围。如果您预计某些消息会失败,并且不希望这些失败影响消息处理速率,请使用 ReportBatchItemFailures

注意

在使用部分批处理响应时,请记住以下几点:

  • 如果您函数发现了一个异常,则整个批处理将被视为完全失败。

  • 如果您将此功能与 FIFO 队列结合使用,则您的函数应在第一次失败后停止处理消息,并返回 batchItemFailures 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

激活部分批处理报告
  1. 查看 实施部分批处理响应的最佳实践

  2. 运行以下命令来为您的函数激活 ReportBatchItemFailures。要检索事件源映射的 UUID,请运行 list-event-source-mappings Amazon CLI 命令。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 更新您的函数代码以捕获所有异常并在 batchItemFailures JSON 响应中返回处理失败的消息。batchItemFailures 响应必须包含消息 ID 列表,以作为 itemIdentifier JSON 值。

    例如,假设一个批处理有五条消息,消息 ID 分别为 id1id2id3id4id5。您的函数成功处理了 id1id3id5。要使消息 id2id4 在队列中重新可见,您的函数应运行以下响应:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    以下函数代码示例将返回批处理中处理失败消息的 ID 列表:

    .NET
    Amazon SDK for .NET
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 .NET 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    适用于 Go V2 的 SDK
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Go 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Java 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    适用于 JavaScript 的 SDK (v3)
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    适用于 PHP 的 SDK
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 PHP 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK for Python(Boto3)
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Python 进行 Lambda SQS 批处理项目失败。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    适用于 Ruby 的 SDK
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Ruby 进行 Lambda SQS 批处理项目失败。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    适用于 Rust 的 SDK
    注意

    在 GitHub 上查看更多内容。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Rust 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

如果处理失败的事件没有返回到队列,请参阅 Amazon 知识中心中的 如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题?

成功和失败的条件

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailures 列表

  • Null batchItemFailures 列表

  • 空的 EventResponse

  • Null EventResponse

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:

  • JSON 响应无效

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

  • 具有某个消息 ID 的 itemIdentifier 值不存在

CloudWatch 指标

要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 指标。

  • NumberOfMessagesDeleted 会跟踪从队列中删除的消息数量。如果该值降至 0,则表明您的函数响应没有正确返回失败的消息。

  • ApproximateAgeOfOldestMessage 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加,则可能表明您的函数没有正确返回失败的消息。

Amazon SQS 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于 Amazon SQS。

适用于 Amazon SQS 的事件源参数
参数 必需 默认值 备注

BatchSize

10

对于标准队列,最大值为 10000。对于 FIFO 队列,最大值为 10。

启用

真实

EventSourceArn

Y

数据流或流使用者的 ARN

FunctionName

FilterCriteria

Lambda 事件筛选

FunctionResponseTypes

要使您的函数报告某个批处理中的特定失败,请在 FunctionResponseTypes 中包含值 ReportBatchItemFailures。有关更多信息,请参阅 实施部分批处理响应

MaximumBatchingWindowInSeconds

0

ScalingConfig

为 Amazon SQS 事件源配置最大并发