处理分页结果:扫描和查询 - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

处理分页结果:扫描和查询

DynamoDB 增强型客户端 API 的 scanquerybatch 方法返回包含一个或多个页面 的响应。一个页面包含一个或多个项目。您的代码可以按页处理响应,也可以处理单个项目。

同步DynamoDbEnhancedClient客户端返回的分页响应返回一个PageIterable对象,而异步客户端返回的响应DynamoDbEnhancedAsyncClient返回一个PagePublisher对象。

本部分介绍如何处理分页结果,并提供使用扫描和查询 API 的示例。

扫描表

SDK 的 scan 方法对应于同名的 DynamoDB 操作。DynamoDB 增强型客户端 API 提供了相同的选项,但它使用熟悉的对象模型并为您处理分页。

首先,我们通过查看同步映射类的scan方法来探索PageIterable接口DynamoDbTable

使用同步 API

以下示例演示使用表达式筛选返回项的 scan 方法。ProductCatalog是前面显示的模型对象。

在注释行 1 之后显示的筛选表达式将返回的 ProductCatalog 项目限制为价格在 8.00 到 80.00 之间(含)的项目。

此示例还使用注释行 2 后面显示的 attributesToProject 方法排除 isbn 值。

在注释行 3 中,scan 方法返回 PageIterable 对象 pagedResultPageIterablestream 方法返回一个 java.util.Stream 对象,您可以使用该对象来处理页面。在此示例中,计算并记录了页数。

从注释行 4 开始,该示例演示访问 ProductCatalog 项目的两种变体。注释行 2a 之后的版本对每个页面进行流式处理,并对每页上的项目进行排序和记录。注释行 2b 之后的版本会跳过页面迭代并直接访问项目。

由于 PageIterable 接口有两个父接口(java.lang.IterableSdkIterable),因此提供了多种处理结果的方。Iterable 引入了 forEachiteratorspliterator 方法,而 SdkIterable 引入了 stream 方法。

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }

使用异步 API

异步 scan 方法将结果作为 PagePublisher 对象返回。PagePublisher 接口有两种 subscribe 方法可用于处理响应页面。一种 subscribe 方法来自 org.reactivestreams.Publisher 父接口。要使用第一个选项处理页面,请向 subscribe 方法传递一个 Subscriber 实例。接下来的第一个示例演示了 subscribe 方法的用法。

第二种subscribe方法来自接SdkPublisher口。此版本的 subscribe 接受 Consumer 而不是 Subscriber。此 subscribe 方法变体如接下来的第二个示例所示。

以下示例演示 scan 方法的异步版本,该版本使用了上一个示例中的相同筛选表达式。

在注释行 3 之后,DynamoDbAsyncTable.scan 返回一个 PagePublisher 对象。在下一行,代码创建一个 org.reactivestreams.Subscriber 接口实例 ProductCatalogSubscriber,在注释行 4 之后,该实例订阅到 PagePublisher

ProductCatalogSubscriber 类示例的注释行 8 之后,Subscriber 对象从 onNext 方法中的每个页面收集 ProductCatalog 项目。这些项目存储在私有 List 变量中,并在调用代码中使用 ProductCatalogSubscriber.getSubscribedItems() 方法对它们进行访问。这是在注释行 5 之后调用的。

检索了列表后,代码按价格对所有 ProductCatalog 项目进行排序并记录每个项目。

ProductCatalogSubscriberCountDownLatch中的会阻塞调用线程,直到所有项目都已添加到列表中,然后在注释行 5 之后继续。

public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }

以下代码段示例使用的 PagePublisher.subscribe 方法版本在注释行 6 之后接受 Consumer。Java lambda 参数使用页面,进一步处理每个项目。在此示例中,对每个页面进行处理,并对每页上的项目进行排序和记录。

// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

PagePublisheritems 方法对模型实例进行解包,以便您的代码可以直接处理这些项目。以下代码段演示了这种方法。

// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

查询表

DynamoDbTable 类的 query() 方法基于主键值查找项目。@DynamoDbPartitionKey 注释和可选的 @DynamoDbSortKey 注释用于定义数据类的主键。

query() 方法需要一个分区键值来查找与提供的值相匹配的项目。如果您的表还定义了排序键,则可以在查询中为它添加一个值作为额外的比较条件来微调结果。

除了处理结果之外,同步版本和异步版本 query() 的工作原理相同。与 scan API 一样,query API 会为同步调用返回 PageIterable,为异步调用返回 PagePublisher。我们之前在扫描部分讨论了 PageIterablePagePublisher 的使用。

Query 方法示例

下面的 query() 方法代码示例使用 MovieActor 类。数据类定义了一个复合主键,该主键由分区键的 movie 属性和排序键的 actor 属性组成。

该类还表示它使用名为 acting_award_year 的全局二级索引。索引的复合主键由分区键的 actingaward 属性和排序键的 actingyear 属性组成。在本主题的后面部分,我们将在演示如何创建和使用索引时,引用 acting_award_year 索引。

package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }

对以下项目进行查询之后的代码示例。

MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}

以下代码定义了两个QueryConditional实例。 QueryConditionals使用键值(可以单独使用分区键,也可以与排序键组合使用),并与 DynamoDB 服务 API 的密钥条件表达式相对应。在注释行 1 之后,该示例定义了与分区值为 movie01 的项目匹配的 keyEqual 实例。

此示例还在注释行 2 之后,定义了一个筛选表达式,过滤掉任何没有 actingschoolname 的项目。

在注释第 3 行之后,该示例显示了代码传递给DynamoDbTable.query()方法的QueryEnhancedRequest实例。此对象结合了 SDK 用来生成对 DynamoDB 服务的请求的关键条件和筛选条件。

public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );

下面是运行该方法的输出。该输出显示 movieName 值为 movie01 的项目,不显示 actingSchoolName 等于 null 的项目。

2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

在以下查询请求代码变体中,之前在注释行 3 之后显示的 keyEqual QueryConditional 将替换为注释行 1a 之后定义的 sortGreaterThanOrEqualTo QueryConditional。以下代码还删除了筛选表达式。

QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo)

由于此表具有复合主键,因此所有 QueryConditional 实例都需要分区键值。以 sort... 开头的 QueryConditional 方法指示需要排序 键。结果未排序。

以下输出显示了查询的结果。该查询仅返回 movieName 值等于 movie01actorName 值大于或等于 actor2 的项目。由于筛选条件已被删除,因此查询会返回没有 actingSchoolName 属性值的项目。

2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}