Complete program: DynamoDB Streams Kinesis adapter
The following is the complete Java program that performs the tasks described in Walkthrough: DynamoDB Streams Kinesis adapter. When you run it, you should see output similar to the following.
Creating table KCL-Demo-src Creating table KCL-Demo-dest Table is active. Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601 Starting worker... Scan result is equal. Done.
Important
To run this program, ensure that the client application has access to DynamoDB and Amazon CloudWatch using policies. For more information, see Identity-based policies for DynamoDB.
The source code consists of four .java
files:
-
StreamsAdapterDemo.java
-
StreamsRecordProcessor.java
-
StreamsRecordProcessorFactory.java
-
StreamsAdapterDemoHelper.java
StreamsAdapterDemo.java
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; public class StreamsAdapterDemo { private static DynamoDbAsyncClient dynamoDbAsyncClient; private static CloudWatchAsyncClient cloudWatchAsyncClient; private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Region region = Region.US_EAST_1; private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); public static void main( String[] args ) throws Exception { System.out.println("Starting demo..."); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); cloudWatchAsyncClient = CloudWatchAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; setUpTables(); StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); ShardRecordProcessorFactory shardRecordProcessorFactory = new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, "streams-adapter-demo", amazonDynamoDbStreamsAdapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, "streams-demo-worker", shardRecordProcessorFactory ); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); System.out.println("Creating scheduler for stream " + streamArn); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, amazonDynamoDbStreamsAdapterClient ); System.out.println("Starting scheduler..."); Thread t = new Thread(scheduler); t.start(); Thread.sleep(250000); System.out.println("Stopping scheduler..."); scheduler.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName); created = result.table().tableStatusAsString().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build()); dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build()); System.exit(returnValue); } }
StreamsRecordProcessor.java
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { private Integer checkpointCounter; private final DynamoDbAsyncClient dynamoDbAsyncClient; private final String tableName; public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) { this.dynamoDbAsyncClient = dynamoDbAsyncClient; this.tableName = tableName; } @Override public void initialize(InitializationInput initializationInput) { this.checkpointCounter = 0; } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) { for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) { String data = new String(record.data().array(), StandardCharsets.UTF_8); System.out.println(data); Record streamRecord = record.getRecord(); switch (streamRecord.eventName()) { case INSERT: case MODIFY: StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, streamRecord.dynamodb().newImage()); case REMOVE: StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, streamRecord.dynamodb().keys().get("Id").n()); } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease Lost"); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } }
StreamsRecordProcessorFactory.java
package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory { private final String tableName; private final DynamoDbAsyncClient dynamoDbAsyncClient; public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) { this.tableName = tableName; this.dynamoDbAsyncClient = asyncClient; } @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName); } }
StreamsAdapterDemoHelper.java
package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.StreamSpecification; import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(DynamoDbAsyncClient client, String tableName) { List<AttributeDefinition> attributeDefinitions = new ArrayList<>(); attributeDefinitions.add(AttributeDefinition.builder() .attributeName("Id") .attributeType("N") .build()); List<KeySchemaElement> keySchema = new ArrayList<>(); keySchema.add(KeySchemaElement.builder() .attributeName("Id") .keyType(KeyType.HASH) // Partition key .build()); StreamSpecification streamSpecification = StreamSpecification.builder() .streamEnabled(true) .streamViewType(StreamViewType.NEW_IMAGE) .build(); CreateTableRequest createTableRequest = CreateTableRequest.builder() .tableName(tableName) .attributeDefinitions(attributeDefinitions) .keySchema(keySchema) .billingMode(BillingMode.PAY_PER_REQUEST) .streamSpecification(streamSpecification) .build(); try { System.out.println("Creating table " + tableName); CreateTableResponse result = client.createTable(createTableRequest).join(); return result.tableDescription().latestStreamArn(); } catch (Exception e) { if (e.getCause() instanceof ResourceInUseException) { System.out.println("Table already exists."); return describeTable(client, tableName).table().latestStreamArn(); } throw e; } } public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) { return client.describeTable(DescribeTableRequest.builder() .tableName(tableName) .build()) .join(); } public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) { return dynamoDbClient.scan(ScanRequest.builder() .tableName(tableName) .build()) .join(); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> item = new HashMap<>(); item.put("Id", AttributeValue.builder().n(id).build()); item.put("attribute-1", AttributeValue.builder().s(val).build()); putItem(dynamoDbClient, tableName, item); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, Map<String, AttributeValue> items) { PutItemRequest putItemRequest = PutItemRequest.builder() .tableName(tableName) .item(items) .build(); dynamoDbClient.putItem(putItemRequest).join(); } public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); Map<String, String> expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put("#attr2", "attribute-2"); Map<String, AttributeValue> expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build()); UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() .tableName(tableName) .key(key) .updateExpression("SET #attr2 = :val") .expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues) .build(); dynamoDbClient.updateItem(updateItemRequest).join(); } public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) .key(key) .build(); dynamoDbClient.deleteItem(deleteItemRequest).join(); } }