Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

客户端缓冲和请求批处理

AWS SDK for Java (http://www.amazonaws.cn/sdkforjava/) 包含一个缓冲的异步客户端 AmazonSQSBufferedAsyncClient,用于访问 Amazon SQS。通过启用客户端缓冲 (其中,从客户端发出的调用先进行缓冲,然后再作为批处理请求发送到 Amazon SQS),此新客户端允许您更轻松地对请求进行批处理。

客户端缓冲最多允许缓冲 10 个请求并将这些请求作为批处理请求发送,而不是分别发送各个请求。因此,随着发送到 Amazon SQS 的请求数量的减少,使用该服务的费用会降低。AmazonSQSBufferedAsyncClient 会缓冲同步和异步调用。此外,成批请求以及对长轮询的支持还可以帮助提高吞吐量 (每秒传输的消息数量)。有关更多信息,请参阅 Amazon SQS 长轮询通过水平扩展和批处理提高吞吐量

从异步客户端 AmazonSQSAsyncClient 迁移到缓冲的异步客户端 AmazonSQSBufferedAsyncClient 只需要对现有的代码进行最少的更改。这是因为 AmazonSQSBufferedAsyncClient 会实施与 AmazonSQSAsyncClient 相同的接口。

注意

Amazon SQS 缓冲的异步客户端当前不支持 FIFO 队列。

AmazonSQSBufferedAsyncClient 入门

开始使用本节中的示例代码之前,您必须先安装 AWS SDK for Java 并设置 AWS 证书。有关说明,请参阅适用于 Java 的 AWS 软件开发工具包开发人员指南 中的入门

以下代码示例显示了如何基于 AmazonSQSAsyncClient 创建新的 AmazonSQSBufferedAsyncClient。

Copy
// Create the basic Amazon SQS async client AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

创建了新的 AmazonSQSBufferedAsyncClient 后,您可以像调用 AmazonSQSAsyncClient 那样调用该客户端,如以下代码示例所示。

Copy
CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyTestQueue"); CreateQueueResult res = bufferedSqs.createQueue(createRequest); SendMessageRequest request = new SendMessageRequest(); String body = "test message_" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); SendMessageResult sendResult = bufferedSqs.sendMessage(request); ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

高级配置

AmazonSQSBufferedAsyncClient 预配置了适用于大多数使用案例的设置。如果您要自己配置它,则可使用 QueueBufferConfig 类进行配置。只需使用所需的设置创建一个 QueueBufferConfig 实例,并将该实例提供给 AmazonSQSBufferedAsyncClient 构造函数,如以下示例代码所示。

Copy
// Create the basic Amazon SQS async client AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);

可用于配置 QueueBufferConfig 的参数如下:

  • longPoll – 如果此参数设置为 true,则 AmazonBufferedAsyncClient 会在取回消息时尝试使用长轮询。默认值为 true

  • longPollWaitTimeoutSeconds – 在返回空接收结果前,接收消息调用在服务器上阻塞以等待消息显示在队列中的最长时间 (以秒为单位)。如果禁用长轮询,则此设置不起作用。此设置的默认值为 20 秒。

  • maxBatchOpenMs – 传出调用等待其他同类调用进行批处理的最长时间 (以毫秒为单位)。该设置越长,则执行等量工作所需的批处理就越少。当然,该设置越长,则批处理中的首次调用必须等待的时间就越长。如果此参数设置为零,则提交的请求不会等待其他请求,从而有效地禁用了批处理。此设置的默认值为 200 毫秒。

  • maxBatchSize – 将在单一批处理请求中一起进行批处理的最大消息数量。该设置越大,则执行等量请求所需的批处理就越少。此设置的默认值为每个批处理 10 个请求,这也是 Amazon SQS 当前允许的最大批处理大小。

  • maxBatchSizeBytes — 客户端尝试向 Amazon SQS 发送的消息批处理的最大大小 (以字节为单位)。默认值为 256KB,这也是 Amazon SQS 当前允许的最大消息和批处理大小。

  • maxDoneReceiveBatches – AmazonBufferedAsyncClient 预取并存储在客户端上的最大接收批处理数量。该设置越大,则不必调用 Amazon SQS 服务器即可满足的接收请求就越多。但是,预取的消息越多,消息在缓冲区中停留的时间就越长,这意味着其可见性超时将过期。如果此参数设置为零,则消息的所有预取操作都会禁用,并且系统仅按需取回消息。默认值为 10 个批处理。

  • maxInflightOutboundBatches – 可以同时处理的最大活跃出站批处理数量。该设置越大,则可以发送出站批处理的速度就越快 (受限于其他限制,例如 CPU 或带宽)。该设置越大,则 AmazonSQSBufferedAsyncClient 占用的线程就越多。默认值为 5 个批处理。

  • maxInflightReceiveBatches – 可以同时处理的最大活跃接收批处理数量。该设置越大,则可以接收的消息就越多 (受限于其他限制,例如 CPU 或带宽、命中)。尽管如此,但是该设置越大,则 AmazonSQSBufferedAsyncClient 占用的线程就越多。如果此参数设置为 0,则消息的所有预取操作都会禁用,并且系统仅按需取回消息。默认值为 10 个批处理。

  • visibilityTimeoutSeconds – 如果此参数设置为非零正值,则此可见性超时会覆盖从中取回消息的队列上设置的可见性超时。零秒的可见性超时不受支持。默认值为 -1,这意味着使用默认队列设置。