使用 Amazon SDK for Java 2.x 处理分页结果 - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon SDK for Java 2.x 处理分页结果

当响应对象太大而无法在单个响应中返回时,许多 Amazon 操作都会返回分页结果。在 Amazon SDK for Java 1.0 中,响应包含一个用于检索下一页结果的标记。相比之下, Amazon SDK for Java 2.x 具有自动分页方法,可以进行多次服务调用,自动为您获取下一页的结果。您只需编写处理结果的代码。自动分页功能适用于同步和异步客户端。

注意

这些代码片段假设您了解使用的基础知识SDK,并且已将您的环境配置为单点登录访问权限

同步分页

以下示例演示列出 Amazon S3 桶中对象的同步分页方法。

迭代页面

第一个示例演示如何使用分listRes页器对象(一个ListObjectsV2Iterable实例)来使用该方法遍历所有响应页面。stream代码在响应页面上进行流式传输,将响应流转换为S3Object内容流,然后处理 Amazon S3 对象的内容。

以下导入适用于此同步分页部分中的所有示例。

import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.waiters.S3Waiter; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
ListObjectsV2Request listReq = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); ListObjectsV2Iterable listRes = s3.listObjectsV2Paginator(listReq); // Process response pages listRes.stream() .flatMap(r -> r.contents().stream()) .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));

请参阅上的完整示例 GitHub。

迭代对象

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。ListObjectsV2Iterable 类的 contents 方法返回一个 SdkIterable,它提供了几种处理底层内容元素的方法。

使用流

以下代码段在响应内容上使用 stream 方法来迭代分页项目集合。

// Helper method to work with paginated collection of items directly. listRes.contents().stream() .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));

请参阅上的完整示例 GitHub。

使用 for-each 循环

由于 SdkIterable 扩展了 Iterable 接口,因此您可以像处理任何 Iterable 一样处理内容。以下代码段使用标准 for-each 循环迭代响应的内容。

for (S3Object content : listRes.contents()) { System.out.println(" Key: " + content.key() + " size = " + content.size()); }

请参阅上的完整示例 GitHub。

手动分页

如果您的使用案例需要手动分页,则手动分页仍然可用。对后续请求使用响应对象中的下一个令牌。以下示例使用 while 循环。

ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); boolean done = false; while (!done) { ListObjectsV2Response listObjResponse = s3.listObjectsV2(listObjectsReqManual); for (S3Object content : listObjResponse.contents()) { System.out.println(content.key()); } if (listObjResponse.nextContinuationToken() == null) { done = true; } listObjectsReqManual = listObjectsReqManual.toBuilder() .continuationToken(listObjResponse.nextContinuationToken()) .build(); }

请参阅上的完整示例 GitHub。

异步分页

以下示例演示了列出 DynamoDB 表格的异步分页方法。

迭代表名称页面

以下两个示例使用异步 DynamoDB 客户端,该客户端调用listTablesPaginator该方法并请求获取。ListTablesPublisher ListTablesPublisher实现了两个接口,这为处理响应提供了许多选项。我们将研究每个接口的方法。

使用 Subscriber

以下代码示例演示如何使用 ListTablesPublisher 实现的 org.reactivestreams.Publisher 接口处理分页结果。要了解有关响应式流模型的更多信息,请参阅 React ive St GitHub reams 存储库

以下导入适用于此异步分页部分中的所有示例。

import io.reactivex.rxjava3.core.Flowable; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;

以下代码获取一个 ListTablesPublisher 实例。

// Creates a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(listTablesRequest);

以下代码使用 org.reactivestreams.Subscriber 的匿名实现来处理每个页面的结果。

onSubscribe 方法将调用 Subscription.request 方法来对来自发布者的数据启动请求。必须调用此方法以开始从发布者获取数据。

订阅者的 onNext 方法将处理响应页面,它会访问所有表名称并打印出每个表名称。处理完该页面后,会向发布者请求另一个页面。将重复调用该方法,直到检索了所有页面。

如果检索数据时出现错误,将触发 onError 方法。最后,在 onComplete 方法在请求所有页面后调用。

// A Subscription represents a one-to-one life-cycle of a Subscriber subscribing // to a Publisher. publisher.subscribe(new Subscriber<ListTablesResponse>() { // Maintain a reference to the subscription object, which is required to request // data from the publisher. private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; // Request method should be called to demand data. Here we request a single // page. subscription.request(1); } @Override public void onNext(ListTablesResponse response) { response.tableNames().forEach(System.out::println); // After you process the current page, call the request method to signal that // you are ready for next page. subscription.request(1); } @Override public void onError(Throwable t) { // Called when an error has occurred while processing the requests. } @Override public void onComplete() { // This indicates all the results are delivered and there are no more pages // left. } });

请参阅上的完整示例 GitHub。

使用 Consumer

ListTablesPublisher 实现的 SdkPublisher 接口有一个 subscribe 方法,该方法接受 Consumer 并返回 CompletableFuture<Void>

此接口中的 subscribe 方法可用于 org.reactivestreams.Subscriber 开销可能过大的简单用例。当下面的代码使用每个页面时,它会在每个页面上调用 tableNames 方法。该 tableNames 方法返回使用 forEach 方法处理的 DynamoDB 表名的 java.util.List

// Use a Consumer for simple use cases. CompletableFuture<Void> future = publisher.subscribe( response -> response.tableNames() .forEach(System.out::println));

请参阅上的完整示例 GitHub。

迭代表名称

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。与之前用 contents 方法演示的同步 Amazon S3 示例类似,DynamoDB 异步结果类 ListTablesPublisher 具有与底层项目集合交互的 tableNames 便捷方法。此 tableNames 方法的返回类型是一个 SdkPublisher,可用于跨所有页面请求项目。

使用 Subscriber

以下代码获取表名底层集合的 SdkPublisher

// Create a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher listTablesPublisher = asyncClient.listTablesPaginator(listTablesRequest); SdkPublisher<String> publisher = listTablesPublisher.tableNames();

以下代码使用 org.reactivestreams.Subscriber 的匿名实现来处理每个页面的结果。

订阅者的 onNext 方法将处理集合中的单个元素。在本例中,它是一个表名称。处理完该表名称后,会向发布者请求另一个表名称。将重复调用该方法,直到检索了所有表名称。

// Use a Subscriber. publisher.subscribe(new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } @Override public void onNext(String tableName) { System.out.println(tableName); subscription.request(1); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });

请参阅上的完整示例 GitHub。

使用 Consumer

以下示例使用 SdkPublishersubscribe 方法(采用 Consumer)来处理每个项目。

// Use a Consumer. CompletableFuture<Void> future = publisher.subscribe(System.out::println); future.get();

请参阅上的完整示例 GitHub。

使用第三方库

您可以使用其他第三方库,而不是实现自定义订阅者。此示例演示了的用法 RxJava,但是可以使用任何实现响应式流接口的库。有关该库的更多信息, GitHub请参阅上的 RxJava wiki 页面

要使用该库,请将其作为依赖项添加。如果使用 Maven,则该示例将显示要使用的POM代码片段。

POM参赛作品

<dependency> <groupId>io.reactivex.rxjava3</groupId> <artifactId>rxjava</artifactId> <version>3.1.6</version> </dependency>

代码

DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(ListTablesRequest.builder() .build()); // The Flowable class has many helper methods that work with // an implementation of an org.reactivestreams.Publisher. List<String> tables = Flowable.fromPublisher(publisher) .flatMapIterable(ListTablesResponse::tableNames) .toList() .blockingGet(); System.out.println(tables);

请参阅上的完整示例 GitHub。