执行批量操作 - Amazon SDK for Java 2.x
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

执行批量操作

DynamoDB 增强型客户端 API 提供两种批处理方法:batchGetItem()batchWriteItem()

batchGetItem() 示例

使用 DynamoDbEnhancedClient.batchGetItem() 方法,您可以在一个总请求中检索多个表中最多 100 个单独的项目。以下示例使用前面显示的 CustomerMovieActor 数据类。

在示例的注释行 1 和 2 之后,您将构建 ReadBatch 对象,然后在注释行 3 之后将其作为参数添加到 batchGetItem() 方法中。注释行 1 之后的代码生成要从 Customer 表中读取的批次。注释行 1a 之后的代码显示了如何使用 GetItemEnhancedRequest 生成器,该生成器采用主键值来指定要读取的项目。与指定键值来请求项目不同,您可以使用数据类来请求项目,如注释行 1b 后所示。提交请求之前,SDK 会在后台提取键值。

如 2a 之后的两个语句所示,当您使用基于键的方法指定项目时,您还可以指定 DynamoDB 应执行强一致性读取。使用 consistentRead() 方法时,必须对同一个表的所有请求项目使用该方法。

要检索 DynamoDB 找到的项目,请使用注释行 4 之后显示的 resultsForTable() 方法。为请求中读取的每个表调用该方法。resultsForTable() 返回可使用任何 java.util.List 方法处理的已找到项目的列表。此示例记录每个项目。

要发现 DynamoDB 未处理的项目,请使用注释行 5 之后的方法。BatchGetResultPage 类具有允许您访问每个未处理键的 unprocessedKeysForTable() 方法。BatchGetItem API 参考提供了有关导致项目未处理的情况的更多信息。

public static void batchGetItemExample(DynamoDbEnhancedClient enhancedClient, DynamoDbTable<Customer> customerTable, DynamoDbTable<MovieActor> movieActorTable) { Customer customer2 = new Customer(); customer2.setId("2"); customer2.setEmail("cust2@example.org"); // 1. Build a batch to read from the Customer table. ReadBatch customerBatch = ReadBatch.builder(Customer.class) .mappedTableResource(customerTable) // 1a. Specify the primary key values for the item. .addGetItem(b -> b.key(k -> k.partitionValue("1").sortValue("cust1@orgname.org"))) // 1b. Alternatively, supply a data class instances to provide the primary key values. .addGetItem(customer2) .build(); // 2. Build a batch to read from the MovieActor table. ReadBatch moveActorBatch = ReadBatch.builder(MovieActor.class) .mappedTableResource(movieActorTable) // 2a. Call consistentRead(Boolean.TRUE) for each item for the same table. .addGetItem(b -> b.key(k -> k.partitionValue("movie01").sortValue("actor1")).consistentRead(Boolean.TRUE)) .addGetItem(b -> b.key(k -> k.partitionValue("movie01").sortValue("actor4")).consistentRead(Boolean.TRUE)) .build(); // 3. Add ReadBatch objects to the request. BatchGetResultPageIterable resultPages = enhancedClient.batchGetItem(b -> b.readBatches(customerBatch, moveActorBatch)); // 4. Retrieve the successfully requested items from each table. resultPages.resultsForTable(customerTable).forEach(item -> logger.info(item.toString())); resultPages.resultsForTable(movieActorTable).forEach(item -> logger.info(item.toString())); // 5. Retrieve the keys of the items requested but not processed by the service. resultPages.forEach((BatchGetResultPage pageResult) -> { pageResult.unprocessedKeysForTable(customerTable).forEach(key -> logger.info("Unprocessed item key: " + key.toString())); pageResult.unprocessedKeysForTable(customerTable).forEach(key -> logger.info("Unprocessed item key: " + key.toString())); }); }

在运行示例代码之前,假设两个表中包含以下项目。

Customer [id=1, name=CustName1, email=cust1@example.org, regDate=2023-03-31T15:46:27.688Z] Customer [id=2, name=CustName2, email=cust2@example.org, regDate=2023-03-31T15:46:28.688Z] Customer [id=3, name=CustName3, email=cust3@example.org, regDate=2023-03-31T15:46:29.688Z] Customer [id=4, name=CustName4, email=cust4@example.org, regDate=2023-03-31T15:46:30.688Z] Customer [id=5, name=CustName5, email=cust5@example.org, regDate=2023-03-31T15:46:31.689Z] MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

以下输出显示了在注释行 4 之后返回和记录的项目。

Customer [id=1, name=CustName1, email=cust1@example.org, regDate=2023-03-31T15:46:27.688Z] Customer [id=2, name=CustName2, email=cust2@example.org, regDate=2023-03-31T15:46:28.688Z] MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}

batchWriteItem() 示例

batchWriteItem() 方法在一个或多个表中放置或删除多个项目。您最多可以在请求中指定 25 个单独的放置或删除操作。以下示例使用前面显示的 ProductCatalogMovieActor 模型类。

WriteBatch 对象是在注释行 1 和 2 之后生成的。对于 ProductCatalog 表,代码放置一个项目并删除一个项目。对于注释行 2 之后的 MovieActor 表,代码放置了两个项目并删除了一个项目。

在注释行 3 之后调用 batchWriteItem 方法。builder 参数提供每个表的批处理请求。

返回的 BatchWriteResult 对象为每个操作提供了不同的方法来查看未处理的请求。注释行 4a 之后的代码为未处理的删除请求提供了键,注释行 4b 之后的代码提供了未处理的放置项目。

public static void batchWriteItemExample(DynamoDbEnhancedClient enhancedClient, DynamoDbTable<ProductCatalog> catalogTable, DynamoDbTable<MovieActor> movieActorTable) { // 1. Build a batch to write to the ProductCatalog table. WriteBatch products = WriteBatch.builder(ProductCatalog.class) .mappedTableResource(catalogTable) .addPutItem(b -> b.item(getProductCatItem1())) .addDeleteItem(b -> b.key(k -> k .partitionValue(getProductCatItem2().id()) .sortValue(getProductCatItem2().title()))) .build(); // 2. Build a batch to write to the MovieActor table. WriteBatch movies = WriteBatch.builder(MovieActor.class) .mappedTableResource(movieActorTable) .addPutItem(getMovieActorYeoh()) .addPutItem(getMovieActorBlanchettPartial()) .addDeleteItem(b -> b.key(k -> k .partitionValue(getMovieActorStreep().getMovieName()) .sortValue(getMovieActorStreep().getActorName()))) .build(); // 3. Add WriteBatch objects to the request. BatchWriteResult batchWriteResult = enhancedClient.batchWriteItem(b -> b.writeBatches(products, movies)); // 4. Retrieve keys for items the service did not process. // 4a. 'unprocessedDeleteItemsForTable()' returns keys for delete requests that did not process. if (batchWriteResult.unprocessedDeleteItemsForTable(movieActorTable).size() > 0) { batchWriteResult.unprocessedDeleteItemsForTable(movieActorTable).forEach(key -> logger.info(key.toString())); } // 4b. 'unprocessedPutItemsForTable()' returns keys for put requests that did not process. if (batchWriteResult.unprocessedPutItemsForTable(catalogTable).size() > 0) { batchWriteResult.unprocessedPutItemsForTable(catalogTable).forEach(key -> logger.info(key.toString())); } }

以下帮助程序方法为放置和删除操作提供了模型对象。

public static ProductCatalog getProductCatItem1() { return ProductCatalog.builder() .id(2) .isbn("1-565-85698") .authors(new HashSet<>(Arrays.asList("a", "b"))) .price(BigDecimal.valueOf(30.22)) .title("Title 55") .build(); } public static ProductCatalog getProductCatItem2() { return ProductCatalog.builder() .id(4) .price(BigDecimal.valueOf(40.00)) .title("Title 1") .build(); } public static MovieActor getMovieActorBlanchettPartial() { MovieActor movieActor = new MovieActor(); movieActor.setActorName("Cate Blanchett"); movieActor.setMovieName("Blue Jasmine"); movieActor.setActingYear(2023); movieActor.setActingAward("Best Actress"); return movieActor; } public static MovieActor getMovieActorStreep() { MovieActor movieActor = new MovieActor(); movieActor.setActorName("Meryl Streep"); movieActor.setMovieName("Sophie's Choice"); movieActor.setActingYear(1982); movieActor.setActingAward("Best Actress"); movieActor.setActingSchoolName("Yale School of Drama"); return movieActor; } public static MovieActor getMovieActorYeoh(){ MovieActor movieActor = new MovieActor(); movieActor.setActorName("Michelle Yeoh"); movieActor.setMovieName("Everything Everywhere All at Once"); movieActor.setActingYear(2023); movieActor.setActingAward("Best Actress"); movieActor.setActingSchoolName("Royal Academy of Dance"); return movieActor; }

在运行示例代码之前,假设这些表包含以下项目。

MovieActor{movieName='Blue Jasmine', actorName='Cate Blanchett', actingAward='Best Actress', actingYear=2013, actingSchoolName='National Institute of Dramatic Art'} MovieActor{movieName='Sophie's Choice', actorName='Meryl Streep', actingAward='Best Actress', actingYear=1982, actingSchoolName='Yale School of Drama'} ProductCatalog{id=4, title='Title 1', isbn='orig_isbn', authors=[b, g], price=10}

示例代码完成后,表中将包含以下项目。

MovieActor{movieName='Blue Jasmine', actorName='Cate Blanchett', actingAward='Best Actress', actingYear=2013, actingSchoolName='null'} MovieActor{movieName='Everything Everywhere All at Once', actorName='Michelle Yeoh', actingAward='Best Actress', actingYear=2023, actingSchoolName='Royal Academy of Dance'} ProductCatalog{id=2, title='Title 55', isbn='1-565-85698', authors=[a, b], price=30.22}

请注意,在 MovieActor 表中,Blue Jasmine 电影项目已被替换为通过 getMovieActorBlanchettPartial() 帮助程序方法获取的放置请求中使用的项目。如果未提供数据 Bean 属性值,则数据库中的值将被删除。这就是为什么 Blue Jasmine 影片项目的结果 actingSchoolName 为空的原因。

注意

尽管 API 文档表示可以使用条件表达式,并且可以在单独的放置删除请求中返回已消耗的容量和集合指标,但在批量写入场景中,情况并非如此。为了提高批处理操作的性能,将忽略这些单独的选项。