扫描表和索引:Java - Amazon DynamoDB
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

扫描表和索引:Java

操作读取 Scan 中表或索引中的所有项目。Amazon DynamoDB

以下是使用 AWS SDK for Java 文档 API 扫描表的步骤。

  1. 创建 AmazonDynamoDB 类的实例。

  2. 创建 ScanRequest 类的实例并提供扫描参数。

    唯一必需的参数是表名称。

  3. 运行 scan 方法并提供您在上一步中创建的 ScanRequest 对象。

以下 Reply 表存储论坛话题的回复。

Reply ( Id, ReplyDateTime, Message, PostedBy )

该表维护各种论坛话题的所有回复。因此,主键由 Id(分区键)和 ReplyDateTime(排序键)这两者组成。以下 Java 代码示例扫描整个表。实例指定要扫描的表的名称。ScanRequest

AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); ScanRequest scanRequest = new ScanRequest() .withTableName("Reply"); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); }

指定可选参数

scan 方法支持多个可选参数。例如,您可以选择使用筛选表达式筛选扫描结果。在筛选条件表达式中,您可以指定条件和属性名称以及要对其计算条件的值。有关更多信息,请参阅Scan

以下 Java 示例扫描 ProductCatalog 表以查找定价低于 0 的项目。该示例指定了以下可选参数:

  • 一个筛选表达式,用于仅检索定价低于 0 的项目(错误条件)。

  • 要为查询结果中的项目检索的属性的列表。

Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>(); expressionAttributeValues.put(":val", new AttributeValue().withN("0")); ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withFilterExpression("Price < :val") .withProjectionExpression("Id") .withExpressionAttributeValues(expressionAttributeValues); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()) { printItem(item); }

您还可以选择使用扫描请求的 withLimit 方法,选择性地限制页面大小或每页的项目数。每次运行 scan 方法时,您都会获得一页结果,其中包含指定数量的项目。要提取下一页,您可以通过提供上一页中最后一个项目的主键值来再次运行 scan 方法,以便 scan 方法可以返回下一组项目。您可使用 withExclusiveStartKey 方法在请求中提供此信息。最初,此方法的参数可以为空。要检索后面的页面,您必须将此属性值更新为上一页中最后一个项目的主键。

以下 Java 代码示例扫描 ProductCatalog 表。在请求中,使用 withLimitwithExclusiveStartKey 方法。循环会不断执行一次扫描一页的操作,直到结果的 do/while 方法返回空值。getLastEvaluatedKey

Map<String, AttributeValue> lastKeyEvaluated = null; do { ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withLimit(10) .withExclusiveStartKey(lastKeyEvaluated); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); } lastKeyEvaluated = result.getLastEvaluatedKey(); } while (lastKeyEvaluated != null);

示例 – 使用 Java 进行扫描

以下 Java 代码示例提供了一个工作示例,该示例扫描 ProductCatalog 表以查找定价低于 100 的项目。

注意

适用于 Java 的开发工具包还提供一个对象持久化模型,可用来将客户端类映射到 DynamoDB 表。该方法可以减少您必须编写的代码数量。有关更多信息,请参阅AWS 开发工具包: DynamoDBMapper

注意

此代码示例假定您已按照 为 DynamoDB 中的代码示例创建表和加载数据部分的说明,为您的账户将数据加载到 DynamoDB 中。

有关运行以下示例的分步说明,请参阅 Java 代码示例

/** * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * This file is licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. A copy of * the License is located at * * http://aws.amazon.com/apache2.0/ * * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package com.amazonaws.codesamples.document; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; public class DocumentAPIScan { static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); static String tableName = "ProductCatalog"; public static void main(String[] args) throws Exception { findProductsForPriceLessThanOneHundred(); } private static void findProductsForPriceLessThanOneHundred() { Table table = dynamoDB.getTable(tableName); Map<String, Object> expressionAttributeValues = new HashMap<String, Object>(); expressionAttributeValues.put(":pr", 100); ItemCollection<ScanOutcome> items = table.scan("Price < :pr", // FilterExpression "Id, Title, ProductCategory, Price", // ProjectionExpression null, // ExpressionAttributeNames - not used in this example expressionAttributeValues); System.out.println("Scan of " + tableName + " for items with a price less than 100."); Iterator<Item> iterator = items.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next().toJSONPretty()); } } }

示例 - 使用 Java 的并行扫描

以下 Java 代码示例演示了并行扫描。程序删除并重新创建名为 ParallelScanTest 的表,然后加载包含数据的表。数据加载完成后,程序将生成多个线程并发出并行 Scan 请求。程序为每个并行请求打印运行时统计数据。

注意

适用于 Java 的开发工具包还提供一个对象持久化模型,可用来将客户端类映射到 DynamoDB 表。该方法可以减少您必须编写的代码数量。有关更多信息,请参阅AWS 开发工具包: DynamoDBMapper

注意

此代码示例假定您已按照 为 DynamoDB 中的代码示例创建表和加载数据部分的说明,为您的账户将数据加载到 DynamoDB 中。

有关运行以下示例的分步说明,请参阅 Java 代码示例

/** * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * This file is licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. A copy of * the License is located at * * http://aws.amazon.com/apache2.0/ * * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package com.amazonaws.codesamples.document; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; public class DocumentAPIParallelScan { // total number of sample items static int scanItemCount = 300; // number of items each scan request should return static int scanItemLimit = 10; // number of logical segments for parallel scan static int parallelScanThreads = 16; // table that will be used for scanning static String parallelScanTestTableName = "ParallelScanTest"; static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); public static void main(String[] args) throws Exception { try { // Clean up the table deleteTable(parallelScanTestTableName); createTable(parallelScanTestTableName, 10L, 5L, "Id", "N"); // Upload sample data for scan uploadSampleProducts(parallelScanTestTableName, scanItemCount); // Scan the table using multiple threads parallelScan(parallelScanTestTableName, scanItemLimit, parallelScanThreads); } catch (AmazonServiceException ase) { System.err.println(ase.getMessage()); } } private static void parallelScan(String tableName, int itemLimit, int numberOfThreads) { System.out.println( "Scanning " + tableName + " using " + numberOfThreads + " threads " + itemLimit + " items at a time"); ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); // Divide DynamoDB table into logical segments // Create one task for scanning each segment // Each thread will be scanning one segment int totalSegments = numberOfThreads; for (int segment = 0; segment < totalSegments; segment++) { // Runnable task that will only scan one segment ScanSegmentTask task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment); // Execute the task executor.execute(task); } shutDownExecutorService(executor); } // Runnable task for scanning a single segment of a DynamoDB table private static class ScanSegmentTask implements Runnable { // DynamoDB table to scan private String tableName; // number of items each scan request should return private int itemLimit; // Total number of segments // Equals to total number of threads scanning the table in parallel private int totalSegments; // Segment that will be scanned with by this task private int segment; public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment) { this.tableName = tableName; this.itemLimit = itemLimit; this.totalSegments = totalSegments; this.segment = segment; } @Override public void run() { System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time..."); int totalScannedItemCount = 0; Table table = dynamoDB.getTable(tableName); try { ScanSpec spec = new ScanSpec().withMaxResultSize(itemLimit).withTotalSegments(totalSegments) .withSegment(segment); ItemCollection<ScanOutcome> items = table.scan(spec); Iterator<Item> iterator = items.iterator(); Item currentItem = null; while (iterator.hasNext()) { totalScannedItemCount++; currentItem = iterator.next(); System.out.println(currentItem.toString()); } } catch (Exception e) { System.err.println(e.getMessage()); } finally { System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName); } } } private static void uploadSampleProducts(String tableName, int itemCount) { System.out.println("Adding " + itemCount + " sample items to " + tableName); for (int productIndex = 0; productIndex < itemCount; productIndex++) { uploadProduct(tableName, productIndex); } } private static void uploadProduct(String tableName, int productIndex) { Table table = dynamoDB.getTable(tableName); try { System.out.println("Processing record #" + productIndex); Item item = new Item().withPrimaryKey("Id", productIndex) .withString("Title", "Book " + productIndex + " Title").withString("ISBN", "111-1111111111") .withStringSet("Authors", new HashSet<String>(Arrays.asList("Author1"))).withNumber("Price", 2) .withString("Dimensions", "8.5 x 11.0 x 0.5").withNumber("PageCount", 500) .withBoolean("InPublication", true).withString("ProductCategory", "Book"); table.putItem(item); } catch (Exception e) { System.err.println("Failed to create item " + productIndex + " in " + tableName); System.err.println(e.getMessage()); } } private static void deleteTable(String tableName) { try { Table table = dynamoDB.getTable(tableName); table.delete(); System.out.println("Waiting for " + tableName + " to be deleted...this may take a while..."); table.waitForDelete(); } catch (Exception e) { System.err.println("Failed to delete table " + tableName); e.printStackTrace(System.err); } } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType) { createTable(tableName, readCapacityUnits, writeCapacityUnits, partitionKeyName, partitionKeyType, null, null); } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType, String sortKeyName, String sortKeyType) { try { System.out.println("Creating table " + tableName); List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName(partitionKeyName).withKeyType(KeyType.HASH)); // Partition // key List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions .add(new AttributeDefinition().withAttributeName(partitionKeyName).withAttributeType(partitionKeyType)); if (sortKeyName != null) { keySchema.add(new KeySchemaElement().withAttributeName(sortKeyName).withKeyType(KeyType.RANGE)); // Sort // key attributeDefinitions .add(new AttributeDefinition().withAttributeName(sortKeyName).withAttributeType(sortKeyType)); } Table table = dynamoDB.createTable(tableName, keySchema, attributeDefinitions, new ProvisionedThroughput() .withReadCapacityUnits(readCapacityUnits).withWriteCapacityUnits(writeCapacityUnits)); System.out.println("Waiting for " + tableName + " to be created...this may take a while..."); table.waitForActive(); } catch (Exception e) { System.err.println("Failed to create table " + tableName); e.printStackTrace(System.err); } } private static void shutDownExecutorService(ExecutorService executor) { executor.shutdown(); try { if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } }