演练:DynamoDB Streams Kinesis Adapter - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

演练:DynamoDB Streams Kinesis Adapter

本节是使用 Amazon Kinesis Client Library 和 Amazon DynamoDB Streams Kinesis Adapter 的 Java 应用程序的演练。此应用程序演示了数据复制示例,其中将一个表中的写入活动应用于另一个表,并且两个表中的内容保持同步。有关源代码,请参阅 完成程序:DynamoDB Streams Kinesis Adapter

此程序执行以下操作:

  1. 创建名为 KCL-Demo-srcKCL-Demo-dst 的两个 DynamoDB 表。每个表上均启用一个流。

  2. 通过添加、更新和删除项目在源表中生成更新活动。这会导致数据写入表的流中。

  3. 从流中读取记录、将记录重新构造为 DynamoDB 请求并将请求应用于目标表。

  4. 扫描源表和目标表,以确保其内容一致。

  5. 通过删除表进行清除。

以下各节将描述这些步骤,本演练结尾将显示完整的应用程序。

第 1 步:创建 DynamoDB 表

第一步是创建两个 DynamoDB 表,一个源表和一个目标表。源表的流上的 StreamViewTypeNEW_IMAGE。这意味着无论何时修改此表中的项目,项目“之后”的映像都将写入到流中。这样一来,流将跟踪表上的所有写入活动。

以下示例显示用于创建两个表的代码。

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

第 2 步:在源表中生成更新活动

下一步是在源表上生成某些写入活动。在此活动发生时,源表的流也会近乎实时更新。

此应用程序通过调用用于写入数据的 PutItemUpdateItemDeleteItem API 操作的方法来定义帮助程序类。以下代码示例演示如何使用这些方法。

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

第 3 步:处理流

现在,此程序开始处理流。DynamoDB Streams Kinesis Adapter 充当 KCL 和 DynamoDB Streams 终端节点之间的透明层,以便代码可充分利用 KCL 而不必进行低级 DynamoDB Streams 调用。此程序执行以下任务:

  • 它通过符合 KCL 接口定义的方法 (StreamsRecordProcessorinitializeprocessRecords) 定义记录处理器类 shutdownprocessRecords 方法包含从源表的流中进行读取以及对目标表进行写入时所需的逻辑。

  • 它定义了记录处理器类的类工厂 (StreamsRecordProcessorFactory)。这是使用 KCL 的 Java 程序所需的。

  • 它实例化一个新的 KCL Worker,它与类工厂关联。

  • 当记录处理完成时,它会关闭 Worker

要了解有关 KCL 接口定义的详细信息,请参阅 Amazon Kinesis Data Streams 开发人员指南使用 Kinesis 客户端库开发使用者

以下代码示例演示了 StreamsRecordProcessor 中的主循环。case 语句基于流记录中显示的 OperationType 来确定要执行的操作。

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

第 4 步:确保两个表具有相同的内容

此时,源表和目标表的内容是同步的。此应用程序针对两个表发送 Scan 请求以验证其内容是否实质相同。

DemoHelper 类包含调用低级 Scan API 的 ScanTable 方法。下例说明具体用法。

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

第 5 步:清理

演示完成后,此应用程序将删除源表和目标表。请看下面的代码示例。甚至在删除两个表后,其流也可在自动删除后的最多 24 个小时内保持可用。

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));