Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
自动 Amazon SQS 请求批处理从版本 1 更改为版本 2
本主题详细介绍了版本 1 和版本 2 之间的 Amazon SQS 自动请求批处理的变化。 适用于 Java 的 Amazon SDK
高级别更改
适用于 Java 的 Amazon SDK 1.x 使用单独的类执行客户端缓冲,该AmazonSQSBufferedAsyncClient
类需要显式初始化才能进行请求批处理。
使用 Amazon SDK for Java 2.x 简化并增强了缓冲功能。SqsAsyncBatchManager
该接口的实现提供了直接与标准SqsAsyncClient
集成的自动请求批处理功能。要了解 v2SqsAsyncBatchManager
,请参阅本指南中的将 Amazon SQS 的自动请求批处理与 Amazon SDK for Java 2.x主题。
更改 |
v1 |
v2 |
Maven 依赖项
|
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.12.7821 </version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
</dependencies>
|
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.31.152 </version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
|
Package 名称 |
com.amazonaws.services.sqs.buffered |
software.amazon.awssdk.services.sqs.batchmanager |
类名 |
AmazonSQSBufferedAsyncClient
|
SqsAsyncBatchManager |
1 最新版本。2 最新版本。
使用自动 SQS 请求批处理
更改 |
v1 |
v2 |
创建批处理管理器 |
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
AmazonSQSAsync bufferedSqs = new
AmazonSQSBufferedAsyncClient(sqsAsync);
|
SqsAsyncClient asyncClient = SqsAsyncClient.create();
SqsAsyncBatchManager sqsAsyncBatchManager =
asyncClient.batchManager();
|
使用自定义配置创建批处理管理器 |
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
QueueBufferConfig queueBufferConfig = new QueueBufferConfig()
.withMaxBatchOpenMs(200)
.withMaxBatchSize(10)
.withMinReceiveWaitTimeMs(1000)
.withVisibilityTimeoutSeconds(20)
.withReceiveMessageAttributeNames(messageAttributeValues);
AmazonSQSAsync bufferedSqs =
new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
|
BatchOverrideConfiguration batchOverrideConfiguration =
BatchOverrideConfiguration.builder()
.sendRequestFrequency(Duration.ofMillis(200))
.maxBatchSize(10)
.receiveMessageMinWaitDuration(Duration.ofMillis(1000))
.receiveMessageVisibilityTimeout(Duration.ofSeconds(20))
.receiveMessageSystemAttributeNames(messageSystemAttributeNames)
.receiveMessageAttributeNames(messageAttributeValues)
.build();
SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder()
.overrideConfiguration(batchOverrideConfiguration)
.client(SqsAsyncClient.create())
.scheduledExecutor(Executors.newScheduledThreadPool(8))
.build();
|
发送消息 |
Future<SendMessageResult> sendResultFuture =
bufferedSqs.sendMessageAsync(new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody(body));
|
CompletableFuture<SendMessageResponse> sendCompletableFuture =
sqsAsyncBatchManager.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(body)
.build());
|
删除消息 |
Future<DeleteMessageResult> deletResultFuture =
bufferedSqs.deleteMessageAsync(new DeleteMessageRequest()
.withQueueUrl(queueUrl));
|
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture
= sqsAsyncBatchManager.deleteMessage(
DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.build());
|
更改消息的可见性 |
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture =
bufferedSqs.changeMessageVisibilityAsync
(new ChangeMessageVisibilityRequest()
.withQueueUrl(queueUrl)
.withVisibilityTimeout(20));
|
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture
= sqsAsyncBatchManager.changeMessageVisibility(
ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.visibilityTimeout(20)
.build());
|
接收消息 |
ReceiveMessageResult receiveResult =
bufferedSqs.receiveMessage(
new ReceiveMessageRequest()
.withQueueUrl(queueUrl));
|
CompletableFuture<ReceiveMessageResponse>
responseCompletableFuture = sqsAsyncBatchManager.receiveMessage(
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.build());
|
异步返回类型差异
更改 |
v1 |
v2 |
返回类型 |
Future<ResultType> |
CompletableFuture<ResponseType> |
回调机制 |
需要AsyncHandler 使用单独的 an onSuccess d onError 方法 |
JDK CompletableFuture APIs 提供的用途,例如whenComplete() 、、thenCompose() thenApply() |
异常处理 |
使用AsyncHandler#onError() 方法 |
JDK CompletableFuture APIs 提供的用途,例如exceptionally() handle() 、或 whenComplete() |
取消 |
通过以下方式获得基本支持 Future.cancel() |
取消母公司CompletableFuture 会自动取消链中所有受抚养期货 |
异步完成处理的差异
更改 |
v1 |
v2 |
响应处理程序实现 |
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync(
receiveRequest,
new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageRequest request,
ReceiveMessageResult result) {
List<Message> messages = result.getMessages();
System.out.println("Received " + messages.size() + " messages");
for (Message message : messages) {
System.out.println("Message ID: " + message.getMessageId());
System.out.println("Body: " + message.getBody());
}
}
@Override
public void onError(Exception e) {
System.err.println("Error receiving messages: " + e.getMessage());
e.printStackTrace();
}
}
);
|
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager
.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl).build())
.whenComplete((receiveMessageResponse, throwable) -> {
if (throwable != null) {
System.err.println("Error receiving messages: " + throwable.getMessage());
throwable.printStackTrace();
} else {
List<Message> messages = receiveMessageResponse.messages();
System.out.println("Received " + messages.size() + " messages");
for (Message message : messages) {
System.out.println("Message ID: " + message.messageId());
System.out.println("Body: " + message.body());
}
}
});
|
关键配置参数
参数 |
v1 |
v2 |
最大批次大小 |
maxBatchSize (默认每批 10 个请求) |
maxBatchSize (默认每批 10 个请求) |
Batch 等待时间 |
maxBatchOpenMs (默认为 200 毫秒) |
sendRequestFrequency (默认为 200 毫秒) |
可见性超时 |
visibilityTimeoutSeconds (队列默认为 -1) |
receiveMessageVisibilityTimeout (队列默认) |
最短等待时间 |
longPollWaitTimeoutSeconds (如果为真,longPoll 则为 20 秒) |
receiveMessageMinWaitDuration (默认为 50 毫秒) |
消息属性 |
使用以下方法进行设置 ReceiveMessageRequest |
receiveMessageAttributeNames (默认为无) |
系统属性 |
使用以下方法进行设置 ReceiveMessageRequest |
receiveMessageSystemAttributeNames (默认为无) |
长轮询 |
longPoll (默认为 true) |
不支持以避免在服务器发送消息之前打开连接 |
长时间轮询的最长等待时间 |
longPollWaitTimeoutSeconds (默认为 20 秒) |
不支持以避免在服务器发送消息之前打开连接 |
存储在客户端的预取接收批次的最大数量 |
maxDoneReceiveBatches (10 个批次) |
不支持,因为它是在内部处理的 |
同时处理的活动出站批次的最大数量 |
maxInflightOutboundBatches (默认 5 个批次) |
不支持,因为它是在内部处理的 |
同时处理的活动接收批次的最大数量 |
maxInflightReceiveBatches (默认 10 个批次) |
不支持,因为它是在内部处理的 |