使用 Amazon 启用客户端缓冲和请求批处理 SQS - Amazon Simple Queue Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon 启用客户端缓冲和请求批处理 SQS

Amazon SDK for Java包括访问亚马逊AmazonSQSBufferedAsyncClientSQS的内容。此客户端允许使用客户端缓冲进行简单的请求批处理。首先对来自客户端的呼叫进行缓冲,然后作为批量请求发送到 Amazon SQS。

客户端缓冲允许缓冲多达 10 个请求并作为批量请求发送,从而降低您使用 Amazon 的成本SQS并减少发送的请求数量。 AmazonSQSBufferedAsyncClient缓冲同步和异步调用。批量请求和对长轮询的支持还有助于提高吞吐量。有关更多信息,请参阅 使用 Amazon 的水平扩展和操作批处理来提高吞吐量 SQS

由于 AmazonSQSBufferedAsyncClient 实施与 AmazonSQSAsyncClient 相同的接口,因此从 AmazonSQSAsyncClient 迁移到 AmazonSQSBufferedAsyncClient 通常只需要对现有代码进行少量的更改。

注意

Amazon SQS 缓冲异步客户端目前不支持FIFO队列。

使用 A mazonSQSBuffered AsyncClient

在开始之前,请完成 设置 Amazon SQS 中的步骤。

Amazon SDK适用于 Java 1.x

对 Amazon SDK于 Java 1.x,您可以AmazonSQSBufferedAsyncClient根据以下示例创建一个新的:

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

创建新请求后AmazonSQSBufferedAsyncClient,您可以使用它向 Amazon 发送多个请求SQS(就像您一样AmazonSQSAsyncClient),例如:

final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

配置 A mazonSQSBuffered AsyncClient

AmazonSQSBufferedAsyncClient 预配置了适用于大多数使用案例的设置。您可以进一步配置 AmazonSQSBufferedAsyncClient,例如:

  1. 使用必需的配置参数来创建 QueueBufferConfig 类的实例。

  2. 将该实例提供给 AmazonSQSBufferedAsyncClient 构造函数。

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
QueueBufferConfig 配置参数
参数 默认值 描述
longPoll true

如果 longPoll 设置为 trueAmazonSQSBufferedAsyncClient 会在使用消息时尝试使用长轮询。

longPollWaitTimeoutSeconds 20 秒

在返回空接收结果前,ReceiveMessage 调用在服务器上阻塞以等待消息显示在队列中的最长时间(以秒为单位)。

注意

如果禁用长轮询,则此设置不起作用。

maxBatchOpenMs 200 毫秒

传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间(以毫秒为单位)。

设置的时间越长,则执行等量工作所需的批处理次数就越少(但是,批处理中的首次调用必须等待更长的时间)。

如果将此参数设置为 0,则提交的请求不会等待其他请求,从而有效地禁用批处理。

maxBatchSize 每批 10 个请求

在一个请求中一起进行批处理的消息的最大数量。该设置越大,则执行等量请求所需的批处理就越少。

注意

亚马逊允许的最大值是每批 10 个请求SQS。

maxBatchSizeBytes 256 KiB

客户端尝试发送到 Amazon 的消息批次的最大大小(以字节为单位)SQS。

注意

256 KiB 是亚马逊允许的最大值。SQS

maxDoneReceiveBatches 10 个批处理

AmazonSQSBufferedAsyncClient 在客户端预取和存储的接收批处理的最大数量。

设置越高,无需致电 Amazon 即可满足的接收请求越多SQS(但是,预取的消息越多,它们在缓冲区中停留的时间就越长,从而导致其自身的可见性超时过期)。

注意

0 表示所有消息预取操作将被禁用,消息只能按需使用。

maxInflightOutboundBatches 5 个批处理

可以同时处理的最大活跃出站批处理数量。

设置越高,发送出站批次的速度就越快(受限于CPU或带宽等配额),消耗的线程也越多AmazonSQSBufferedAsyncClient

maxInflightReceiveBatches 10 个批处理

可以同时处理的最大活跃接收批处理数量。

设置越高,可以接收的消息越多(受限于配额,例如CPU或带宽),消耗的线程也越多AmazonSQSBufferedAsyncClient

注意

0 表示所有消息预取操作将被禁用,消息只能按需使用。

visibilityTimeoutSeconds –1

如果此参数设置为正值(非零值),则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。

注意

-1 表示为队列选择默认设置。

不能将可见性超时设置为 0

Amazon SDK适用于 Java 2.x

对 Amazon SDK于 Java 2.x,您可以SqsAsyncBatchManager根据以下示例创建一个新的:

// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();

创建新请求后SqsAsyncBatchManager,您可以使用它向 Amazon 发送多个请求SQS(就像您一样SqsAsyncClient),例如:

final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);

配置 SqsAsyncBatchManager

SqsAsyncBatchManager 预配置了适用于大多数使用案例的设置。您可以进一步配置 SqsAsyncBatchManager,例如:

通过SqsAsyncBatchManager.Builder以下方式创建自定义配置:

SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
BatchOverrideConfiguration 参数
参数 默认值 描述
maxBatchSize

每批 10 个请求

在一个请求中一起进行批处理的消息的最大数量。该设置越大,则执行等量请求所需的批处理就越少。

注意

Amazon SQS 允许的最大值为每批 10 个请求。

sendRequestFrequency

200 毫秒

传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间(以毫秒为单位)。

设置的时间越长,则执行等量工作所需的批处理次数就越少(但是,批处理中的首次调用必须等待更长的时间)。

如果将此参数设置为 0,则提交的请求不会等待其他请求,从而有效地禁用批处理。

receiveMessageVisibilityTimeout

–1

如果此参数设置为正值(非零值),则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。

注意

1 表示为队列选择默认设置。不能将可见性超时设置为 0

receiveMessageMinWaitDuration

50 毫秒

receiveMessage呼叫等待获取可用消息的最短时间(以毫秒为单位)。设置越高,执行相同数量的请求所需的批次就越少。