Kinesis Data Streams for Amazon DynamoDB 入门 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Kinesis Data Streams for Amazon DynamoDB 入门

本节介绍如何通过 Amazon DynamoDB 控制台、Amazon Command Line Interface (Amazon CLI) 和 API 使用 Kinesis Data Streams for Amazon DynamoDB 表。

所有这些示例都使用 DynamoDB 入门教程中创建的 Music DynamoDB 表。

Console
  1. 登录 Amazon Web Services Management Console,打开 Kinesis 控制台:https://console.aws.amazon.com/kinesis/。

  2. 选择创建数据流并按照说明创建一个名为 samplestream 的流。

  3. 打开 DynamoDB 控制台:https://console.aws.amazon.com/dynamodb/。

  4. 在控制台左侧的导航窗格中,选择

  5. 选择 Music 表。

  6. 选择导出和流式传输选项卡。

  7. 根据Kinesis Data Streams 详细信息,选择启用按钮。

  8. 从下拉列表选择 samplestream

  9. 选择启用流式传输按钮。

Amazon CLI
  1. 使用 create-stream 命令创建名为 samplestream 的 Kinesis 流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    请参阅 Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。

  2. 使用 describe-stream 命令,检查 Kinesis 流是否处于活动状态并准备好使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDBenable-kinesis-streaming-destination 命令,在 DynamoDB 表上启用 Kinesis 数据流。将 stream-arn 值替换为上一步返回的 describe-stream

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令检查是否在表上激活 Kinesis 流。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 DynamoDB 开发人员指南介绍的 put-item 命令,将数据写入 DynamoDB 表。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI 命令检索 Kinesis 流内容。然后使用以下代码片段来反序列化流内容。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 按照 Kinesis Data Streams 开发人员指南中的说明,使用 Java 创建一个名为 samplestream 的 Kinesis 数据流。

    请参阅 Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。

  2. 使用以下代码段在DynamoDB 表上启用 Kinesis 数据流。

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 按照 Kinesis Data Streams 开发人员指南中的说明进行操作,从创建的数据流读取

  4. 然后使用以下代码段 来反序列化流内容。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }

要了解有关如何构建消费者并将 Kinesis 数据流连接到其他 Amazon 服务,请参阅 Amazon Kinesis Data Streams 开发人员指南从 Amazon Kinesis Data Streams 读取数据