Scanning tables and indexes: Java - Amazon DynamoDB

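The Scan operation reads all of the items in a table or index in Amazon DynamoDB.

The following are the steps to scan a table using the Amazon SDK for Java. The steps and the short snippets in this part use the low-level client classes (AmazonDynamoDB and ScanRequest); the complete examples later on this page use the document API.

  1. Create an instance of the AmazonDynamoDB class.

  2. Create an instance of the ScanRequest class and provide the scan parameters.

    The only required parameter is the table name.

  3. Run the scan method and provide the ScanRequest object that you created in the preceding step.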

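The following Reply table stores replies for forum threads.

Reply ( Id, ReplyDateTime, Message, PostedBy )

The table maintains all the replies for various forum threads. Therefore, the primary key is composed of both the Id (partition key) and ReplyDateTime (sort key). The following Java code example scans the entire table. The ScanRequest instance specifies the name of the table to scan.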

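    // Create the low-level client with the default credentials and region.
    AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();

    // The table name is the only required parameter.
    ScanRequest scanRequest = new ScanRequest()
        .withTableName("Reply");

    // Run the scan and print each returned item.
    ScanResult result = client.scan(scanRequest);
    for (Map<String, AttributeValue> item : result.getItems()) {
        printItem(item);
    }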
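The snippets on this page call a printItem helper that the page does not define. The following is a minimal sketch of what such a helper might look like, not the helper the original sample used. The class name PrintItemSketch, the formatItem method, and the sample attribute values are assumptions made for illustration, and the map value type is left generic so that the sketch compiles without the SDK.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: only the name printItem comes from the snippets above.
class PrintItemSketch {

    // Builds one "name = value" line per attribute, sorted by attribute name.
    static <V> String formatItem(Map<String, V> item) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, V> attribute : new TreeMap<String, V>(item).entrySet()) {
            sb.append(attribute.getKey())
              .append(" = ")
              .append(attribute.getValue())
              .append('\n');
        }
        return sb.toString();
    }

    // Prints the formatted item to standard output.
    static <V> void printItem(Map<String, V> item) {
        System.out.print(formatItem(item));
    }

    public static void main(String[] args) {
        // Made-up attribute values for one Reply item.
        Map<String, Object> reply = new TreeMap<>();
        reply.put("Id", "Thread 1");
        reply.put("ReplyDateTime", "2015-09-22T19:58:22Z");
        reply.put("Message", "Sample reply");
        reply.put("PostedBy", "User A");
        printItem(reply);
    }
}
```

Because the value type is generic, the same method would also accept a Map<String, AttributeValue> from a scan result, in which case each value is rendered through its toString method.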

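Specifying optional parameters

The scan method supports several optional parameters. For example, you can optionally use a filter expression to filter the scan result. In a filter expression, you can specify a condition, along with the attribute names and values on which you want the condition evaluated. For more information, see Scan.

The following Java example scans the ProductCatalog table to find items that are priced less than 0. The example specifies the following optional parameters:

  • A filter expression to retrieve only the items priced less than 0 (an error condition, because a price should not be negative).

  • A list of attributes to retrieve for items in the scan results.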

    // Bind the placeholder :val to the number 0.
    Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
    expressionAttributeValues.put(":val", new AttributeValue().withN("0"));

    // Keep only items with Price < :val, and return only the Id attribute.
    ScanRequest scanRequest = new ScanRequest()
        .withTableName("ProductCatalog")
        .withFilterExpression("Price < :val")
        .withProjectionExpression("Id")
        .withExpressionAttributeValues(expressionAttributeValues);

    ScanResult result = client.scan(scanRequest);
    for (Map<String, AttributeValue> item : result.getItems()) {
        printItem(item);
    }
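A filter expression is applied after the items are read, so it reduces what is returned but not what is scanned. The following stdlib-only sketch imitates that behavior on an in-memory list instead of calling DynamoDB. The class FilteredScanSketch, its Result holder, and the product helper are hypothetical names introduced for illustration, and the prices are made-up data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of a filtered scan; not the SDK API.
class FilteredScanSketch {

    // Mirrors the difference between returned items and scanned items.
    static final class Result {
        final List<Map<String, Object>> items = new ArrayList<>();
        int scannedCount;
    }

    // Reads every item, then keeps those with Price < maxPrice,
    // projecting only the requested attribute.
    static Result scan(List<Map<String, Object>> table, double maxPrice, String projectedAttribute) {
        Result result = new Result();
        for (Map<String, Object> item : table) {
            result.scannedCount++; // every item is read, whether or not it matches
            Object price = item.get("Price");
            if (price instanceof Number && ((Number) price).doubleValue() < maxPrice) {
                Map<String, Object> projected = new HashMap<>();
                projected.put(projectedAttribute, item.get(projectedAttribute));
                result.items.add(projected);
            }
        }
        return result;
    }

    // Builds a made-up product item with an Id and a Price.
    static Map<String, Object> product(int id, double price) {
        Map<String, Object> item = new HashMap<>();
        item.put("Id", id);
        item.put("Price", price);
        return item;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> table = new ArrayList<>();
        table.add(product(1, 25.0));
        table.add(product(2, -1.0));
        table.add(product(3, 100.0));
        table.add(product(4, -5.0));

        Result result = scan(table, 0, "Id");
        System.out.println("Returned " + result.items.size()
                + " of " + result.scannedCount + " scanned items: " + result.items);
    }
}
```

The gap between the two counts is why a filtered scan of a large table can still be expensive: the read cost follows the scanned items, not the returned ones.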

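You can also optionally limit the page size, or the number of items per page, by using the withLimit method of the scan request. Each time you run the scan method, you get one page of results that has at most the specified number of items. To fetch the next page, you run the scan method again and provide the primary key value of the last item in the previous page, which the previous result returns from getLastEvaluatedKey, so that the scan method can return the next set of items. You provide this information in the request by using the withExclusiveStartKey method. Initially, the parameter of this method can be null. To retrieve subsequent pages, you must update this value to the primary key of the last item in the preceding page.

The following Java code example scans the ProductCatalog table. In the request, the withLimit and withExclusiveStartKey methods are used. The do/while loop continues to scan one page at a time until the getLastEvaluatedKey method of the result returns null.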

    // null on the first request; afterward, the key returned by the previous page.
    Map<String, AttributeValue> lastKeyEvaluated = null;
    do {
        ScanRequest scanRequest = new ScanRequest()
            .withTableName("ProductCatalog")
            .withLimit(10)
            .withExclusiveStartKey(lastKeyEvaluated);

        ScanResult result = client.scan(scanRequest);
        for (Map<String, AttributeValue> item : result.getItems()) {
            printItem(item);
        }
        // A null key means that there are no more pages.
        lastKeyEvaluated = result.getLastEvaluatedKey();
    } while (lastKeyEvaluated != null);
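The paging loop can be traced without a DynamoDB table. The following is a stdlib-only sketch, assuming a TreeMap as a stand-in for the table and integer keys as a stand-in for the primary key. The names PagedScanSketch, Page, scanPage, and scanAll are hypothetical. The stand-in also returns items in key order and reports a null key as soon as the data runs out, which is a simplification and not a statement about how DynamoDB orders or ends a scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory imitation of paged scanning; not the SDK API.
class PagedScanSketch {

    // One page of results plus the key to resume from (null when done).
    static final class Page {
        final List<Integer> ids = new ArrayList<>();
        Integer lastEvaluatedKey;
    }

    // Stand-in for one scan call: returns up to `limit` ids that come after
    // exclusiveStartKey. Assumes limit >= 1.
    static Page scanPage(TreeMap<Integer, String> table, int limit, Integer exclusiveStartKey) {
        NavigableMap<Integer, String> rest =
                (exclusiveStartKey == null) ? table : table.tailMap(exclusiveStartKey, false);
        Page page = new Page();
        for (Integer id : rest.keySet()) {
            if (page.ids.size() == limit) {
                // The page is full and more items remain: report where to resume.
                page.lastEvaluatedKey = page.ids.get(limit - 1);
                break;
            }
            page.ids.add(id);
        }
        return page;
    }

    // Same do/while shape as the snippet above: loop until the key is null.
    static List<Integer> scanAll(TreeMap<Integer, String> table, int limit) {
        List<Integer> all = new ArrayList<>();
        Integer lastKeyEvaluated = null;
        do {
            Page page = scanPage(table, limit, lastKeyEvaluated);
            all.addAll(page.ids);
            lastKeyEvaluated = page.lastEvaluatedKey;
        } while (lastKeyEvaluated != null);
        return all;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> table = new TreeMap<>();
        for (int id = 1; id <= 25; id++) {
            table.put(id, "Product " + id);
        }
        System.out.println("Scanned " + scanAll(table, 10).size() + " items in pages of 10");
    }
}
```

A do/while loop fits here because the first request must always be sent, with a null start key, before there is any key to test.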

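Example – Scan using Java

The following Java code example provides a working sample that scans the ProductCatalog table to find items that are priced less than 100.

Note

The SDK for Java also provides an object persistence model, which you can use to map your client-side classes to DynamoDB tables. This approach can reduce the amount of code that you have to write. For more information, see Java 1.x: DynamoDBMapper.

Note

This code example assumes that you have already loaded data into DynamoDB for your account by following the instructions in the Creating tables and loading data for code examples in DynamoDB section.

For step-by-step instructions to run the following example, see Java code examples.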

    package com.amazonaws.codesamples.document;

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.dynamodbv2.document.DynamoDB;
    import com.amazonaws.services.dynamodbv2.document.Item;
    import com.amazonaws.services.dynamodbv2.document.ItemCollection;
    import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
    import com.amazonaws.services.dynamodbv2.document.Table;

    public class DocumentAPIScan {

        static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        static DynamoDB dynamoDB = new DynamoDB(client);
        static String tableName = "ProductCatalog";

        public static void main(String[] args) throws Exception {
            findProductsForPriceLessThanOneHundred();
        }

        private static void findProductsForPriceLessThanOneHundred() {

            Table table = dynamoDB.getTable(tableName);

            Map<String, Object> expressionAttributeValues = new HashMap<String, Object>();
            expressionAttributeValues.put(":pr", 100);

            ItemCollection<ScanOutcome> items = table.scan("Price < :pr", // FilterExpression
                    "Id, Title, ProductCategory, Price", // ProjectionExpression
                    null, // ExpressionAttributeNames - not used in this example
                    expressionAttributeValues);

            System.out.println("Scan of " + tableName + " for items with a price less than 100.");
            Iterator<Item> iterator = items.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next().toJSONPretty());
            }
        }
    }

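Example – Parallel scan using Java

The following Java code example demonstrates a parallel scan. The program deletes and re-creates a table named ParallelScanTest, and then loads the table with data. When the data load is finished, the program spawns multiple threads and issues parallel Scan requests. The program prints runtime statistics for each parallel request.

Note

The SDK for Java also provides an object persistence model, which you can use to map your client-side classes to DynamoDB tables. This approach can reduce the amount of code that you have to write. For more information, see Java 1.x: DynamoDBMapper.

Note

This code example assumes that you have already loaded data into DynamoDB for your account by following the instructions in the Creating tables and loading data for code examples in DynamoDB section.

For step-by-step instructions to run the following example, see Java code examples.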

    package com.amazonaws.codesamples.document;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
    import com.amazonaws.services.dynamodbv2.document.DynamoDB;
    import com.amazonaws.services.dynamodbv2.document.Item;
    import com.amazonaws.services.dynamodbv2.document.ItemCollection;
    import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
    import com.amazonaws.services.dynamodbv2.document.Table;
    import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec;
    import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
    import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
    import com.amazonaws.services.dynamodbv2.model.KeyType;
    import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;

    public class DocumentAPIParallelScan {

        // total number of sample items
        static int scanItemCount = 300;

        // number of items each scan request should return
        static int scanItemLimit = 10;

        // number of logical segments for parallel scan
        static int parallelScanThreads = 16;

        // table that will be used for scanning
        static String parallelScanTestTableName = "ParallelScanTest";

        static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        static DynamoDB dynamoDB = new DynamoDB(client);

        public static void main(String[] args) throws Exception {
            try {

                // Clean up the table
                deleteTable(parallelScanTestTableName);
                createTable(parallelScanTestTableName, 10L, 5L, "Id", "N");

                // Upload sample data for scan
                uploadSampleProducts(parallelScanTestTableName, scanItemCount);

                // Scan the table using multiple threads
                parallelScan(parallelScanTestTableName, scanItemLimit, parallelScanThreads);
            } catch (AmazonServiceException ase) {
                System.err.println(ase.getMessage());
            }
        }

        private static void parallelScan(String tableName, int itemLimit, int numberOfThreads) {
            System.out.println(
                    "Scanning " + tableName + " using " + numberOfThreads + " threads " + itemLimit
                            + " items at a time");
            ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

            // Divide DynamoDB table into logical segments
            // Create one task for scanning each segment
            // Each thread will be scanning one segment
            int totalSegments = numberOfThreads;
            for (int segment = 0; segment < totalSegments; segment++) {
                // Runnable task that will only scan one segment
                ScanSegmentTask task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

                // Execute the task
                executor.execute(task);
            }

            shutDownExecutorService(executor);
        }

        // Runnable task for scanning a single segment of a DynamoDB table
        private static class ScanSegmentTask implements Runnable {

            // DynamoDB table to scan
            private String tableName;

            // number of items each scan request should return
            private int itemLimit;

            // Total number of segments
            // Equal to the total number of threads scanning the table in parallel
            private int totalSegments;

            // Segment that will be scanned by this task
            private int segment;

            public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment) {
                this.tableName = tableName;
                this.itemLimit = itemLimit;
                this.totalSegments = totalSegments;
                this.segment = segment;
            }

            @Override
            public void run() {
                System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
                        + " segments " + itemLimit + " items at a time...");
                int totalScannedItemCount = 0;

                Table table = dynamoDB.getTable(tableName);

                try {
                    ScanSpec spec = new ScanSpec().withMaxResultSize(itemLimit).withTotalSegments(totalSegments)
                            .withSegment(segment);

                    ItemCollection<ScanOutcome> items = table.scan(spec);
                    Iterator<Item> iterator = items.iterator();

                    Item currentItem = null;
                    while (iterator.hasNext()) {
                        totalScannedItemCount++;
                        currentItem = iterator.next();
                        System.out.println(currentItem.toString());
                    }

                } catch (Exception e) {
                    System.err.println(e.getMessage());
                } finally {
                    System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment
                            + " out of " + totalSegments + " of " + tableName);
                }
            }
        }

        private static void uploadSampleProducts(String tableName, int itemCount) {
            System.out.println("Adding " + itemCount + " sample items to " + tableName);
            for (int productIndex = 0; productIndex < itemCount; productIndex++) {
                uploadProduct(tableName, productIndex);
            }
        }

        private static void uploadProduct(String tableName, int productIndex) {

            Table table = dynamoDB.getTable(tableName);

            try {
                System.out.println("Processing record #" + productIndex);

                Item item = new Item().withPrimaryKey("Id", productIndex)
                        .withString("Title", "Book " + productIndex + " Title").withString("ISBN", "111-1111111111")
                        .withStringSet("Authors", new HashSet<String>(Arrays.asList("Author1"))).withNumber("Price", 2)
                        .withString("Dimensions", "8.5 x 11.0 x 0.5").withNumber("PageCount", 500)
                        .withBoolean("InPublication", true).withString("ProductCategory", "Book");
                table.putItem(item);

            } catch (Exception e) {
                System.err.println("Failed to create item " + productIndex + " in " + tableName);
                System.err.println(e.getMessage());
            }
        }

        private static void deleteTable(String tableName) {
            try {

                Table table = dynamoDB.getTable(tableName);
                table.delete();
                System.out.println("Waiting for " + tableName + " to be deleted...this may take a while...");
                table.waitForDelete();

            } catch (Exception e) {
                System.err.println("Failed to delete table " + tableName);
                e.printStackTrace(System.err);
            }
        }

        private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits,
                String partitionKeyName, String partitionKeyType) {

            createTable(tableName, readCapacityUnits, writeCapacityUnits, partitionKeyName, partitionKeyType,
                    null, null);
        }

        private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits,
                String partitionKeyName, String partitionKeyType, String sortKeyName, String sortKeyType) {

            try {
                System.out.println("Creating table " + tableName);

                // Partition key
                List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
                keySchema.add(new KeySchemaElement().withAttributeName(partitionKeyName)
                        .withKeyType(KeyType.HASH));

                List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
                attributeDefinitions
                        .add(new AttributeDefinition().withAttributeName(partitionKeyName)
                                .withAttributeType(partitionKeyType));

                if (sortKeyName != null) {
                    // Sort key
                    keySchema.add(new KeySchemaElement().withAttributeName(sortKeyName)
                            .withKeyType(KeyType.RANGE));
                    attributeDefinitions
                            .add(new AttributeDefinition().withAttributeName(sortKeyName)
                                    .withAttributeType(sortKeyType));
                }

                Table table = dynamoDB.createTable(tableName, keySchema, attributeDefinitions,
                        new ProvisionedThroughput()
                                .withReadCapacityUnits(readCapacityUnits)
                                .withWriteCapacityUnits(writeCapacityUnits));
                System.out.println("Waiting for " + tableName + " to be created...this may take a while...");
                table.waitForActive();

            } catch (Exception e) {
                System.err.println("Failed to create table " + tableName);
                e.printStackTrace(System.err);
            }
        }

        private static void shutDownExecutorService(ExecutorService executor) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();

                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
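The core idea of the program above is that each worker scans one segment out of a fixed total, and that the segments together cover the table without overlap. The following stdlib-only sketch imitates that with an in-memory list of ids. It is an illustration under stated assumptions: the names SegmentedScanSketch, segmentOf, scanSegment, and parallelScan are hypothetical, and the hash-modulo rule used to assign an item to a segment is made up for the sketch, because how DynamoDB maps items to segments is internal to the service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// In-memory imitation of a segmented (parallel) scan; not the SDK API.
class SegmentedScanSketch {

    // Assumed assignment rule for the sketch only.
    static int segmentOf(int id, int totalSegments) {
        return Math.floorMod(Integer.hashCode(id), totalSegments);
    }

    // Returns the ids that belong to one segment.
    static List<Integer> scanSegment(List<Integer> table, int segment, int totalSegments) {
        List<Integer> ids = new ArrayList<>();
        for (int id : table) {
            if (segmentOf(id, totalSegments) == segment) {
                ids.add(id);
            }
        }
        return ids;
    }

    // Runs one task per segment and collects everything the workers saw.
    static Set<Integer> parallelScan(List<Integer> table, int totalSegments) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService executor = Executors.newFixedThreadPool(totalSegments);
        for (int segment = 0; segment < totalSegments; segment++) {
            final int current = segment;
            executor.execute(() -> seen.addAll(scanSegment(table, current, totalSegments)));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> table = new ArrayList<>();
        for (int id = 0; id < 300; id++) {
            table.add(id);
        }
        Set<Integer> seen = parallelScan(table, 16);
        System.out.println("Workers saw " + seen.size() + " distinct items");
    }
}
```

Using one thread per segment, as the program above does, keeps the bookkeeping simple: a worker never needs to coordinate with another worker, because no item belongs to two segments.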