Work with paginated results: scans and queries
The scan, query, and batch methods of the DynamoDB Enhanced Client API return responses with one or more pages. A page contains one or more items. Your code can process the response on a per-page basis or it can process individual items.
The synchronous DynamoDbEnhancedClient returns a paginated response as a PageIterable object, whereas the asynchronous DynamoDbEnhancedAsyncClient returns a PagePublisher object.
This section looks at processing paginated results and provides examples that use the scan and query APIs.
Scan a table
The SDK's scan method corresponds to the DynamoDB operation of the same name. First, we explore the PageIterable interface by looking at the scan method of the synchronous mapping class, DynamoDbTable.
Use the synchronous API
The following example shows the scan method that uses an expression to filter the items that are returned.
The filter expression shown after comment line 2 limits the ProductCatalog items that are returned to those with a price value between 8.00 and 80.00 inclusive.
This example also excludes the isbn values by using the attributesToProject method shown after comment line 1.
After comment line 3, the scan method returns the PageIterable object, pagedResults. The stream method of PageIterable returns a java.util.Stream object, which the example uses to count and log the number of pages.
Starting with comment line 4, the example shows two variations of accessing the ProductCatalog items. The version after comment line 4a streams through each page and sorts and logs the items on each page. The version after comment line 4b skips the page iteration and accesses the items directly.
The PageIterable interface offers multiple ways to process results because of its two parent interfaces: java.lang.Iterable and SdkIterable. Iterable brings the forEach, iterator, and spliterator methods, and SdkIterable brings the stream method.
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) {

    Map<String, AttributeValue> expressionValues = Map.of(
            ":min_value", numberValue(8.00),
            ":max_value", numberValue(80.00));

    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            // 1. the 'attributesToProject()' method allows you to specify which values you want returned.
            .attributesToProject("id", "title", "authors", "price")
            // 2. Filter expression limits the items returned that match the provided criteria.
            .filterExpression(Expression.builder()
                    .expression("price >= :min_value AND price <= :max_value")
                    .expressionValues(expressionValues)
                    .build())
            .build();

    // 3. A PageIterable object is returned by the scan method.
    PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request);
    logger.info("page count: {}", pagedResults.stream().count());

    // 4. Log the returned ProductCatalog items using two variations.
    // 4a. This version sorts and logs the items of each page.
    pagedResults.stream().forEach(p -> p.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(
                    item -> logger.info(item.toString())
            ));
    // 4b. This version sorts and logs all items for all pages.
    pagedResults.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(
                    item -> logger.info(item.toString())
            );
}
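The difference between per-page and per-item processing can be tried without a DynamoDB table. The following is a minimal JDK-only sketch: PagedResults and PagedResultsDemo are hypothetical classes written for this illustration and are not the SDK's PageIterable. They only mimic the idea of an Iterable of pages that also offers a page-level stream and a flattened items view, using made-up price values.

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for a paged result; it is NOT the SDK's PageIterable.
class PagedResults<T> implements Iterable<List<T>> {
    private final List<List<T>> pages;

    PagedResults(List<List<T>> pages) {
        this.pages = pages;
    }

    // From java.lang.Iterable: enables forEach, iterator and spliterator over pages.
    @Override
    public Iterator<List<T>> iterator() {
        return pages.iterator();
    }

    // Page-level stream, similar in spirit to the stream method described above.
    Stream<List<T>> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    // Flattened view that ignores page boundaries.
    Stream<T> items() {
        return stream().flatMap(List::stream);
    }
}

class PagedResultsDemo {
    // Two pages of made-up prices.
    static PagedResults<Double> sample() {
        return new PagedResults<>(List.of(List.of(45.0, 12.5), List.of(80.0, 8.0)));
    }

    // Like variation 4a: sort within each page, so the ordering restarts at every page.
    static List<Double> sortedPerPage(PagedResults<Double> results) {
        return results.stream()
                .flatMap(page -> page.stream().sorted())
                .collect(Collectors.toList());
    }

    // Like variation 4b: sort across all items from all pages.
    static List<Double> sortedAcrossPages(PagedResults<Double> results) {
        return results.items().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PagedResults<Double> results = sample();
        System.out.println("page count: " + results.stream().count());
        System.out.println(sortedPerPage(results));
        System.out.println(sortedAcrossPages(results));
    }
}
```

With this sample data the per-page variation yields 12.5, 45.0, 8.0, 80.0, while the all-items variation yields 8.0, 12.5, 45.0, 80.0. Sorting inside each page does not produce a globally sorted result.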
Use the asynchronous API
The asynchronous scan method returns results as a PagePublisher object. The PagePublisher interface has two subscribe methods that you can use to process response pages. One subscribe method comes from the org.reactivestreams.Publisher parent interface. To process pages using this first option, pass a Subscriber instance to the subscribe method. The first example that follows shows the use of this subscribe method.
The second subscribe method comes from the SdkPublisher interface. This version of subscribe accepts a Consumer rather than a Subscriber. This subscribe method variation is shown in the second example that follows.
The following example shows the asynchronous version of the scan method that uses the same filter expression shown in the previous example.
After comment line 3, DynamoDbAsyncTable.scan returns a PagePublisher object. On the next line, the code creates an instance of the org.reactivestreams.Subscriber interface, ProductCatalogSubscriber, which subscribes to the PagePublisher after comment line 4.
The Subscriber object collects the ProductCatalog items from each page in the onNext method after comment line 8 in the ProductCatalogSubscriber class example. The items are stored in a private List variable, and the calling code accesses them with the ProductCatalogSubscriber.getSubscribedItems() method, which is called after comment line 5.
After the list is retrieved, the code sorts all ProductCatalog items by price and logs each item.
The CountDownLatch in the ProductCatalogSubscriber class blocks the calling thread until all items have been added to the list before continuing after comment line 5.
public static void scanAsync(DynamoDbAsyncTable<ProductCatalog> productCatalog) {
    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            .attributesToProject("id", "title", "authors", "price")
            .filterExpression(Expression.builder()
                    // 1. :min_value and :max_value are placeholders for the values provided by the map
                    .expression("price >= :min_value AND price <= :max_value")
                    // 2. Two values are needed for the expression and each is supplied as a map entry.
                    .expressionValues(
                            Map.of(":min_value", numberValue(8.00),
                                   ":max_value", numberValue(400_000.00)))
                    .build())
            .build();

    // 3. A PagePublisher object is returned by the scan method.
    PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request);
    ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber();
    // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher.
    pagePublisher.subscribe(subscriber);
    // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber.
    subscriber.getSubscribedItems().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(item -> logger.info(item.toString()));
    // 6. Use a Consumer to work through each page.
    pagePublisher.subscribe(page -> page
                    .items().stream()
                    .sorted(Comparator.comparing(ProductCatalog::price))
                    .forEach(item -> logger.info(item.toString())))
            .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
    // 7. Use a Consumer to work through each ProductCatalog item.
    pagePublisher.items()
            .subscribe(product -> logger.info(product.toString()))
            .exceptionally(failure -> {
                logger.error("ERROR - ", failure);
                return null;
            })
            .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
}
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> {
    private CountDownLatch latch = new CountDownLatch(1);
    private Subscription subscription;
    private List<ProductCatalog> itemsFromAllPages = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription sub) {
        subscription = sub;
        subscription.request(1L);
        try {
            latch.await(); // Called by main thread blocking it until latch is released.
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onNext(Page<ProductCatalog> productCatalogPage) {
        // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page.
        itemsFromAllPages.addAll(productCatalogPage.items());
        subscription.request(1L);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onComplete() {
        latch.countDown(); // Call by subscription thread; latch releases.
    }

    List<ProductCatalog> getSubscribedItems() {
        return this.itemsFromAllPages;
    }
}
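The request-one-page-at-a-time pattern can also be exercised with the JDK alone. The sketch below is an illustration under stated assumptions, not the SDK example: it uses java.util.concurrent.Flow.Subscriber and SubmissionPublisher as stand-ins for org.reactivestreams.Subscriber and PagePublisher, and the class names CollectingSubscriber and CollectingSubscriberDemo are hypothetical. As one possible variation, it waits on the latch in a separate method rather than inside onSubscribe, and it also releases the latch in onError so that a failed stream does not leave the caller waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Collects items one page at a time. Flow.Subscriber stands in for the SDK types here.
class CollectingSubscriber<T> implements Flow.Subscriber<List<T>> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final List<T> items = new CopyOnWriteArrayList<>();
    private volatile Flow.Subscription subscription;
    private volatile Throwable failure;

    @Override
    public void onSubscribe(Flow.Subscription sub) {
        subscription = sub;
        sub.request(1); // Ask for the first page only.
    }

    @Override
    public void onNext(List<T> page) {
        items.addAll(page);
        subscription.request(1); // Ask for one more page.
    }

    @Override
    public void onError(Throwable t) {
        failure = t;
        done.countDown(); // Release the waiting caller on failure too.
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Blocks the caller until the stream ends or the timeout elapses.
    List<T> awaitItems(long timeout, TimeUnit unit) throws InterruptedException {
        if (!done.await(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting for pages");
        }
        if (failure != null) {
            throw new IllegalStateException("Stream failed", failure);
        }
        return new ArrayList<>(items);
    }
}

class CollectingSubscriberDemo {
    static List<String> run() {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.submit(List.of("item-1", "item-2")); // page 1
        publisher.submit(List.of("item-3"));           // page 2
        publisher.close(); // Signals onComplete once the submitted pages are delivered.
        try {
            return subscriber.awaitItems(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling CollectingSubscriberDemo.run() returns the three items in publication order, because a single subscriber receives its pages sequentially.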
The following snippet uses the version of the PagePublisher.subscribe method that accepts a Consumer, shown after comment line 6. The lambda receives each page and processes the items it contains. In this example, the items on each page are sorted and then logged.
// 6. Use a Consumer to work through each page.
pagePublisher.subscribe(page -> page
                .items().stream()
                .sorted(Comparator.comparing(ProductCatalog::price))
                .forEach(item -> logger.info(item.toString())))
        .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
The items method of PagePublisher unwraps the model instances so that your code can process the items directly. This approach is shown in the following snippet.
// 7. Use a Consumer to work through each ProductCatalog item.
pagePublisher.items()
        .subscribe(product -> logger.info(product.toString()))
        .exceptionally(failure -> {
            logger.error("ERROR - ", failure);
            return null;
        })
        .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
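The Consumer-based style shown in the snippets above (a lambda per page plus a future that you can join) has a close analog in the JDK, which makes it easy to try in isolation. The following sketch assumes the JDK's SubmissionPublisher.consume method as a stand-in for the SDK's Consumer-accepting subscribe method; ConsumerSubscribeDemo and its sample pages are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class ConsumerSubscribeDemo {
    // Processes each page with a lambda and returns the items in the order they were handled.
    static List<String> consumePages(List<List<String>> pages) {
        List<String> seen = new CopyOnWriteArrayList<>();
        SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>();

        // Register the consumer before publishing; a late subscriber would miss earlier pages.
        CompletableFuture<Void> finished =
                publisher.consume(page -> page.stream().sorted().forEach(seen::add));

        pages.forEach(publisher::submit);
        publisher.close();   // No more pages; the future completes after the last page is handled.
        finished.join();     // Block until processing is finished.
        return List.copyOf(seen);
    }

    public static void main(String[] args) {
        System.out.println(consumePages(List.of(List.of("d", "b"), List.of("c", "a"))));
    }
}
```

As with the per-page sort in the snippet after comment line 6, items are ordered within each page only, so the sample input produces b, d, a, c rather than a fully sorted list.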
Query a table
The query() method of the DynamoDbTable class finds items based on primary key values. The @DynamoDbPartitionKey annotation and the optional @DynamoDbSortKey annotation are used to define the primary key on your data class.
The query() method requires a partition key value and finds items that match the supplied value. If your table also defines a sort key, you can add a value for it to your query as an additional comparison condition to refine the results.
Except for processing the results, the synchronous and asynchronous versions of query() work the same. As with the scan API, the query API returns a PageIterable for a synchronous call and a PagePublisher for an asynchronous call. We discussed the use of PageIterable and PagePublisher previously in the scan section.
Query method examples
The query() method code example that follows uses the MovieActor class. The data class defines a composite primary key that is made up of the movie attribute for the partition key and the actor attribute for the sort key.
The class also signals that it uses a global secondary index named acting_award_year. The index's composite primary key is composed of the actingaward attribute for the partition key and the actingyear attribute for the sort key. Later in this topic, when we show how to create and use indexes, we'll refer to the acting_award_year index.
package org.example.tests.model;

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey;

import java.util.Objects;

@DynamoDbBean
public class MovieActor implements Comparable<MovieActor> {

    private String movieName;
    private String actorName;
    private String actingAward;
    private Integer actingYear;
    private String actingSchoolName;

    @DynamoDbPartitionKey
    @DynamoDbAttribute("movie")
    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    @DynamoDbSortKey
    @DynamoDbAttribute("actor")
    public String getActorName() {
        return actorName;
    }

    public void setActorName(String actorName) {
        this.actorName = actorName;
    }

    @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year")
    @DynamoDbAttribute("actingaward")
    public String getActingAward() {
        return actingAward;
    }

    public void setActingAward(String actingAward) {
        this.actingAward = actingAward;
    }

    @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"})
    @DynamoDbAttribute("actingyear")
    public Integer getActingYear() {
        return actingYear;
    }

    public void setActingYear(Integer actingYear) {
        this.actingYear = actingYear;
    }

    @DynamoDbAttribute("actingschoolname")
    public String getActingSchoolName() {
        return actingSchoolName;
    }

    public void setActingSchoolName(String actingSchoolName) {
        this.actingSchoolName = actingSchoolName;
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer("MovieActor{");
        sb.append("movieName='").append(movieName).append('\'');
        sb.append(", actorName='").append(actorName).append('\'');
        sb.append(", actingAward='").append(actingAward).append('\'');
        sb.append(", actingYear=").append(actingYear);
        sb.append(", actingSchoolName='").append(actingSchoolName).append('\'');
        sb.append('}');
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MovieActor that = (MovieActor) o;
        return Objects.equals(movieName, that.movieName)
                && Objects.equals(actorName, that.actorName)
                && Objects.equals(actingAward, that.actingAward)
                && Objects.equals(actingYear, that.actingYear)
                && Objects.equals(actingSchoolName, that.actingSchoolName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName);
    }

    @Override
    public int compareTo(MovieActor o) {
        if (this.movieName.compareTo(o.movieName) != 0) {
            return this.movieName.compareTo(o.movieName);
        } else {
            return this.actorName.compareTo(o.actorName);
        }
    }
}
The code examples that follow query against the following items.
MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'}
MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'}
MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'}
MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'}
MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'}
MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}
The following code defines two QueryConditional instances. QueryConditionals work with key values (either the partition key alone or in combination with the sort key) and correspond to the key condition expressions of the DynamoDB service API. After comment line 1, the example defines the keyEqual instance that matches items with a partition value of movie01.
This example also defines a filter expression after comment line 2 that filters out any item that has no actingschoolname attribute.
After comment line 3, the example shows the QueryEnhancedRequest instance that the code passes to the DynamoDbTable.query() method. This object combines the key condition and filter that the SDK uses to generate the request to the DynamoDB service.
public static void query(DynamoDbTable<MovieActor> movieActorTable) {

    // 1. Define a QueryConditional instance to return items matching a partition value.
    QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01"));
    // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria.
    QueryConditional sortGreaterThanOrEqualTo =
            QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2"));
    // 2. Define a filter expression that filters out items whose attribute value is null.
    final Expression filterOutNoActingschoolname =
            Expression.builder().expression("attribute_exists(actingschoolname)").build();

    // 3. Build the query request.
    QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
            .queryConditional(keyEqual)
            .filterExpression(filterOutNoActingschoolname)
            .build();
    // 4. Perform the query.
    PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery);
    logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages.

    pagedResults.items().stream()
            .sorted()
            .forEach(
                    item -> logger.info(item.toString()) // Log the sorted list of items.
            );
}
The following is the output from running the method. The output displays items with a movieName value of movie01 and displays no items with actingSchoolName equal to null.
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
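The two-stage selection behind this output (key condition first, filter second) can be modeled in memory. The sketch below is a simplified illustration, not how the SDK or the service is implemented: FilterModelDemo and its nested Item class are hypothetical, a null field stands in for a missing actingschoolname attribute, and the sample data copies only the fields needed from the items listed earlier.

```java
import java.util.List;
import java.util.stream.Collectors;

class FilterModelDemo {
    // Minimal item holding only the attributes relevant to this illustration.
    static final class Item {
        final String movie;
        final String actor;
        final String actingSchoolName; // null models a missing attribute

        Item(String movie, String actor, String actingSchoolName) {
            this.movie = movie;
            this.actor = actor;
            this.actingSchoolName = actingSchoolName;
        }
    }

    // The movie01 items from the sample data, plus one movie02 item.
    static List<Item> sampleItems() {
        return List.of(
                new Item("movie01", "actor0", null),
                new Item("movie01", "actor1", "actingschool1"),
                new Item("movie01", "actor2", "actingschool2"),
                new Item("movie01", "actor3", null),
                new Item("movie01", "actor4", "actingschool4"),
                new Item("movie02", "actor1", "actingschool1"));
    }

    // Returns the actor names that survive both stages, sorted by name.
    static List<String> query(List<Item> table, String movie) {
        return table.stream()
                .filter(item -> item.movie.equals(movie))          // stage 1: key condition
                .filter(item -> item.actingSchoolName != null)     // stage 2: attribute_exists filter
                .map(item -> item.actor)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(query(sampleItems(), "movie01"));
    }
}
```

For movie01 the model keeps actor1, actor2, and actor4, which matches the three items in the logged output.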
In the following variation of the query request that was shown previously after comment line 3, the code replaces the keyEqual QueryConditional with the sortGreaterThanOrEqualTo QueryConditional that was defined after comment line 1a. The following code also removes the filter expression.
QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
        .queryConditional(sortGreaterThanOrEqualTo)
        .build();
Because this table has a composite primary key, all QueryConditional instances require a partition key value. QueryConditional methods that begin with sort... indicate that a sort key value is also required. The sort... prefix refers to the sort key condition; it does not request any additional sorting of the results.
The following output displays the results from the query. The query returns items that have a movieName value equal to movie01 and only items that have an actorName value that is greater than or equal to actor2. Because the filter was removed, the query returns items that have no value for the actingSchoolName attribute.
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
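The difference between a partition-only condition and a partition-plus-sort-key condition can be illustrated with ordinary collections. The following sketch is a conceptual model only: KeyConditionModel is a hypothetical class, and the nested map of sort keys is an assumption chosen to make the range lookup easy to read, not a description of how DynamoDB stores data. The method names borrow from the QueryConditional factory methods for readability.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class KeyConditionModel {
    // partition key -> (sort key -> payload); the TreeMap keeps sort keys ordered.
    private final Map<String, NavigableMap<String, String>> partitions = new HashMap<>();

    void put(String partition, String sort, String payload) {
        partitions.computeIfAbsent(partition, k -> new TreeMap<>()).put(sort, payload);
    }

    // Models a partition-only key condition: every sort key in the partition.
    List<String> keyEqualTo(String partition) {
        return new ArrayList<>(
                partitions.getOrDefault(partition, Collections.emptyNavigableMap()).keySet());
    }

    // Models partition equality plus "sort key >= bound" (inclusive).
    List<String> sortGreaterThanOrEqualTo(String partition, String sortBound) {
        return new ArrayList<>(
                partitions.getOrDefault(partition, Collections.emptyNavigableMap())
                        .tailMap(sortBound, true)
                        .keySet());
    }

    // Builds movie01 and movie02 with actor0 through actor4, like the sample items.
    static KeyConditionModel sample() {
        KeyConditionModel table = new KeyConditionModel();
        for (String movie : List.of("movie01", "movie02")) {
            for (int i = 0; i <= 4; i++) {
                table.put(movie, "actor" + i, movie + "/actor" + i);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        KeyConditionModel table = sample();
        System.out.println(table.keyEqualTo("movie01"));
        System.out.println(table.sortGreaterThanOrEqualTo("movie01", "actor2"));
    }
}
```

In this model, the sort-key condition for movie01 with a bound of actor2 selects actor2, actor3, and actor4, the same three actors that appear in the query output above.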