使用适用于 Amazon SQS 的自动请求批处理 Amazon SDK for Java 2.x - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用适用于 Amazon SQS 的自动请求批处理 Amazon SDK for Java 2.x

Amazon API 的自动请求批处理SQS是一个高级库,它为批处理和缓冲SQS操作请求提供了一种有效的方法。通过使用批处理API,可以将请求数量减少到SQS,从而提高吞吐量并最大限度地降低成本。

由于批处理API方法与方法receiveMessagesendMessage、、changeMessageVisibility、)相匹配deleteMessage,因此您可以将该批次API用作即用替代SqsAsyncClient方法,只需进行最少的更改。

本主题概述了如何配置和使用API适用于 Amazon SQS 的自动请求批处理。

检查先决条件

您需要使用SDK适用于 Java 2.x 的 2.28.0 或更高版本才能访问批处理。API你的 Maven 至少pom.xml应该包含以下元素。

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 最新版本

创建批处理管理器

自动批处理API请求由SqsAsyncBatchManager接口实现。您可以通过几种方式创建管理器实例。

1. 使用默认配置 SqsAsyncClient

创建批处理管理器的最简单方法是在现有SqsAsyncClient实例上调用batchManager工厂方法。以下片段显示了简单的方法。

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

当您使用这种方法时,SqsAsyncBatchManager实例将使用该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的默认值。此外,该SqsAsyncBatchManager实例使用从中创建它的SqsAsyncClient实例的。ExecutorService

1. 使用自定义配置 SqsAsyncBatchManager.Builder

对于更高级的用例,您可以使用自定义批处理管理器SqsAsyncBatchManager.Builder。通过使用这种方法创建SqsAsyncBatchManager实例,您可以微调批处理行为。以下代码段显示了如何使用构建器自定义批处理行为的示例。

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

使用这种方法时,您可以调整该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的BatchOverrideConfiguration对象的设置。您也可以使用这种方法ScheduledExecutorService为批处理管理器提供自定义。

发送消息

要使用批处理管理器发送消息,请使用SqsAsyncBatchManager#sendMessage方法。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例显示了sendMessage紧随其后的是另一个请求的请求。在本例中,SDK将以单个批次发送这两条消息。

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

更改消息可见性超时

您可以使用SqsAsyncBatchManager#changeMessageVisibility方法批量更改消息的可见性超时。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例说明如何调用该changeMessageVisibility方法。

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

删除消息

您可以使用SqsAsyncBatchManager#deleteMessage方法批量删除消息。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例显示了如何调用该deleteMessage方法。

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

接收消息

使用默认设置

当您在应用程序中轮询该SqsAsyncBatchManager#receiveMessage方法时,批处理管理器会从其内部缓冲区获取消息,然后在后台SDK自动更新这些消息。

以下示例说明如何调用该receiveMessage方法。

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

使用自定义设置

如果您想进一步自定义请求,例如通过设置自定义等待时间和指定要检索的消息数量,则可以自定义请求,如以下示例所示。

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意

如果您receiveMessage使用ReceiveMessageRequest包含以下任何参数的调用,则会SDK绕过批处理管理器并发送常规异步receiveMessage请求:

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

覆盖的配置设置 SqsAsyncBatchManager

创建SqsAsyncBatchManager实例时,您可以调整以下设置。上提供了以下设置列表BatchOverrideConfiguration.Builder

设置 描述 默认值
maxBatchSize SendMessageBatchRequestChangeMessageVisibilityBatchRequest或每批次的最大请求数DeleteMessageBatchRequest。最大值为 10。 10
sendRequestFrequency

发送批次之前的时间,maxBatchSize除非提前到达。较高的值可能会减少请求,但会增加延迟。

200 毫秒
receiveMessageVisibilityTimeout 消息的可见性超时。如果未设置,则使用队列的默认值。 队列的默认值
receiveMessageMinWaitDuration receiveMessage请求的最短等待时间。避免设置为 0 以防止CPU浪费。 50 毫秒
receiveMessageSystemAttributeNames 请求receiveMessage呼叫的系统属性名称列表。
receiveMessageAttributeNames 请求receiveMessage呼叫的属性名称列表。