本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon 启用客户端缓冲和请求批处理 SQS
Amazon SDK for JavaAmazonSQSBufferedAsyncClient
SQS的内容。此客户端允许使用客户端缓冲进行简单的请求批处理。首先对来自客户端的呼叫进行缓冲,然后作为批量请求发送到 Amazon SQS。
客户端缓冲允许缓冲多达 10 个请求并作为批量请求发送,从而降低您使用 Amazon 的成本SQS并减少发送的请求数量。 AmazonSQSBufferedAsyncClient
缓冲同步和异步调用。批量请求和对长轮询的支持还有助于提高吞吐量。有关更多信息,请参阅 使用 Amazon 的水平扩展和操作批处理来提高吞吐量 SQS。
由于 AmazonSQSBufferedAsyncClient
实施与 AmazonSQSAsyncClient
相同的接口,因此从 AmazonSQSAsyncClient
迁移到 AmazonSQSBufferedAsyncClient
通常只需要对现有代码进行少量的更改。
注意
Amazon SQS 缓冲异步客户端目前不支持FIFO队列。
使用 A mazonSQSBuffered AsyncClient
在开始之前,请完成 设置 Amazon SQS 中的步骤。
Amazon SDK适用于 Java 1.x
对 Amazon SDK于 Java 1.x,您可以AmazonSQSBufferedAsyncClient
根据以下示例创建一个新的:
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
创建新请求后AmazonSQSBufferedAsyncClient
,您可以使用它向 Amazon 发送多个请求SQS(就像您一样AmazonSQSAsyncClient
),例如:
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
配置 A mazonSQSBuffered AsyncClient
AmazonSQSBufferedAsyncClient
预配置了适用于大多数使用案例的设置。您可以进一步配置 AmazonSQSBufferedAsyncClient
,例如:
-
使用必需的配置参数来创建
QueueBufferConfig
类的实例。 -
将该实例提供给
AmazonSQSBufferedAsyncClient
构造函数。
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
参数 | 默认值 | 描述 |
---|---|---|
longPoll |
true |
如果 |
longPollWaitTimeoutSeconds |
20 秒 |
在返回空接收结果前, 注意如果禁用长轮询,则此设置不起作用。 |
maxBatchOpenMs |
200 毫秒 |
传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间(以毫秒为单位)。 设置的时间越长,则执行等量工作所需的批处理次数就越少(但是,批处理中的首次调用必须等待更长的时间)。 如果将此参数设置为 |
maxBatchSize |
每批 10 个请求 |
在一个请求中一起进行批处理的消息的最大数量。该设置越大,则执行等量请求所需的批处理就越少。 注意亚马逊允许的最大值是每批 10 个请求SQS。 |
maxBatchSizeBytes |
256 KiB |
客户端尝试发送到 Amazon 的消息批次的最大大小(以字节为单位)SQS。 注意256 KiB 是亚马逊允许的最大值。SQS |
maxDoneReceiveBatches |
10 个批处理 |
设置越高,无需致电 Amazon 即可满足的接收请求越多SQS(但是,预取的消息越多,它们在缓冲区中停留的时间就越长,从而导致其自身的可见性超时过期)。 注意
|
maxInflightOutboundBatches |
5 个批处理 |
可以同时处理的最大活跃出站批处理数量。 设置越高,发送出站批次的速度就越快(受限于CPU或带宽等配额),消耗的线程也越多 |
maxInflightReceiveBatches |
10 个批处理 |
可以同时处理的最大活跃接收批处理数量。 设置越高,可以接收的消息越多(受限于配额,例如CPU或带宽),消耗的线程也越多 注意
|
visibilityTimeoutSeconds |
–1 |
如果此参数设置为正值(非零值),则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。 注意
不能将可见性超时设置为 |
Amazon SDK适用于 Java 2.x
对 Amazon SDK于 Java 2.x,您可以SqsAsyncBatchManager
根据以下示例创建一个新的:
// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
创建新请求后SqsAsyncBatchManager
,您可以使用它向 Amazon 发送多个请求SQS(就像您一样SqsAsyncClient
),例如:
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);
配置 SqsAsyncBatchManager
SqsAsyncBatchManager
预配置了适用于大多数使用案例的设置。您可以进一步配置 SqsAsyncBatchManager
,例如:
通过SqsAsyncBatchManager.Builder
以下方式创建自定义配置:
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
参数 | 默认值 | 描述 |
---|---|---|
maxBatchSize |
每批 10 个请求 |
在一个请求中一起进行批处理的消息的最大数量。该设置越大,则执行等量请求所需的批处理就越少。 注意Amazon SQS 允许的最大值为每批 10 个请求。 |
sendRequestFrequency |
200 毫秒 |
传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间(以毫秒为单位)。 设置的时间越长,则执行等量工作所需的批处理次数就越少(但是,批处理中的首次调用必须等待更长的时间)。 如果将此参数设置为 |
receiveMessageVisibilityTimeout |
–1 |
如果此参数设置为正值(非零值),则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。 注意 |
receiveMessageMinWaitDuration |
50 毫秒 |
|