本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon SDK for Java 2.x 处理分页结果
当响应对象太大而无法在单个响应中返回时,许多 Amazon 操作都会返回分页结果。在 Amazon SDK for Java 1.0 中,响应包含一个用于检索下一页结果的标记。相比之下, Amazon SDK for Java 2.x 具有自动分页方法,可以进行多次服务调用,自动为您获取下一页的结果。您只需编写处理结果的代码。自动分页功能适用于同步和异步客户端。
注意
这些代码片段假设您了解使用的基础知识SDK,并且已将您的环境配置为单点登录访问权限。
同步分页
以下示例演示列出 Amazon S3 桶中对象的同步分页方法。
迭代页面
第一个示例演示如何使用分listRes
页器对象(一个ListObjectsV2Iterable
stream
代码在响应页面上进行流式传输,将响应流转换为S3Object
内容流,然后处理 Amazon S3 对象的内容。
以下导入适用于此同步分页部分中的所有示例。
import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.waiters.S3Waiter; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
ListObjectsV2Request listReq = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); ListObjectsV2Iterable listRes = s3.listObjectsV2Paginator(listReq); // Process response pages listRes.stream() .flatMap(r -> r.contents().stream()) .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));
请参阅上的完整示例
迭代对象
以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。ListObjectsV2Iterable
类的 contents
方法返回一个 SdkIterable
使用流
以下代码段在响应内容上使用 stream
方法来迭代分页项目集合。
// Helper method to work with paginated collection of items directly. listRes.contents().stream() .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));
请参阅上的完整示例
使用 for-each 循环
由于 SdkIterable
扩展了 Iterable
接口,因此您可以像处理任何 Iterable
一样处理内容。以下代码段使用标准 for-each
循环迭代响应的内容。
for (S3Object content : listRes.contents()) { System.out.println(" Key: " + content.key() + " size = " + content.size()); }
请参阅上的完整示例
手动分页
如果您的使用案例需要手动分页,则手动分页仍然可用。对后续请求使用响应对象中的下一个令牌。以下示例使用 while
循环。
ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); boolean done = false; while (!done) { ListObjectsV2Response listObjResponse = s3.listObjectsV2(listObjectsReqManual); for (S3Object content : listObjResponse.contents()) { System.out.println(content.key()); } if (listObjResponse.nextContinuationToken() == null) { done = true; } listObjectsReqManual = listObjectsReqManual.toBuilder() .continuationToken(listObjResponse.nextContinuationToken()) .build(); }
请参阅上的完整示例
异步分页
以下示例演示了列出 DynamoDB 表格的异步分页方法。
迭代表名称页面
以下两个示例使用异步 DynamoDB 客户端,该客户端调用listTablesPaginator
该方法并请求获取。ListTablesPublisher
ListTablesPublisher
实现了两个接口,这为处理响应提供了许多选项。我们将研究每个接口的方法。
使用 Subscriber
以下代码示例演示如何使用 ListTablesPublisher
实现的 org.reactivestreams.Publisher
接口处理分页结果。要了解有关响应式流模型的更多信息,请参阅 React ive St GitHub reams 存储库
以下导入适用于此异步分页部分中的所有示例。
import io.reactivex.rxjava3.core.Flowable; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
以下代码获取一个 ListTablesPublisher
实例。
// Creates a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(listTablesRequest);
以下代码使用 org.reactivestreams.Subscriber
的匿名实现来处理每个页面的结果。
onSubscribe
方法将调用 Subscription.request
方法来对来自发布者的数据启动请求。必须调用此方法以开始从发布者获取数据。
订阅者的 onNext
方法将处理响应页面,它会访问所有表名称并打印出每个表名称。处理完该页面后,会向发布者请求另一个页面。将重复调用该方法,直到检索了所有页面。
如果检索数据时出现错误,将触发 onError
方法。最后,在 onComplete
方法在请求所有页面后调用。
// A Subscription represents a one-to-one life-cycle of a Subscriber subscribing // to a Publisher. publisher.subscribe(new Subscriber<ListTablesResponse>() { // Maintain a reference to the subscription object, which is required to request // data from the publisher. private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; // Request method should be called to demand data. Here we request a single // page. subscription.request(1); } @Override public void onNext(ListTablesResponse response) { response.tableNames().forEach(System.out::println); // After you process the current page, call the request method to signal that // you are ready for next page. subscription.request(1); } @Override public void onError(Throwable t) { // Called when an error has occurred while processing the requests. } @Override public void onComplete() { // This indicates all the results are delivered and there are no more pages // left. } });
请参阅上的完整示例
使用 Consumer
ListTablesPublisher
实现的 SdkPublisher
接口有一个 subscribe
方法,该方法接受 Consumer
并返回 CompletableFuture<Void>
。
此接口中的 subscribe
方法可用于 org.reactivestreams.Subscriber
开销可能过大的简单用例。当下面的代码使用每个页面时,它会在每个页面上调用 tableNames
方法。该 tableNames
方法返回使用 forEach
方法处理的 DynamoDB 表名的 java.util.List
。
// Use a Consumer for simple use cases. CompletableFuture<Void> future = publisher.subscribe( response -> response.tableNames() .forEach(System.out::println));
请参阅上的完整示例
迭代表名称
以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。与之前用 contents
方法演示的同步 Amazon S3 示例类似,DynamoDB 异步结果类 ListTablesPublisher
具有与底层项目集合交互的 tableNames
便捷方法。此 tableNames
方法的返回类型是一个 SdkPublisher
使用 Subscriber
以下代码获取表名底层集合的 SdkPublisher
。
// Create a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher listTablesPublisher = asyncClient.listTablesPaginator(listTablesRequest); SdkPublisher<String> publisher = listTablesPublisher.tableNames();
以下代码使用 org.reactivestreams.Subscriber
的匿名实现来处理每个页面的结果。
订阅者的 onNext
方法将处理集合中的单个元素。在本例中,它是一个表名称。处理完该表名称后,会向发布者请求另一个表名称。将重复调用该方法,直到检索了所有表名称。
// Use a Subscriber. publisher.subscribe(new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } @Override public void onNext(String tableName) { System.out.println(tableName); subscription.request(1); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
请参阅上的完整示例
使用 Consumer
以下示例使用 SdkPublisher
的 subscribe
方法(采用 Consumer
)来处理每个项目。
// Use a Consumer. CompletableFuture<Void> future = publisher.subscribe(System.out::println); future.get();
请参阅上的完整示例
使用第三方库
您可以使用其他第三方库,而不是实现自定义订阅者。此示例演示了的用法 RxJava,但是可以使用任何实现响应式流接口的库。有关该库的更多信息, GitHub请参阅上的 RxJava wiki 页面
要使用该库,请将其作为依赖项添加。如果使用 Maven,则该示例将显示要使用的POM代码片段。
POM参赛作品
<dependency> <groupId>io.reactivex.rxjava3</groupId> <artifactId>rxjava</artifactId> <version>3.1.6</version> </dependency>
代码
DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(ListTablesRequest.builder() .build()); // The Flowable class has many helper methods that work with // an implementation of an org.reactivestreams.Publisher. List<String> tables = Flowable.fromPublisher(publisher) .flatMapIterable(ListTablesResponse::tableNames) .toList() .blockingGet(); System.out.println(tables);
请参阅上的完整示例