Amazon DynamoDB
开发人员指南 (API Version 2012-08-10)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

完成程序:低级别 DynamoDB 流 API

以下是一个完整的 Java 程序,该程序执行此演练中描述的任务。运行该程序时,将显示每个完整的流记录:

Copy
Issuing CreateTable request for TestTableForStreams Waiting for TestTableForStreams to be created... Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-west-2:111122223333:table/TestTableForStreams/stream/2017-05-01T16:04:24.300 Stream enabled: true Update view type: NEW_AND_OLD_IMAGES Making some changes to table data Processing shardId-00000001493654674152-5245c47a from stream arn:aws:dynamodb:us-west-2:111122223333:table/TestTableForStreams/stream/2017-05-01T16:04:24.300 Getting records... {EventID: bb6acf1d94d80f579df1a8c05b8a7c8d,EventName: INSERT,EventVersion: 1.1,EventSource: aws:dynamodb,AwsRegion: us-west-2,Dynamodb: {ApproximateCreationDateTime: Mon May 01 09:04:00 PDT 2017,Keys: {Id={N: 101,}},NewImage: {Message={S: New item!,}, Id={N: 101,}},SequenceNumber: 100000000001134015798,SizeBytes: 26,StreamViewType: NEW_AND_OLD_IMAGES},} {EventID: ea9cebe39d92adbf23089a179215fa5f,EventName: MODIFY,EventVersion: 1.1,EventSource: aws:dynamodb,AwsRegion: us-west-2,Dynamodb: {ApproximateCreationDateTime: Mon May 01 09:04:00 PDT 2017,Keys: {Id={N: 101,}},NewImage: {Message={S: This item has changed,}, Id={N: 101,}},OldImage: {Message={S: New item!,}, Id={N: 101,}},SequenceNumber: 200000000001134015828,SizeBytes: 59,StreamViewType: NEW_AND_OLD_IMAGES},} {EventID: fc3fe36dc8d50dc0c4b513922d8c1bfd,EventName: REMOVE,EventVersion: 1.1,EventSource: aws:dynamodb,AwsRegion: us-west-2,Dynamodb: {ApproximateCreationDateTime: Mon May 01 09:04:00 PDT 2017,Keys: {Id={N: 101,}},OldImage: {Message={S: This item has changed,}, Id={N: 101,}},SequenceNumber: 300000000001134015829,SizeBytes: 38,StreamViewType: NEW_AND_OLD_IMAGES},} Deleting the table... Demo complete

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazon.codesamples; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.amazonaws.AmazonClientException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.Record; import com.amazonaws.services.dynamodbv2.model.Shard; import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.util.TableUtils; public class StreamsLowLevelDemo { public static void main(String args[]) throws InterruptedException { AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build(); AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder.standard().build(); // Create the table String tableName = "TestTableForStreams"; ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition key StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withKeySchema(keySchema).withAttributeDefinitions(attributeDefinitions) .withProvisionedThroughput(new ProvisionedThroughput().withReadCapacityUnits(1L).withWriteCapacityUnits(1L)) .withStreamSpecification(streamSpecification); System.out.println("Issuing CreateTable request for " + tableName); dynamoDBClient.createTable(createTableRequest); System.out.println("Waiting for " + tableName + " to be created..."); try { TableUtils.waitUntilActive(dynamoDBClient, tableName); } catch (AmazonClientException e) { e.printStackTrace(); } // Determine the Streams settings for the table DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName); String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); StreamSpecification myStreamSpec = describeTableResult.getTable().getStreamSpecification(); System.out.println("Current stream ARN for " + tableName + ": " + myStreamArn); System.out.println("Stream enabled: " + myStreamSpec.getStreamEnabled()); System.out.println("Update view type: " + myStreamSpec.getStreamViewType()); // Add a new item int numChanges = 0; System.out.println("Making some changes to table data"); Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN("101")); item.put("Message", new AttributeValue().withS("New item!")); dynamoDBClient.putItem(tableName, item); numChanges++; // Update the item Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN("101")); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); attributeUpdates.put("Message", new AttributeValueUpdate().withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS("This item has changed"))); dynamoDBClient.updateItem(tableName, key, attributeUpdates); numChanges++; // Delete the item dynamoDBClient.deleteItem(tableName, key); numChanges++; // Get the shards in the stream DescribeStreamResult describeStreamResult = streamsClient .describeStream(new DescribeStreamRequest().withStreamArn(myStreamArn)); String streamArn = describeStreamResult.getStreamDescription().getStreamArn(); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); // Process each shard for (Shard shard : shards) { String shardId = shard.getShardId(); System.out.println("Processing " + shardId + " from stream " + streamArn); // Get an iterator for the current shard GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest().withStreamArn(myStreamArn) .withShardId(shardId).withShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); String nextItr = getShardIteratorResult.getShardIterator(); while (nextItr != null && numChanges > 0) { // Use the iterator to read the data records from the shard GetRecordsResult getRecordsResult = streamsClient .getRecords(new GetRecordsRequest().withShardIterator(nextItr)); List<Record> records = getRecordsResult.getRecords(); System.out.println("Getting records..."); for (Record record : records) { System.out.println(record); numChanges--; } nextItr = getRecordsResult.getNextShardIterator(); } // Delete the table System.out.println("Deleting the table..."); dynamoDBClient.deleteTable(tableName); System.out.println("Demo complete"); } } }