本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理分页结果:扫描和查询
DynamoDB 增强型客户端 API 的 scan、query 和 batch 方法返回包含一个或多个页面 的响应。一个页面包含一个或多个项目。您的代码可以按页处理响应,也可以处理单个项目。
同步DynamoDbEnhancedClient客户端返回的分页响应返回一个PageIterableDynamoDbEnhancedAsyncClient返回一个PagePublisher
本节介绍如何处理分页结果,并提供使用扫描和查询 APIs的示例。
扫描表
SDK 的 scan
首先,我们通过查看同步映射类的scan方法来探索PageIterable接口DynamoDbTable
使用同步 API
以下示例演示使用表达式scan 方法。ProductCatalog是前面显示的模型对象。
在评论行 2 之后显示的筛选表达式将退回的ProductCatalog商品限制为价格在 8.00 到 80.00 之间(含)的商品。
此示例还使用注释行 1 后面显示的attributesToProject方法排除这些isbn值。
在注释第 3 行之后pagedResults,scan方法返回PageIterable对象。PageIterable 的 stream 方法返回一个 java.util.Stream
从注释行 4 开始,该示例演示访问 ProductCatalog 项目的两种变体。注释行 4a 之后的版本通过每个页面进行流式传输,并对每页上的项目进行排序和记录。注释行 4b 之后的版本会跳过页面迭代并直接访问项目。
由于 PageIterable 接口有两个父接口(java.lang.IterableSdkIterableIterable 引入了 forEach、iterator 和 spliterator 方法,而 SdkIterable 引入了 stream 方法。
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }
使用异步 API
异步 scan 方法将结果作为 PagePublisher 对象返回。PagePublisher 接口有两种 subscribe 方法可用于处理响应页面。一种 subscribe 方法来自 org.reactivestreams.Publisher 父接口。要使用第一个选项处理页面,请向 subscribe 方法传递一个 Subscriber 实例。接下来的第一个示例演示了 subscribe 方法的用法。
第二种subscribe方法来自接SdkPublishersubscribe 接受 ConsumerSubscriber。此 subscribe 方法变体如接下来的第二个示例所示。
以下示例演示 scan 方法的异步版本,该版本使用了上一个示例中的相同筛选表达式。
在注释行 3 之后,DynamoDbAsyncTable.scan 返回一个 PagePublisher 对象。在下一行,代码创建一个 org.reactivestreams.Subscriber 接口实例 ProductCatalogSubscriber,在注释行 4 之后,该实例订阅到 PagePublisher。
在 ProductCatalogSubscriber 类示例的注释行 8 之后,Subscriber 对象从 onNext 方法中的每个页面收集 ProductCatalog 项目。这些项目存储在私有 List 变量中,并在调用代码中使用 ProductCatalogSubscriber.getSubscribedItems() 方法对它们进行访问。这是在注释行 5 之后调用的。
检索了列表后,代码按价格对所有 ProductCatalog 项目进行排序并记录每个项目。
ProductCatalogSubscriber类CountDownLatch
public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }
以下代码段示例使用的 PagePublisher.subscribe 方法版本在注释行 6 之后接受 Consumer。Java lambda 参数使用页面,进一步处理每个项目。在此示例中,对每个页面进行处理,并对每页上的项目进行排序和记录。
// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
PagePublisher 的 items 方法对模型实例进行解包,以便您的代码可以直接处理这些项目。以下代码段演示了这种方法。
// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
查询表
您可以使用 DynamoDB 增强版客户端来查询您的表并检索多个符合特定条件的项目。该query()@DynamoDbPartitionKey和可选@DynamoDbSortKey注解,根据主键值查找项目。
该query()方法需要分区键值,并且可以选择接受排序键条件以进一步优化结果。与 scan API 一样,PageIterable对于同步调用,查询会返回 a,PagePublisher对于异步调用,则返回 a。
Query 方法示例
下面的 query() 方法代码示例使用 MovieActor 类。数据类定义了一个复合主键,该主键由作为分区键的movie属性以及作为排序键的actor属性组成。
package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }
对以下项目进行查询之后的代码示例。
MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}
以下代码定义了两个QueryConditional实例:keyEqual(在注释行 1 之后)和sortGreaterThanOrEqualTo(在注释行 1a 之后)。
按分区键查询项目
该keyEqual实例匹配分区键值为的项目movie01。
此示例还在注释行 2 之后定义了一个筛选表达式,用于过滤掉任何没有actingschoolname值的项目。
QueryEnhancedRequest组合了查询的关键条件和筛选表达式。
public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query using the "keyEqual" conditional and filter expression. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );
例 — 使用条件keyEqual查询输出
下面是运行该方法的输出。该输出显示 movieName 值为 movie01 的项目,不显示 actingSchoolName 等于 null 的项目。
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
按分区键和排序键查询项目
通过为sortGreaterThanOrEqualToQueryConditional大于或等于 actor2 的值添加排序键条件来细化分区键匹配 (movie01)。
QueryConditional以开头的@@ 方法sort需要分区键值才能匹配,并通过基于排序键值的比较来进一步完善查询。 Sort在方法名称中并不意味着结果已排序,而是将使用排序键值进行比较。
在以下代码段中,我们更改了之前在注释行 3 之后显示的查询请求。此片段将 “keyEqual” 条件查询替换为注释行 1a 之后定义的 sortGreaterThan OrEqualTo “” 查询条件。以下代码还删除了筛选表达式。
QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo).build();
例 — 使用条件sortGreaterThanOrEqualTo查询输出
以下输出显示了查询的结果。该查询仅返回 movieName 值等于 movie01 且 actorName 值大于或等于 actor2 的项目。由于我们移除了过滤器,因此查询返回的项目没有该actingSchoolName属性值。
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}