Amazon DynamoDB
开发人员指南 (API Version 2012-08-10)
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

完成程序:DynamoDB 流 Kinesis 适配器

此处是执行此演练中描述的任务的完整 Java 程序。当您运行该程序时,将显示与以下内容类似的输出:

Copy
Creating table KCL-Demo-src Creating table KCL-Demo-dest Table is active. Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601 Starting worker... Scan result is equal. Done.

重要

要运行此程序,请确保客户端应用程序可以使用策略来访问 DynamoDB 和 CloudWatch。有关更多信息,请参阅 为 Amazon DynamoDB 使用基于身份的策略(IAM 策略)

源代码包括四个 .java 文件:

  • StreamsAdapterDemo.java

  • StreamsRecordProcessor.java

  • StreamsRecordProcessorFactory.java

  • StreamsAdapterDemoHelper.java

StreamsAdapterDemo.java

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazonaws.codesamples; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; public class StreamsAdapterDemo { private static Worker worker; private static KinesisClientLibConfiguration workerConfig; private static IRecordProcessorFactory recordProcessorFactory; private static AmazonDynamoDBStreamsAdapterClient adapterClient; private static AWSCredentialsProvider streamsCredentials; private static AmazonDynamoDB dynamoDBClient; private static AWSCredentialsProvider dynamoDBCredentials; private static AmazonCloudWatch cloudWatchClient; private static String serviceName = "dynamodb"; private static String dynamodbEndpoint = "DYNAMODB_ENDPOINT_GOES_HERE"; private static String tablePrefix = "KCL-Demo"; private static String streamArn; /** * @param args */ public static void main(String[] args) throws Exception { System.out.println("Starting demo..."); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamsCredentials = new ProfileCredentialsProvider(); dynamoDBCredentials = new ProfileCredentialsProvider(); recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable); adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration()); dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build(); cloudWatchClient = AmazonCloudWatchClientBuilder.standard().build(); setUpTables(); workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo", streamArn, streamsCredentials, "streams-demo-worker").withMaxRecords(1000).withIdleTimeBetweenReadsInMillis(500) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); System.out.println("Creating worker for stream: " + streamArn); worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient); System.out.println("Starting worker..."); Thread t = new Thread(worker); t.start(); Thread.sleep(25000); worker.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDBClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDBClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResult result = StreamsAdapterDemoHelper.describeTable(dynamoDBClient, tableName); created = result.getTable().getTableStatus().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable)); System.exit(returnValue); } }

StreamsRecordProcessor.java

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazonaws.codesamples; import java.nio.charset.Charset; import java.util.List; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; public class StreamsRecordProcessor implements IRecordProcessor { private Integer checkpointCounter; private final AmazonDynamoDB dynamoDBClient; private final String tableName; public StreamsRecordProcessor(AmazonDynamoDB dynamoDBClient2, String tableName) { this.dynamoDBClient = dynamoDBClient2; this.tableName = tableName; } @Override public void initialize(String shardId) { checkpointCounter = 0; } @Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }

StreamsRecordProcessorFactory.java

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazonaws.codesamples; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { private final String tableName; public StreamsRecordProcessorFactory(AWSCredentialsProvider dynamoDBCredentials, String dynamoDBEndpoint, String serviceName, String tableName) { this.tableName = tableName; } @Override public IRecordProcessor createProcessor() { AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build(); return new StreamsRecordProcessor(dynamoDBClient, tableName); } }

StreamsAdapterDemoHelper.java

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazonaws.codesamples; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.CreateTableResult; import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.amazonaws.services.dynamodbv2.model.ScanResult; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(AmazonDynamoDB client, String tableName) { java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification); try { System.out.println("Creating table " + tableName); CreateTableResult result = client.createTable(createTableRequest); return result.getTableDescription().getLatestStreamArn(); } catch (ResourceInUseException e) { System.out.println("Table already exists."); return describeTable(client, tableName).getTable().getLatestStreamArn(); } } public static DescribeTableResult describeTable(AmazonDynamoDB client, String tableName) { return client.describeTable(new DescribeTableRequest().withTableName(tableName)); } public static ScanResult scanTable(AmazonDynamoDB dynamoDBClient, String tableName) { return dynamoDBClient.scan(new ScanRequest().withTableName(tableName)); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN(id)); item.put("attribute-1", new AttributeValue().withS(val)); PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(item); dynamoDBClient.putItem(putItemRequest); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, java.util.Map<String, AttributeValue> items) { PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(items); dynamoDBClient.putItem(putItemRequest); } public static void updateItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); AttributeValueUpdate update = new AttributeValueUpdate().withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS(val)); attributeUpdates.put("attribute-2", update); UpdateItemRequest updateItemRequest = new UpdateItemRequest().withTableName(tableName).withKey(key) .withAttributeUpdates(attributeUpdates); dynamoDBClient.updateItem(updateItemRequest); } public static void deleteItem(AmazonDynamoDB dynamoDBClient, String tableName, String id) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); DeleteItemRequest deleteItemRequest = new DeleteItemRequest().withTableName(tableName).withKey(key); dynamoDBClient.deleteItem(deleteItemRequest); } }