Amazon DynamoDB
开发人员指南 (API 版本 2012-08-10)
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

DynamoDB 流低级 API:Java 示例


本页上的代码并不详尽,无法处理使用 DynamoDB 流的所有场景。建议利用 Kinesis Client Library (KCL) 通过 Kinesis 适配器使用 DynamoDB 中的流记录,如使用 DynamoDB 流 Kinesis 适配器处理流记录中所述。

本节包含一个演示 DynamoDB 流的实际运用的 Java 程序。此程序执行以下操作:

  1. 创建一个启用了流的 DynamoDB 表。

  2. 描述此表的流设置。

  3. 修改表中的数据。

  4. 描述流中的分区。

  5. 从分区中读取流记录。

  6. 清除。


运行此程序时,您将看到类似于以下内容的输出:

Issuing CreateTable request for TestTableForStreams
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 01234567890...,},}
 Shard iterator: EjYFEkX2a26eVTWe...
 {ApproximateCreationDateTime: Tue Mar 20 09:50:00 PDT 2018,Keys: {Id={N: 1,}},NewImage: {Message={S: New item!,}, Id={N: 1,}},SequenceNumber: 100000000003218256368,SizeBytes: 24,StreamViewType: NEW_AND_OLD_IMAGES}
 {ApproximateCreationDateTime: Tue Mar 20 09:50:00 PDT 2018,Keys: {Id={N: 1,}},NewImage: {Message={S: This item has changed,}, Id={N: 1,}},OldImage: {Message={S: New item!,}, Id={N: 1,}},SequenceNumber: 200000000003218256412,SizeBytes: 56,StreamViewType: NEW_AND_OLD_IMAGES}
 {ApproximateCreationDateTime: Tue Mar 20 09:50:00 PDT 2018,Keys: {Id={N: 1,}},OldImage: {Message={S: This item has changed,}, Id={N: 1,}},SequenceNumber: 300000000003218256413,SizeBytes: 36,StreamViewType: NEW_AND_OLD_IMAGES}
...
Deleting the table...
Demo complete

// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Apache License, Version 2.0.
// NOTE(review): the package name and the AWS SDK import targets below were blank in the
// extracted text; they were reconstructed from the types this class references — confirm.
package com.amazon.codesamples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.TableUtils;

/**
 * Demonstrates the low-level DynamoDB Streams API end to end: creates a table with a
 * stream enabled, prints the stream settings, writes/updates/deletes items, reads the
 * resulting stream records shard by shard, and finally deletes the table.
 */
public class StreamsLowLevelDemo {

    /** Number of leading characters dropped when a shard iterator is printed. */
    private static final int ITERATOR_PRINT_OFFSET = 380;

    /**
     * Runs the whole demo against the US_EAST_2 region using the default credentials chain.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the table
     */
    public static void main(String[] args) throws InterruptedException {
        AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder
            .standard()
            .withRegion(Regions.US_EAST_2)
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .build();

        AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder
            .standard()
            .withRegion(Regions.US_EAST_2)
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .build();

        String tableName = "TestTableForStreams";
        int maxItemCount = 100;

        createTableWithStream(dynamoDBClient, tableName);

        // Everything after a successful CreateTable runs under try/finally so the demo
        // table is removed even when a later step throws.
        try {
            System.out.println("Waiting for " + tableName + " to be created...");
            try {
                TableUtils.waitUntilActive(dynamoDBClient, tableName);
            } catch (AmazonClientException e) {
                // Best effort, as in the original sample: report and carry on.
                e.printStackTrace();
            }

            String streamArn = printStreamSettings(dynamoDBClient, tableName);
            generateWriteActivity(dynamoDBClient, tableName, maxItemCount);
            readStreamRecords(streamsClient, streamArn, maxItemCount);
        } finally {
            System.out.println("Deleting the table...");
            dynamoDBClient.deleteTable(tableName);
        }

        System.out.println("Demo complete");
    }

    /** Issues a CreateTable request for a numeric-"Id"-keyed table with a stream enabled. */
    private static void createTableWithStream(AmazonDynamoDB dynamoDBClient, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>(
            Arrays.asList(new AttributeDefinition()
                .withAttributeName("Id")
                .withAttributeType("N")));

        List<KeySchemaElement> keySchema = new ArrayList<>(
            Arrays.asList(new KeySchemaElement()
                .withAttributeName("Id")
                .withKeyType(KeyType.HASH))); // Partition key

        StreamSpecification streamSpecification = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

        CreateTableRequest createTableRequest = new CreateTableRequest()
            .withTableName(tableName)
            .withKeySchema(keySchema)
            .withAttributeDefinitions(attributeDefinitions)
            .withProvisionedThroughput(new ProvisionedThroughput()
                .withReadCapacityUnits(10L)
                .withWriteCapacityUnits(10L))
            .withStreamSpecification(streamSpecification);

        System.out.println("Issuing CreateTable request for " + tableName);
        dynamoDBClient.createTable(createTableRequest);
    }

    /**
     * Prints the table's stream ARN and stream specification.
     *
     * @return the latest stream ARN reported by DescribeTable
     */
    private static String printStreamSettings(AmazonDynamoDB dynamoDBClient, String tableName) {
        DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName);

        String streamArn = describeTableResult.getTable().getLatestStreamArn();
        System.out.println("Current stream ARN for " + tableName + ": " + streamArn);

        StreamSpecification streamSpec = describeTableResult.getTable().getStreamSpecification();
        System.out.println("Stream enabled: " + streamSpec.getStreamEnabled());
        System.out.println("Update view type: " + streamSpec.getStreamViewType());
        System.out.println();

        return streamArn;
    }

    /** Puts, updates and then deletes {@code maxItemCount} items to generate stream records. */
    private static void generateWriteActivity(AmazonDynamoDB dynamoDBClient, String tableName,
            int maxItemCount) {
        System.out.println("Performing write activities on " + tableName);

        for (int i = 1; i <= maxItemCount; i++) {
            System.out.println("Processing item " + i + " of " + maxItemCount);
            String id = Integer.toString(i);

            // Write a new item
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("Id", new AttributeValue().withN(id));
            item.put("Message", new AttributeValue().withS("New item!"));
            dynamoDBClient.putItem(tableName, item);

            // Update the item
            Map<String, AttributeValue> key = new HashMap<>();
            key.put("Id", new AttributeValue().withN(id));
            Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<>();
            attributeUpdates.put("Message", new AttributeValueUpdate()
                .withAction(AttributeAction.PUT)
                .withValue(new AttributeValue().withS("This item has changed")));
            dynamoDBClient.updateItem(tableName, key, attributeUpdates);

            // Delete the item
            dynamoDBClient.deleteItem(tableName, key);
        }
    }

    /**
     * Walks every shard of the stream (DescribeStream is paged) and prints its records,
     * reading from TRIM_HORIZON.
     *
     * @param maxItemCount per-shard record count after which reading of that shard stops
     */
    private static void readStreamRecords(AmazonDynamoDBStreams streamsClient, String streamArn,
            int maxItemCount) {
        // DescribeStream returns the shard IDs one page at a time.
        String lastEvaluatedShardId = null;

        do {
            DescribeStreamResult describeStreamResult = streamsClient.describeStream(
                new DescribeStreamRequest()
                    .withStreamArn(streamArn)
                    .withExclusiveStartShardId(lastEvaluatedShardId));
            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();

            // Process each shard on this page
            for (Shard shard : shards) {
                String shardId = shard.getShardId();
                System.out.println("Shard: " + shard);

                // Get an iterator for the current shard
                GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
                    .withStreamArn(streamArn)
                    .withShardId(shardId)
                    .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
                GetShardIteratorResult getShardIteratorResult =
                    streamsClient.getShardIterator(getShardIteratorRequest);
                String currentShardIter = getShardIteratorResult.getShardIterator();

                // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
                // To prevent running the loop until the Shard is sealed, which will be on
                // average 4 hours, we process only the items that were written into DynamoDB
                // and then exit.
                // NOTE(review): each item is put, updated and deleted, so the cap of
                // maxItemCount records can be reached before every record has been printed.
                int processedRecordCount = 0;
                while (currentShardIter != null && processedRecordCount < maxItemCount) {
                    System.out.println(" Shard iterator: " + iteratorSuffix(currentShardIter));

                    // Use the shard iterator to read the stream records
                    GetRecordsResult getRecordsResult = streamsClient.getRecords(
                        new GetRecordsRequest().withShardIterator(currentShardIter));
                    List<Record> records = getRecordsResult.getRecords();
                    for (Record record : records) {
                        System.out.println(" " + record.getDynamodb());
                    }
                    processedRecordCount += records.size();
                    currentShardIter = getRecordsResult.getNextShardIterator();
                }
            }

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId =
                describeStreamResult.getStreamDescription().getLastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);
    }

    /**
     * Returns the part of a shard iterator shown in the demo output: the text after the
     * first {@value #ITERATOR_PRINT_OFFSET} characters, or the whole iterator when it is
     * shorter than that (a bare {@code substring(380)} would throw in that case).
     */
    private static String iteratorSuffix(String shardIterator) {
        if (shardIterator.length() >= ITERATOR_PRINT_OFFSET) {
            return shardIterator.substring(ITERATOR_PRINT_OFFSET);
        }
        return shardIterator;
    }
}