

# 将 Lambda 与 Amazon SQS 结合使用
<a name="with-sqs"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.amazonaws.cn/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service（Amazon SQS）队列中的消息。Lambda 支持[事件源映射](invocation-eventsourcemapping.md)的[标准队列](https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html)和[先进先出（FIFO）队列](https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)。您也可以使用预置模式为您的 Amazon SQS 事件源映射分配专用的轮询资源。Lambda 函数和 Amazon SQS 队列必须位于同一 Amazon Web Services 区域，即便二者可能位于[不同的 Amazon Web Services 账户](with-sqs-cross-account-example.md)。

在处理 Amazon SQS 消息时，您需要实施部分批处理响应逻辑，以防止在批处理中的某些消息失败时重试成功处理的消息。Powertools for Amazon Lambda 中的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)通过自动处理部分批处理响应逻辑简化了此实现，减少了开发时间并提高了可靠性。

**Topics**
+ [了解 Amazon SQS 事件源映射的轮询和批处理行为](#sqs-polling-behavior)
+ [对 Amazon SQS 事件源映射使用预置模式](#sqs-provisioned-mode)
+ [为 Amazon SQS 事件源映射配置预置模式](#sqs-configuring-provisioned-mode)
+ [示例标准队列消息事件](#example-standard-queue-message-event)
+ [示例 FIFO 队列消息事件](#sample-fifo-queues-message-event)
+ [创建和配置 Amazon SQS 事件源映射](services-sqs-configure.md)
+ [为 SQS 事件源映射配置扩展行为](services-sqs-scaling.md)
+ [在 Lambda 中处理 SQS 事件源错误](services-sqs-errorhandling.md)
+ [Amazon SQS 事件源映射的 Lambda 参数](services-sqs-parameters.md)
+ [对 Amazon SQS 事件源使用事件筛选](with-sqs-filtering.md)
+ [教程：将 Lambda 与 Amazon SQS 结合使用](with-sqs-example.md)
+ [教程：将跨账户 Amazon SQS 队列用作事件源](with-sqs-cross-account-example.md)

## 了解 Amazon SQS 事件源映射的轮询和批处理行为
<a name="sqs-polling-behavior"></a>

使用 Amazon SQS 事件源映射，Lambda 会轮询队列并通过事件[同步](invocation-sync.md)调用您的函数。每个事件可以包含来自队列的一批多条消息。Lambda 每次接收一批这些事件，并为每个批次调用一次您的函数。当您的函数成功处理一个批次后，Lambda 就会将其消息从队列中删除。

当 Lambda 接收某个批次时，消息将保留在队列中，但会根据队列的[可见性超时](https://docs.amazonaws.cn/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)长度隐藏。如果您的函数成功处理了批次中的所有消息，Lambda 会将消息从队列中删除。预设情况下，如果您的函数在处理某个批处理时遇到错误，则该批处理中的所有消息都会在可见性超时过期后在队列中重新可见。因此，函数代码必须能够多次处理同一条消息，而不会产生意外的副作用。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 Amazon 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

要防止 Lambda 多次处理消息，您可以将事件源映射配置为在函数响应中包含[批次项目失败](services-sqs-errorhandling.md#services-sqs-batchfailurereporting)，也可以在 Lambda 函数成功处理消息后使用 [DeleteMessage](https://docs.amazonaws.cn/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html) API 将消息从队列中删除。

有关 Lambda 支持的 SQS 事件源映射配置参数的更多信息，请参阅 [创建 SQS 事件源映射](services-sqs-configure.md#events-sqs-eventsource)。

## 对 Amazon SQS 事件源映射使用预置模式
<a name="sqs-provisioned-mode"></a>

对于需要微调事件源映射吞吐量的工作负载，您可以使用预调配模式。在预调配模式下，您可以为预调配事件轮询器数量定义最小和最大限制。这些预调配事件轮询器专用于事件源映射，并且可以通过响应式自动扩缩处理意外的消息激增。配置了预置模式的 Amazon SQS 事件源映射的扩展速度比默认的 Amazon SQS 事件源映射功能快 3 倍（每分钟最多 1000 次并发调用），并且支持高 16 倍的并发性（最多 20000 次并发调用）。我们建议，对于具有严格性能要求的 Amazon SQS 事件驱动型工作负载（例如处理市场数据源的金融服务公司、提供实时个性化推荐的电子商务平台以及管理实时玩家互动的游戏公司等），您应采用预置模式。使用预调配模式会产生额外成本。有关详细定价，请参阅 [Amazon Lambda 定价](https://www.amazonaws.cn/lambda/pricing/)。

每个事件轮询器在预置模式下能处理最高 1 MB/s 的吞吐量、最多 10 次并发调用，或者每秒最多进行 10 次 Amazon SQS 轮询 API 调用。最小事件轮询器数量（MinimumPollers）的可接受值范围介于 2 到 200 之间（默认为 2）。最大事件轮询器数量（MaximumPollers）的可接受值范围介于 2 到 2000 之间（默认为 200）。MaximumPollers 必须大于或等于 MinimumPollers。

### 确定所需的事件轮询器
<a name="sqs-determining-event-pollers"></a>

要估算在使用 SQS ESM 的预置模式下确保最佳消息处理性能所需的事件轮询器数量，请为您的应用程序收集以下指标：每秒需要低延迟处理的峰值 SQS 事件数量、平均 SQS 事件有效载荷大小、平均 Lambda 函数执行持续时间以及配置的批处理大小。

首先，您可以使用以下公式来估算事件轮询器为您的工作负载支持的每秒 SQS 事件数量（EPS）：

```
EPS per event poller = 
        minimum(
            ceiling(1024 / average event size in KB),
            ceiling(10 / average function duration in seconds) * batch size, 
            min(100, 10 * batch size)
                )
```

然后，您可以使用以下公式计算所需的最小轮询器数量。此计算可确保您预置足够的容量来满足峰值流量要求。

```
Required event pollers = (Peak number of events per second in Queue) / EPS per event poller
```

考虑这样一个工作负载，其默认批处理大小为 10，平均事件大小为 3KB，平均函数执行时间为 100 ms，并且需要每秒处理 1000 个事件。在这种情况下，每个事件轮询器每秒将支持大约 100 个事件（EPS）。因此，您应将最小轮询器数量设置为 10，以充分满足您的峰值流量需求。如果您的工作负载具有相同的特性，但函数平均持续时间为 1 秒，则每个轮询器仅能支持 10 个 EPS，这就要求您将最小轮询器数量配置为 100，以在低延迟的情况下每秒支持 1000 个事件。

我们建议使用 10 或更高的默认批处理大小，以最大限度地提高预置模式事件轮询器的效率。更大的批次大小使得每个轮询器在每次调用时能够处理更多的事件，从而提高了吞吐量和成本效益。在规划事件轮询器容量时，请考虑潜在的流量峰值，并考虑将 minimumPollers 值设置为略高于计算得出的最小值，以提供缓冲区。此外，持续监控您的工作负载特征的变化情况，因为消息大小、函数持续时间或流量模式的改变可能会需要对您的事件轮询器配置进行调整，以保持最佳性能和成本效益。为了进行精确的容量规划，我们建议您测试您的特定工作负载，以确定每个事件轮询器可以驱动的实际 EPS。

## 为 Amazon SQS 事件源映射配置预置模式
<a name="sqs-configuring-provisioned-mode"></a>

您可以使用控制台或 Lambda API 为 Amazon SQS 事件源映射配置预置模式。

**为现有的 Amazon SQS 事件源映射配置预置模式（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.amazonaws.cn/lambda/home#/functions)页面。

1. 选择具有要为其配置预置模式的 Amazon SQS 事件源映射的函数。

1. 选择**配置**，然后选择**触发器**。

1. 选择要为其配置预置模式的 Amazon SQS 事件源映射，然后选择**编辑**。

1. 在**事件源映射配置**下，选择**配置预调配模式**。
   + 对于**最少事件轮询器**，输入介于 2 到 200 之间的值。如果未指定值，则 Lambda 将选择默认值 2。
   + 对于**最大事件轮询器**，输入介于 2 到 2000 之间的值。此值必须大于或等于**最少事件轮询器**的值。如果未指定值，则 Lambda 将选择默认值 200。

1. 选择**保存**。

您可以使用 `EventSourceMappingConfiguration` 中的 `ProvisionedPollerConfig` 对象以编程方式配置预置模式。例如，以下 `UpdateEventSourceMapping` CLI 命令将 `MinimumPollers` 值配置为 5，将 `MaximumPollers` 值配置为 100。

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'
```

配置预调配模式后，您可以通过监控 `ProvisionedPollers` 指标来观测事件轮询器对您的工作负载的使用情况。有关更多信息，请参阅事件源映射指标。

要禁用预置模式并返回默认（按需）模式，您可以使用以下 `UpdateEventSourceMapping` CLI 命令：

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --provisioned-poller-config '{}'
```

**注意**  
预置模式不能与最大并发设置结合使用。使用预置模式时，您可以通过事件轮询器的最大数量来控制最大并发性。

有关配置预置模式的更多信息，请参阅[创建和配置 Amazon SQS 事件源映射](services-sqs-configure.md)。

## 示例标准队列消息事件
<a name="example-standard-queue-message-event"></a>

**Example Amazon SQS 消息事件（标准队列）**  

```
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {
                "myAttribute": {
                    "stringValue": "myValue", 
                    "stringListValues": [], 
                    "binaryListValues": [], 
                    "dataType": "String"
                }
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}
```

默认情况下，Lambda 将一次性轮询队列中最多 10 条消息，并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数，您可以配置批次时段，将事件源配置为缓冲最多五分钟的记录。在调用函数之前，Lambda 将继续轮询标准队列中的消息，直到批次时段到期、达到[调用有效负载大小配额](gettingstarted-limits.md)或达到配置的最大批次大小为止。

如果您使用的是批处理窗口，并且 SQS 队列包含的流量非常低，Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒，情况依然如此。

**注意**  
在 Java 中，反序列化 JSON 时可能会遇到空指针错误。这可能要归因于 JSON 对象映射器转换“Records”和“eventSourceARN”大小写的方式。

## 示例 FIFO 队列消息事件
<a name="sample-fifo-queues-message-event"></a>

对于 FIFO 队列，记录包含与重复数据消除和顺序相关的其他属性。

**Example Amazon SQS 消息事件（FIFO 队列）**  

```
{
    "Records": [
        {
            "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
            "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1573251510774",
                "SequenceNumber": "18849496460467696128",
                "MessageGroupId": "1",
                "SenderId": "AIDAIO23YVJENQZJOL4VO",
                "MessageDeduplicationId": "1",
                "ApproximateFirstReceiveTimestamp": "1573251510774"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
            "awsRegion": "us-east-2"
        }
    ]
}
```