

# 演练：DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

本节是使用 Amazon Kinesis Client Library 和 Amazon DynamoDB Streams Kinesis Adapter 的 Java 应用程序的演练。此应用程序演示了数据复制示例，其中将一个表中的写入活动应用于另一个表，并且两个表中的内容保持同步。有关源代码，请参阅 [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)。

此程序执行以下操作：

1. 创建名为 `KCL-Demo-src` 和 `KCL-Demo-dst` 的两个 DynamoDB 表。每个表上均启用一个流。

1. 通过添加、更新和删除项目在源表中生成更新活动。这会导致数据写入表的流中。

1. 从流中读取记录、将记录重新构造为 DynamoDB 请求并将请求应用于目标表。

1. 扫描源表和目标表，以确保其内容一致。

1. 通过删除表进行清除。

以下各节将描述这些步骤，本演练结尾将显示完整的应用程序。

**Topics**
+ [第 1 步：创建 DynamoDB 表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [第 2 步：在源表中生成更新活动](#Streams.KCLAdapter.Walkthrough.Step2)
+ [第 3 步：处理流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [第 4 步：确保两个表具有相同的内容](#Streams.KCLAdapter.Walkthrough.Step4)
+ [第 5 步：清理](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 第 1 步：创建 DynamoDB 表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是创建两个 DynamoDB 表，一个源表和一个目标表。源表的流上的 `StreamViewType` 为 `NEW_IMAGE`。这意味着无论何时修改此表中的项目，项目“之后”的映像都将写入到流中。这样一来，流将跟踪表上的所有写入活动。

以下示例显示用于创建两个表的代码。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 第 2 步：在源表中生成更新活动
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在源表上生成某些写入活动。在此活动发生时，源表的流也会近乎实时更新。

此应用程序通过调用用于写入数据的 `PutItem`、`UpdateItem` 和 `DeleteItem` API 操作的方法来定义帮助程序类。以下代码示例演示如何使用这些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 第 3 步：处理流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

现在，此程序开始处理流。DynamoDB Streams Kinesis Adapter 充当 KCL 和 DynamoDB Streams 端点之间的透明层，以便代码可充分利用 KCL 而不必进行低级 DynamoDB Streams 调用。此程序执行以下任务：
+ 它通过符合 KCL 接口定义的方法 (`StreamsRecordProcessor`、`initialize` 和 `processRecords`) 定义记录处理器类 `shutdown`。`processRecords` 方法包含从源表的流中进行读取以及对目标表进行写入时所需的逻辑。
+ 它定义了记录处理器类的类工厂 (`StreamsRecordProcessorFactory`)。这是使用 KCL 的 Java 程序所需的。
+ 它实例化一个新的 KCL `Worker`，它与类工厂关联。
+ 当记录处理完成时，它会关闭 `Worker`。

或者，在 Streams KCL Adapter 配置中启用追赶模式，以便在流处理滞后超过一分钟（默认值）时，自动将 GetRecords API 调用速率扩展 3 倍（默认值），从而有助于流使用者处理表中的高吞吐量峰值。

要了解有关 KCL 接口定义的详细信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*的[使用 Kinesis 客户端库开发使用者](https://docs.amazonaws.cn/kinesis/latest/dev/developing-consumers-with-kcl.html)。

以下代码示例演示了 `StreamsRecordProcessor` 中的主循环。`case` 语句基于流记录中显示的 `OperationType` 来确定要执行的操作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 第 4 步：确保两个表具有相同的内容
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此时，源表和目标表的内容是同步的。此应用程序针对两个表发送 `Scan` 请求以验证其内容是否实质相同。

`DemoHelper` 类包含调用低级 `ScanTable` API 的 `Scan` 方法。下例说明具体用法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 第 5 步：清理
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

演示完成后，此应用程序将删除源表和目标表。请看下面的代码示例。甚至在删除两个表后，其流也可在自动删除后的最多 24 个小时内保持可用。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```