将 Amazon Lambda 与 Amazon SQS 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

将 Amazon Lambda 与 Amazon SQS 结合使用

您可以使用 Amazon Lambda 函数处理 Amazon Simple Queue Service (Amazon SQS) 队列中的消息。Lambda 事件源映射支持标准队列先进先出 (FIFO) 队列。在 Amazon SQS 中,您可以通过将来自一个应用程序组件的任务发送到一个队列中并异步处理它们来进行分载。

Lambda 轮询该队列,并通过一个包含队列消息的事件来同步调用您的 Lambda 函数。Lambda 读取批次中的消息,并为每个批次调用一次函数。当您的函数成功处理一个批次后,Lambda 就会将其消息从队列中删除。以下示例显示了包含两条消息的批次事件。

例 Amazon SQS 消息事件(标准队列)

{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws-cn:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws-cn:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

默认情况下,Lambda 会在 SQS 队列中有记录后立即调用函数。Lambda 将立即轮询队列中最多 10 条消息,并将该批次发送到该函数。为避免在记录数量较少的情况下调用该函数,您可以配置批处理窗口,让事件源缓冲最多 5 分钟的记录。在调用函数之前,Lambda 将继续轮询 SQS 标准队列中的消息,直到批处理窗口到期、达到有效负载限制或达到完整批处理大小为止。

对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

例 Amazon SQS 消息事件(FIFO 队列)

{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws-cn:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

当 Lambda 读取批次时,消息将保留在队列中,但会在队列可见性超时的长度内隐藏。如果您的函数成功处理一个批次,Lambda 会将其消息从队列中删除。如果您的函数受到限制、返回错误或没有响应,则消息将再次可见。所有批处理失败的消息都会返回队列中,因此您的函数代码必须能够多次处理同一条消息,而不会产生副作用。

扩展和处理

对于标准队列,Lambda 使用长轮询来轮询一个队列,直到它变为活动状态。当消息可用时,Lambda 最多可读取 5 个批次并将其发送到您的函数。如果仍有消息可用,则 Lambda 增加批量读取的进程数,最多每分钟增加 60 个实例。事件源映射可以同时处理的最大批次数为 1000。

对于 FIFO 队列,Lambda 按照接收消息的顺序向函数发送消息。向 FIFO 队列发送消息时,您可以指定消息组 ID。Amazon SQS 确保同一组中的消息按顺序传递到 Lambda。Lambda 将消息排列到组中,一次仅为一个组发送一批消息。如果函数返回错误,则在对受影响的消息尝试了所有重试之后,Lambda 会从同一个组收到其他消息。

您的函数可以在并发范围内扩展到活动消息组的数量。有关更多信息,请参阅 Amazon 计算博客上的作为事件源的 SQS FIFO

配置队列以便用于 Lambda

创建一个 SQS 队列,用作您的 Lambda 函数的事件源。然后将队列配置为可使您的 Lambda 函数有时间处理每批事件 — 并使 Lambda 在扩展时出现限制错误时能够重试。

为使您的函数有时间处理每批记录,请将源队列的可见性超时至少设置为是您在函数上配置的超时的 6 倍。额外时间可以使 Lambda 在您的函数处理前一批次时,如果您的函数执行受到限制,它可以重试。

如果消息多次处理失败,Amazon SQS 可以将其发送到死信队列。当您的函数返回错误时,Lambda 将其留在队列中。在发生可见性超时之后,Lambda 重新接收消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在 Lambda 函数上配置。您在函数上配置的死信队列用于函数的异步调用队列,而不是用于事件源队列。

如果您的函数返回错误,或者由于处于最大并发而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更好的处理机会,请将源队列重新驱动策略的 maxReceiveCount 设置为至少 5

执行角色权限

Lambda 需要以下权限来管理您的 Amazon SQS 队列中的消息。将这些权限添加到您的函数的执行角色中。

有关更多信息,请参阅 Amazon Lambda 执行角色

将队列配置为事件源

创建事件源映射以指示 Lambda 将队列中的项目发送到 Lambda 函数。您可以创建多个事件源映射,以使用单个函数处理来自多个队列的项目。当 Lambda 调用目标函数时,事件可以包含多个项目(多达可配置的最大批处理大小)。

要在 Lambda 控制台中将您的函数配置为从 Amazon SQS 读取,请创建 SQS 触发器。

创建触发器

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持 Amazon SQS 事件源的以下选项。

事件源选项

  • SQS 队列 – 要从其读取记录的 Amazon SQS 队列。

  • 批处理大小 – 每个批次中要发送到函数的记录数。对于标准队列,这最多可以是 10000 条记录。对于 FIFO 队列,最大值为 10。对于 10 以上的批处理大小,还必须将 MaximumBatchingWindowInSeconds 参数设置为至少 1 秒。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的有效负载限制 (6MB)。

    Lambda 和 Amazon SQS 会为每个记录生成元数据。会将这一额外的元数据计入总负载大小,这可能会导致批次中发送的记录总数低于配置的批处理大小。Amazon SQS 发送的元数据字段的长度是可变的。有关 Amazon SQS 元数据字段的更多信息,请参阅 Amazon Simple Queue ServiceAPI 参考 中的 ReceiveMessage 文档。

  • 批处理窗口 – 指定在调用函数之前收集记录的最长时间(以秒为单位)。仅适用于标准队列。

    如果您使用的批处理窗口大于 0 秒,则必须考虑队列可见性超时中增加的处理时间。将队列可见性超时设置为函数超时的 6 倍加上 MaximumBatchingWindowInSeconds 的值。这使 Lambda 函数有时间处理每个批次的事件,并在出现限制错误时重试。

    Lambda 一次最多处理 5 个批次。这意味着任何时候最多有 5 个工作线程可以批处理和并行处理消息。每个工作线程将为其当前批消息显示不同的 Lambda 调用。

  • 已启用 – 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。

注意

Amazon SQS 为一定量的请求提供了永久性的免费套餐。超出免费套餐后,Amazon SQS 按每百万个请求收取费用。当您的事件源映射处于活动状态时,Lambda 会向队列发出请求以获取项目。有关定价的详细信息,请参阅 Amazon Simple Queue Service 定价

之后,要管理事件源配置,请在设计器中选择触发器。

配置您的函数超时,以允许有足够的时间来处理整个批次的项目。如果项目处理需要很长时间,请选择一个较小的批处理大小。大批量处理可以提高非常快速或拥有大量开销的工作负载的效率。但是,如果您的函数返回错误,则批处理中的所有项目都将返回到队列中。如果您在函数上配置预留并发,请将最小并发执行数设置为 5,以降低在 Lambda 调用函数时出现限制错误的几率。要消除限制错误的可能性,请将预留并发 (reserved concurrency) 值设置为 1000,这是 Amazon SQS 事件源的最大并发执行次数。

事件源映射 API

要使用 Amazon CLIAmazon 开发工具包管理事件源,您可以使用以下 API 操作:

以下示例使用 Amazon CLI 将名为 my-function 的函数映射到由 Amazon 资源名称 (ARN) 指定的 Amazon SQS 队列,批处理大小为 5,批处理窗口为 60 秒。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws-cn:sqs:us-east-2:123456789012:my-queue

您应看到以下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws-cn:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws-cn:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }