

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 启用客户端缓冲和请求批处理功能，并将其与 Amazon SQS 结合使用
<a name="sqs-client-side-buffering-request-batching"></a>

[适用于 Java 的 Amazon SDK](https://www.amazonaws.cn/sdkforjava/) 包括可访问 Amazon SQS 的 `AmazonSQSBufferedAsyncClient`。该客户端支持使用客户端缓冲进行简单的请求批处理。客户端进行的调用首先被缓冲，然后作为批处理请求发送到 Amazon SQS。

客户端缓冲最多允许缓冲 10 个请求并将这些请求作为一个批处理请求发送，从而减少使用 Amazon SQS 的成本并减少发送的请求数。`AmazonSQSBufferedAsyncClient` 会缓冲同步和异步调用。批量请求和对[长轮询](sqs-short-and-long-polling.md)的支持还有助于提高吞吐量。有关更多信息，请参阅 [利用水平扩缩和操作批处理，借助 Amazon SQS 来提高吞吐量](sqs-throughput-horizontal-scaling-and-batching.md)。

由于 `AmazonSQSBufferedAsyncClient` 实施与 `AmazonSQSAsyncClient` 相同的接口，因此从 `AmazonSQSAsyncClient` 迁移到 `AmazonSQSBufferedAsyncClient` 通常只需要对现有代码进行少量的更改。

**注意**  
Amazon SQS 缓冲异步客户端目前不支持 FIFO 队列。

## 使用亚马逊 SQSBuffered AsyncClient
<a name="using-buffered-async-client"></a>

在开始之前，请完成 [设置 Amazon SQS](sqs-setting-up.md) 中的步骤。

### Amazon 适用于 Java 的 SDK 1.x
<a name="using-buffered-async-client-java1"></a>

对于 Amazon 适用于 Java 的 SDK 1.x，你可以`AmazonSQSBufferedAsyncClient`根据以下示例创建一个新的：

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
```

在创建新的 `AmazonSQSBufferedAsyncClient` 之后，您可以使用它将多个请求发送到 Amazon SQS（就像使用 `AmazonSQSAsyncClient` 所做的那样），例如：

```
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue");
 
final CreateQueueResult res = bufferedSqs.createQueue(createRequest);
 
final SendMessageRequest request = new SendMessageRequest();
final String body = "Your message text" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
 
final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request);
 
final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(1)
    .withQueueUrl(queueUrl);
final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
```

### 配置亚马逊 SQSBuffered AsyncClient
<a name="configuring-buffered-async-client"></a>

`AmazonSQSBufferedAsyncClient` 预配置了适用于大多数使用案例的设置。您可以进一步配置 `AmazonSQSBufferedAsyncClient`，例如：

1. 使用必需的配置参数来创建 `QueueBufferConfig` 类的实例。

1. 将该实例提供给 `AmazonSQSBufferedAsyncClient` 构造函数。

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
final QueueBufferConfig config = new QueueBufferConfig()
    .withMaxInflightReceiveBatches(5)
    .withMaxDoneReceiveBatches(15);
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
```


**QueueBufferConfig 配置参数**  

| 参数 | 默认 值 | 说明 | 
| --- | --- | --- | 
| longPoll | true |  如果 `longPoll` 设置为 `true`，`AmazonSQSBufferedAsyncClient` 会在使用消息时尝试使用长轮询。  | 
| longPollWaitTimeoutSeconds | 20 秒 |  在返回空接收结果前，`ReceiveMessage` 调用在服务器上阻塞以等待消息显示在队列中的最长时间（以秒为单位）。  如果禁用长轮询，则此设置不起作用。   | 
| maxBatchOpenMs | 200 毫秒 |  传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间（以毫秒为单位）。 设置的时间越长，则执行等量工作所需的批处理次数就越少（但是，批处理中的首次调用必须等待更长的时间）。 如果将此参数设置为 `0`，则提交的请求不会等待其他请求，从而有效地禁用批处理。  | 
| maxBatchSize | 每批 10 个请求 |  在一个请求中一起进行批处理的消息的最大数量。该设置越大，则执行等量请求所需的批处理就越少。  Amazon SQS 允许的最大值为每批 10 个请求。   | 
| maxBatchSizeBytes | 1 MiB |  客户端尝试向 Amazon SQS 发送的消息批处理的最大大小（以字节为单位）。  Amazon SQS 允许的最大值为 1 MiB。   | 
| maxDoneReceiveBatches | 10 个批处理 |  `AmazonSQSBufferedAsyncClient` 在客户端预取和存储的接收批处理的最大数量。 设置的值越高，则可满足越多的接收请求而不必调用 Amazon SQS（但是，预取的消息越多，则消息在缓冲区中停留的时间就越长，从而导致它们的可见性超时过期）。  `0` 表示所有消息预取操作将被禁用，消息只能按需使用。   | 
| maxInflightOutboundBatches | 5 个批处理 |  可以同时处理的最大活跃出站批处理数量。 设置的值越高，发送出站批处理的速度就越快（受限于其他配额，例如 CPU 或带宽），并且 `AmazonSQSBufferedAsyncClient` 使用的线程就越多。  | 
| maxInflightReceiveBatches | 10 个批处理 |  可以同时处理的最大活跃接收批处理数量。 设置的值越高，可接收的消息就越多（受限于其他配额，例如 CPU 或带宽），并且 `AmazonSQSBufferedAsyncClient` 使用的线程就越多。  `0` 表示所有消息预取操作将被禁用，消息只能按需使用。   | 
| visibilityTimeoutSeconds | –1 |  如果此参数设置为正值（非零值），则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。  `-1` 表示为队列选择默认设置。 不能将可见性超时设置为 `0`。   | 

### Amazon 适用于 Java 的 SDK 2.x
<a name="using-buffered-async-client-java2"></a>

对于 Amazon 适用于 Java 的 SDK 2.x，你可以`SqsAsyncBatchManager`根据以下示例创建一个新的：

```
// Create the basic Sqs Async Client
SqsAsyncClient sqs = SqsAsyncClient.builder() 
    .region(Region.US_EAST_1) 
    .build();

// Create the batch manager
SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
```

在创建新的 `SqsAsyncBatchManager` 之后，您可以使用它将多个请求发送到 Amazon SQS（就像使用 `SqsAsyncClient` 所做的那样），例如：

```
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID();
final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build();
final String queueUrl = sqs.createQueue(request).join().queueUrl();
System.out.println("Queue created: " + queueUrl);


// Send messages
CompletableFuture<SendMessageResponse> sendMessageFuture;
for (int i = 0; i < 10; i++) {
    final int index = i;
    sendMessageFuture = sqsAsyncBatchManager.sendMessage(
            r -> r.messageBody("Message " + index).queueUrl(queueUrl));
    SendMessageResponse response= sendMessageFuture.join();
    System.out.println("Message " + response.messageId() + " sent!");
}

// Receive messages with customized configurations
CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage(
        r -> r.queueUrl(queueUrl)
                .waitTimeSeconds(10)
                .visibilityTimeout(20)
                .maxNumberOfMessages(10)
);
System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total.");

// Delete messages
DeleteQueueRequest deleteQueueRequest =  DeleteQueueRequest.builder().queueUrl(queueUrl).build();
int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode();
System.out.println("Queue is deleted, with statusCode " + code);
```

### 正在配置 SqsAsyncBatchManager
<a name="configuring-SqsAsyncBatchManager"></a>

`SqsAsyncBatchManager` 预配置了适用于大多数使用案例的设置。您可以进一步配置 `SqsAsyncBatchManager`，例如：

通过 `SqsAsyncBatchManager.Builder` 创建自定义配置：

```
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() 
    .client(sqs)
    .scheduledExecutor(Executors.newScheduledThreadPool(5))
    .overrideConfiguration(b -> b 
        .maxBatchSize(10)
        .sendRequestFrequency(Duration.ofMillis(200))
        .receiveMessageMinWaitDuration(Duration.ofSeconds(10))
        .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) 
        .receiveMessageAttributeNames(Collections.singletonList("*"))
        .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL)))
    .build();
```


**`BatchOverrideConfiguration` 参数**  

| 参数 | 默认 值 | 说明 | 
| --- | --- | --- | 
| maxBatchSize |  每批 10 个请求  | 在一个请求中一起进行批处理的消息的最大数量。该设置越大，则执行等量请求所需的批处理就越少。  Amazon SQS 允许的最大值为每批次 10 个请求。  | 
| sendRequestFrequency |  200 毫秒  | 传出调用等待其他要一起对同类型的消息进行批处理的调用的最长时间（以毫秒为单位）。 设置的时间越长，则执行等量工作所需的批处理次数就越少（但是，批处理中的首次调用必须等待更长的时间）。 如果将此参数设置为 `0`，则提交的请求不会等待其他请求，从而有效地禁用批处理。 | 
| receiveMessageVisibilityTimeout |  –1  | 如果此参数设置为正值（非零值），则此处设置的可见性超时将覆盖在使用的消息所在的队列上设置的可见性超时。   `1` 表示为队列选择默认设置。不能将可见性超时设置为 `0`。   | 
| receiveMessageMinWaitDuration |  50 毫秒  | `receiveMessage` 调用等待可用消息被获取的最短时间（以毫秒为单位）。设置的值越高，则执行等量请求所需的批量操作就越少。  | 