适用于 Java 的 AWS 开发工具包
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

检索分页结果

当响应对象太大而无法在单个响应中返回时,很多 AWS 操作都会返回分页的结果。在AWS SDK for Java 1.0 中,响应包含了您必须使用以检索下一页结果的令牌。AWS SDK for Java 2.0 中新增了自动分页方法,该方法可进行多个服务调用以自动为您获取下一页结果。您只需编写处理结果的代码。此外,这两种类型的方法还具有同步和异步版本。有关异步客户端的更多详细信息,请参阅异步编程

以下示例使用 Amazon S3 和 Amazon DynamoDB 操作来演示从分页响应中检索数据的各种方法。

注意

这些代码段假定您了解使用适用于 Java 的 AWS 开发工具包 2.0 中的内容,并且已使用设置用于开发的 AWS 凭证和区域中的信息配置默认 AWS 凭证。

同步分页

这些示例使用同步分页方法来列出 Amazon S3 存储桶中的对象。

迭代页面

构建一个 ListObjectsV2Request 并提供一个存储桶名称。您可以选择性地提供要一次性检索的最大密钥数。将其传递到 S3ClientlistObjectsV2Paginator 方法。此方法将返回 ListObjectsV2Iterable 对象,该对象是 ListObjectsV2Response 类的 Iterable

第一个示例演示如何使用分页工具对象,借助 stream 方法迭代所有响应页面。您可以直接流式通过响应页面,将响应流转换为 S3Object 内容的流,然后处理 Amazon S3 对象的内容。

导入

import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Paths; import java.util.Random; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer;

代码

// Build the list objects request ListObjectsV2Request listReq = ListObjectsV2Request.builder() .bucket(bucket) .maxKeys(1) .build(); ListObjectsV2Iterable listRes = s3.listObjectsV2Paginator(listReq); // Process response pages listRes.stream() .flatMap(r -> r.contents().stream()) .forEach(content -> System.out.println(" Key: " + content.key() + " size = " + content.size()));

请参阅 GitHub 上的完整示例

迭代对象

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。

使用流

在响应内容上使用 stream 方法来迭代分页项目集合。

代码

// Helper method to work with paginated collection of items directly listRes.contents().stream() .forEach(content -> System.out.println(" Key: " + content.key() + " size = " + content.size()));

请参阅 GitHub 上的完整示例

使用 For 循环

使用标准 for 循环迭代响应的内容。

代码

// Use simple for loop if stream is not necessary for (S3Object content : listRes.contents()) { System.out.println(" Key: " + content.key() + " size = " + content.size()); }

请参阅 GitHub 上的完整示例

手动分页

如果您的使用案例需要手动分页,则手动分页仍然可用。对后续请求使用响应对象中的下一个令牌。下面是使用 while 循环的示例。

代码

// Use manual pagination ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder() .bucket(bucket) .maxKeys(1) .build(); boolean done = false; while (!done) { ListObjectsV2Response listObjResponse = s3.listObjectsV2(listObjectsReqManual); for (S3Object content : listObjResponse.contents()) { System.out.println(content.key()); } if (listObjResponse.nextContinuationToken() == null) { done = true; } listObjectsReqManual = listObjectsReqManual.toBuilder() .continuationToken(listObjResponse.nextContinuationToken()) .build(); }

请参阅 GitHub 上的完整示例

异步分页

这些示例使用异步分页方法来列出 DynamoDB 中的表。异步编程主题中提供了手动分页示例。

迭代表名称页面

首先,创建一个异步 DynamoDB 客户端。然后,调用 listTablesPaginator 方法来获取 ListTablesPublisher。这是反应式流 Publisher 接口的实现。要了解有关反应式流模型的更多信息,请参阅反应式流 Github 存储库

ListTablesPublisher 调用 subscribe 方法并传递订阅者实现。在本示例中,订阅者有一个 onNext 方法,该方法一次从发布者请求一个项目。这是在检索到所有页面之前重复调用的方法。onSubscribe 方法将调用 Subscription.request 方法来对来自发布者的数据启动请求。必须调用此方法以开始从发布者获取数据。如果检索数据时出现错误,将触发 onError 方法。最后,在 onComplete 方法在请求所有页面后调用。

使用订阅者

导入

import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher; import io.reactivex.Flowable; import reactor.core.publisher.Flux;

代码

首先创建一个异步客户端

// Creates a default client with credentials and regions loaded from the environment final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build();

然后,使用订阅者获取结果。

// Or subscribe method should be called to create a new Subscription. // A Subscription represents a one-to-one life-cycle of a Subscriber subscribing to a Publisher. publisher.subscribe(new Subscriber<ListTablesResponse>() { // Maintain a reference to the subscription object, which is required to request data from the publisher private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; // Request method should be called to demand data. Here we request a single page subscription.request(1); } @Override public void onNext(ListTablesResponse response) { response.tableNames().forEach(System.out::println); // Once you process the current page, call the request method to signal that you are ready for next page subscription.request(1); } @Override public void onError(Throwable t) { // Called when an error has occurred while processing the requests } @Override public void onComplete() { // This indicates all the results are delivered and there are no more pages left }

请参阅 GitHub 上的完整示例

使用 For 循环

使用 for 循环,针对创建新订阅者可能开销过高的简单使用案例迭代页面。响应发布者对象有一个 forEach 帮助程序方法来达到此目的。

代码

ListTablesPublisher publisher = asyncClient.listTablesPaginator(listTablesRequest); // Use a for-loop for simple use cases CompletableFuture<Void> future = publisher.subscribe(response -> response.tableNames() .forEach(System.out::println));

请参阅 GitHub 上的完整示例

迭代表名称

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。与同步结果类似,异步结果类具有一个与基础项目集合交互的方法。此便捷方法的返回类型是一个可用于跨所有页面请求项目的发布者。

使用订阅者

代码

首先创建一个异步客户端

System.out.println("running AutoPagination - iterating on item collection...\n"); // Creates a default client with credentials and regions loaded from the environment final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build();

然后,使用订阅者获取结果。

// Use subscriber publisher.subscribe(new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } @Override public void onNext(String tableName) { System.out.println(tableName); subscription.request(1); } @Override public void onError(Throwable t) { } @Override public void onComplete() { }

请参阅 GitHub 上的完整示例

使用 For 循环

使用 forEach 便捷方法迭代结果。

代码

// Use forEach CompletableFuture<Void> future = publisher.subscribe(System.out::println); future.get();

请参阅 GitHub 上的完整示例

使用第三方库

您可以使用其他第三方库,而不是实现自定义订阅者。本示例演示了如何使用 RxJava 实现,但实现反应式流接口的任何库都可以使用。有关该库的更多信息,请参阅 Github 上的 RxJava 维基页面

要使用该库,请将其作为依赖项添加。如果使用了 Maven,示例将显示要使用的 POM 代码段。

POM 条目

<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.9</version> </dependency>

导入

import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher; import io.reactivex.Flowable; import reactor.core.publisher.Flux;

代码

System.out.println("running AutoPagination - using third party subscriber...\n"); DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(ListTablesRequest.builder() .build()); // The Flowable class has many helper methods that work with any reactive streams compatible publisher implementation List<String> tables = Flowable.fromPublisher(publisher) .flatMapIterable(ListTablesResponse::tableNames) .toList() .blockingGet(); System.out.println(tables);

本页内容: