自动 Amazon SQS 请求批处理从版本 1 更改为版本 2 - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

自动 Amazon SQS 请求批处理从版本 1 更改为版本 2

本主题详细介绍了版本 1 和版本 2 之间的 Amazon SQS 自动请求批处理的变化。 适用于 Java 的 Amazon SDK

高级别更改

适用于 Java 的 Amazon SDK 1.x 使用单独的类执行客户端缓冲,该AmazonSQSBufferedAsyncClient类需要显式初始化才能进行请求批处理。

使用 Amazon SDK for Java 2.x 简化并增强了缓冲功能。SqsAsyncBatchManager该接口的实现提供了直接与标准SqsAsyncClient集成的自动请求批处理功能。要了解 v2SqsAsyncBatchManager,请参阅本指南中的将 Amazon SQS 的自动请求批处理与 Amazon SDK for Java 2.x主题。

更改 v1 v2

Maven 依赖项

<dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.12.7821</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.31.152</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>
Package 名称 com.amazonaws.services.sqs.buffered software.amazon.awssdk.services.sqs.batchmanager
类名

AmazonSQSBufferedAsyncClient

SqsAsyncBatchManager

1 最新版本2 最新版本

使用自动 SQS 请求批处理

更改 v1 v2
创建批处理管理器
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
使用自定义配置创建批处理管理器
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig queueBufferConfig = new QueueBufferConfig() .withMaxBatchOpenMs(200) .withMaxBatchSize(10) .withMinReceiveWaitTimeMs(1000) .withVisibilityTimeoutSeconds(20) .withReceiveMessageAttributeNames(messageAttributeValues); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
BatchOverrideConfiguration batchOverrideConfiguration = BatchOverrideConfiguration.builder() .sendRequestFrequency(Duration.ofMillis(200)) .maxBatchSize(10) .receiveMessageMinWaitDuration(Duration.ofMillis(1000)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageSystemAttributeNames(messageSystemAttributeNames) .receiveMessageAttributeNames(messageAttributeValues) .build(); SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder() .overrideConfiguration(batchOverrideConfiguration) .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(8)) .build();
发送消息
Future<SendMessageResult> sendResultFuture = bufferedSqs.sendMessageAsync(new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(body));
CompletableFuture<SendMessageResponse> sendCompletableFuture = sqsAsyncBatchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(body) .build());
删除消息
Future<DeleteMessageResult> deletResultFuture = bufferedSqs.deleteMessageAsync(new DeleteMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture = sqsAsyncBatchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .build());
更改消息的可见性
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture = bufferedSqs.changeMessageVisibilityAsync (new ChangeMessageVisibilityRequest() .withQueueUrl(queueUrl) .withVisibilityTimeout(20));
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture = sqsAsyncBatchManager.changeMessageVisibility( ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .visibilityTimeout(20) .build());
接收消息
ReceiveMessageResult receiveResult = bufferedSqs.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<ReceiveMessageResponse> responseCompletableFuture = sqsAsyncBatchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build());

异步返回类型差异

更改 v1 v2
返回类型 Future<ResultType> CompletableFuture<ResponseType>
回调机制 需要AsyncHandler使用单独的 an onSuccess d onError 方法 JDK CompletableFuture APIs 提供的用途,例如whenComplete()、、thenCompose() thenApply()
异常处理 使用AsyncHandler#onError()方法 JDK CompletableFuture APIs 提供的用途,例如exceptionally()handle()、或 whenComplete()
取消 通过以下方式获得基本支持 Future.cancel() 取消母公司CompletableFuture会自动取消链中所有受抚养期货

异步完成处理的差异

更改 v1 v2
响应处理程序实现
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync( receiveRequest, new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() { @Override public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) { List<Message> messages = result.getMessages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.getMessageId()); System.out.println("Body: " + message.getBody()); } } @Override public void onError(Exception e) { System.err.println("Error receiving messages: " + e.getMessage()); e.printStackTrace(); } } );
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager .receiveMessage(ReceiveMessageRequest.builder() .queueUrl(queueUrl).build()) .whenComplete((receiveMessageResponse, throwable) -> { if (throwable != null) { System.err.println("Error receiving messages: " + throwable.getMessage()); throwable.printStackTrace(); } else { List<Message> messages = receiveMessageResponse.messages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.messageId()); System.out.println("Body: " + message.body()); } } });

关键配置参数

参数 v1 v2
最大批次大小 maxBatchSize(默认每批 10 个请求) maxBatchSize(默认每批 10 个请求)
Batch 等待时间 maxBatchOpenMs(默认为 200 毫秒) sendRequestFrequency(默认为 200 毫秒)
可见性超时 visibilityTimeoutSeconds(队列默认为 -1) receiveMessageVisibilityTimeout(队列默认)
最短等待时间 longPollWaitTimeoutSeconds(如果为真,longPoll则为 20 秒) receiveMessageMinWaitDuration(默认为 50 毫秒)
消息属性 使用以下方法进行设置 ReceiveMessageRequest receiveMessageAttributeNames(默认为无)
系统属性 使用以下方法进行设置 ReceiveMessageRequest receiveMessageSystemAttributeNames(默认为无)
长轮询 longPoll(默认为 true) 不支持以避免在服务器发送消息之前打开连接
长时间轮询的最长等待时间 longPollWaitTimeoutSeconds(默认为 20 秒) 不支持以避免在服务器发送消息之前打开连接
存储在客户端的预取接收批次的最大数量 maxDoneReceiveBatches(10 个批次) 不支持,因为它是在内部处理的
同时处理的活动出站批次的最大数量 maxInflightOutboundBatches(默认 5 个批次) 不支持,因为它是在内部处理的
同时处理的活动接收批次的最大数量 maxInflightReceiveBatches(默认 10 个批次) 不支持,因为它是在内部处理的