Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

通过水平扩展和批处理提高吞吐量

Amazon SQS 队列可以传递极高的吞吐量 (每秒数千条消息)。要实现此吞吐量,您必须水平扩展消息创建器和使用器 (添加更多创建器和使用器)。

除了水平扩展外,批处理也可以实现同样的吞吐量,而且线程、连接和请求的数量比单独的消息请求所需的数量少。您可以使用 Amazon SQS API 批处理操作一次性发送、接收或删除多达 10 条消息。此外,由于 Amazon SQS 按请求 (而不是消息) 收费,因此,批处理还可以大幅降低费用。

水平扩展

由于您通过 HTTP 请求-响应协议访问 Amazon SQS,因此,请求延迟 (启动请求和接收响应之间的时间间隔) 会限制您可以通过单一连接从单一线程达到的吞吐量。例如,如果从基于 Amazon Elastic Compute Cloud (Amazon EC2) 的客户端到同一区域内的 Amazon SQS 的延迟时间平均为大约 20ms,则通过单一连接从单一线程达到的最大吞吐量平均为每秒 50 次操作。

水平扩展意味着增加消息创建者 (提出 SendMessage 请求) 和使用者 (提出 ReceiveMessageDeleteMessage 请求) 的数量,以提高整个队列的吞吐量。您可以通过增加客户端上的线程数量以及/或者添加客户端来进行水平扩展。添加更多客户端后,基本上应该能实现队列吞吐量的线性增长。例如,如果将客户端数量加倍,您将获得两倍的吞吐量。

重要

进行水平扩展时,您需要确保您使用的 Amazon SQS 队列具有足够的连接或线程,以支持将要发送请求和接收响应的并发消息创建者和使用者的数量。例如,默认情况下,AWS SDK for Java 的 AmazonSQSClient 类的实例最多会维持 50 个与 Amazon SQS 的连接。要创建其他并发创建者和使用者,您需要调整该限制。例如,在AWS SDK for Java 中,您可以使用下面一行代码来调整 AmazonSQSClient 对象上允许的最大创建者和使用者线程数量:

Copy
AmazonSQS sqsClient = new AmazonSQSClient(credentials, new ClientConfiguration().withMaxConnections(producerCount + consumerCount));

此外,对于适用于 Java 的开发工具包 异步客户端 AmazonSQSAsyncClient,您还需要确保有足够的线程可用。有关更多信息,请参阅您使用的软件开发工具包库的文档。

批处理

Amazon SQS API (SendMessageBatchDeleteMessageBatch) 中的批处理操作可通过一次处理多达 10 条消息来进一步优化吞吐量。由于 ReceiveMessage 一次可以处理十条消息,因此,API 中没有 ReceiveMessageBatch 操作。

批处理的基本理念是:在与服务的每次往返操作中执行更多工作 (例如,使用单一 SendMessageBatch 请求发送多条消息),以及在批处理请求的多条消息中分配批处理操作的延迟时间,而不是接受单一消息 (例如,SendMessage 请求) 的整个延迟时间。由于每次往返操作都会执行更多工作,因此,批处理请求可以更高效地使用线程和连接,从而提高吞吐量。Amazon SQS 按请求收费,因此,如果由更少的请求来处理等量的消息,则费用可以大幅降低。此外,更少的线程和连接可以降低客户端资源使用率,并且可以通过使用更小或更少的主机执行相同的工作来降低客户端费用。

不过,批处理确实会让应用程序变得有点复杂。例如,应用程序必须先积累消息,然后才能发送消息,并且有时候必须花费较长的时间来等待响应,但是批处理在以下情况下可能很有效:

  • 您的应用程序正在短时间内生成大量消息,因此,延迟时间决不会很长。

  • 消息使用者从队列中自行获取消息,这与典型的消息创建者相反,后者需要发送消息来响应它们无法控制的事件。

重要

即使批处理中单独的消息失败了,批处理请求 (SendMessageBatchDeleteMessageBatch) 也可能会成功。在提出批处理请求后,您应始终检查各条消息是否失败了,并在必要时重试。

使用 Amazon SQS 缓冲的异步客户端,您可以在不更改创建器和使用器的情况下利用批处理。

示例

本节所示的示例会实施简单的创建者-使用者模式。可供免费下载的完整示例位于:https://s3.amazonaws.com/cloudformation-examples/sqs-producer-consumer-sample.tar。本节后面的部分描述了各个模板部署的资源。

/tmp/sqs-producer-consumer-sample/src 中的配置实例提供了示例代码。配置运行的命令行位于 /tmp/sqs-producer-consumer-sample/command.log 中。

主线程会生成大量创建者和使用者线程,这些线程会在指定时间内处理 1KB 消息。示例包括提出单一操作请求的创建者和使用者,以及提出批处理请求的其他创建者和使用者。

在程序中,每个创建者线程都会发送消息,直到主线程停止创建者线程为止。producedCount 对象会跟踪所有创建者线程生成的消息数量。处理错误的操作很简单:如果存在错误,则程序会退出 run() 方法。默认情况下,对于因暂时性错误而失败的请求,AmazonSQSClient 会重试三次,因此,此类错误很少会出现。必要时,可以配置重试计数,以减少系统引发的异常数量。消息创建者上的 run() 方法实施如下:

Copy
try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { // By default AmazonSQSClient retries calls 3 times before failing, // so when this rare condition occurs, simply stop. log.error("Producer: " + e.getMessage()); System.exit(1); }

批处理创建者几乎相同。一个显著的区别是需要重试失败的各个批处理条目:

Copy
SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); }

使用者的 run() 方法如下:

Copy
while (!stop.get()) { result = sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } }

每个使用者线程都会接收和删除消息,直到主线程停止使用者线程为止。consumedCount 对象会跟踪所有使用者线程使用的消息数量,并且计数会定期记录。批处理使用者与此相似,不同之处是一次最多接收十条消息,并且它使用的是 DeleteMessageBatch,而不是 DeleteMessage

运行示例

您可以使用提供的 AWS CloudFormation 模板以下列三种不同配置运行示例代码:单一主机和单一操作请求、两台主机和单一操作请求、一台主机和批处理请求。

重要

完整示例位于单一 .tar 文件中。本节后面的部分描述了各个模板部署的资源。

/tmp/sqs-producer-consumer-sample/src 中的配置实例提供了示例代码。配置运行的命令行位于 /tmp/sqs-producer-consumer-sample/command.log 中。

示例设置了默认持续时间 (20 分钟),以提供容量指标的三个或四个 5 分钟 CloudWatch 数据点。每次运行的 Amazon EC2 费用将是 m1.large 实例费用。Amazon SQS 费用基于每个示例的 API 调用率而发生变化,并且该调用率应介于大约 38000 次 API 调用/分 (针对批处理示例) 和 380000 次 API 调用/分 (针对 2 台主机的单一 API 示例) 之间。例如,在单一主机上运行一次单一 API 示例应花费大约 1 实例小时的 m1.large (大型标准按需实例,自 2012 年 7 月起为 0.32 USD),而对于持续时间为默认的 20 分钟的 Amazon SQS 操作,则应花费大约 20 分钟 x 190000 次 API 调用/分 x 1 USD/1000000 次 API 调用 = 3.80 USD (自 2012 年 7 月起的定价,请检查当前定价)。

如果您要在除 美国东部(弗吉尼亚北部) 区域以外的区域部署 AWS CloudFormation 堆栈,请在 AWS CloudFormation 控制台的“Region”框中选择一个区域。

运行示例

  1. 在下面选择与您要启动的堆栈相对应的链接:

  2. 如果系统提示您,请登录 AWS 管理控制台。

  3. Create Stack 向导的 Select Template 页面上,选择 Continue

  4. Specify Parameters 页面上,指定程序应运行的时长以及您是否希望在运行完成后自动终止 Amazon EC2 实例,并且提供 Amazon EC2 密钥对,以便您可以访问运行该示例的实例。示例如下:

  5. 选中 I acknowledge that this template may create IAM resources 复选框。所有模板都会创建 AWS Identity and Access Management (IAM) 用户,以便创建者-使用者程序可以访问队列。

  6. 根据需要完成所有设置后,选择 Continue

  7. Review 页面上,检查设置。如果这些设置与您期望的设置相符,请选择 Continue。否则,请选择 Back 并进行必要的更改。

  8. 在此向导的最后一个页面上,选择 Close。堆栈部署可能需要花费几分钟时间。

要了解堆栈部署的进度,请在 AWS CloudFormation 控制台中选择示例堆栈。在下窗格中,选择 Events 选项卡。创建堆栈后,不到 5 分钟时间,示例就会开始运行。示例开始运行后,您可以在 Amazon SQS 控制台中查看队列。

要监控队列活动,您可以执行以下操作:

  • 访问客户端实例,然后打开其针对迄今为止生成和使用的消息计数的输出日志文件 (/tmp/sqs-producer-consumer-sample/output.log)。此计数每秒更新一次。

  • Amazon SQS 控制台中,观察 Message AvailableMessages in Flight 数字的变化。

此外,在启动队列后最多延迟 15 分钟,您就可以在 CloudWatch 中监控队列,如本主题后面所述。

虽然模板和示例有安全措施来防止过度使用资源,但是在运行完示例后,您最好删除您的 AWS CloudFormation 堆栈。要执行此操作,请在 Amazon SQS 控制台中选择要删除的堆栈,然后选择 Delete Stack。删除所有资源后,CloudWatch 指标将全部下降为零。

监控示例运行的容量指标

Amazon SQS 会针对发送、接收和删除的消息自动生成容量指标。您可以通过 CloudWatch 控制台访问这些指标和其他指标。队列启动后,这些指标最多可能需要花费 15 分钟才可用。要管理搜索结果集,请选择 Search,然后选中与您要监控的队列和指标相对应的复选框。

以下是针对这三个示例连续运行的 NumberOfMessageSent 指标。您的结果可能会有所不同,但这些结果在本质上应该是相似的:

  • NumberOfMessagesReceivedNumberOfMessagesDeleted 指标显示了相同的模式,但我们在此图中省略了这两个指标,以减少凌乱。

  • 第一个示例 (单一 m1.large 上的单一操作 API) 在 5 分钟内传递了大约 210000 条消息 (每秒传递了大约 700 条消息),并且接收和删除操作的吞吐量相同。

  • 第二个示例 (两个 m1.large 实例上的单一操作 API) 传递了大约相当于该吞吐量两倍的吞吐量:在 5 分钟内传递了大约 440000 条消息 (每秒传递了大约 1450 条消息),并且接收和删除操作的吞吐量相同。

  • 最后一个示例 (单一 m1.large 上的批处理 API) 在 5 分钟内传递了 800000 多条消息 (每秒传递了大约 2500 条消息),并且接收和删除的消息的吞吐量相同。如果批处理大小为 10,则系统会使用少得多的请求 (并因此而以更低的费用) 来处理这些消息。