适用于 Amazon DynamoDB 的 Kinesis Data Streams 入门 - Amazon DynamoDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

适用于 Amazon DynamoDB 的 Kinesis Data Streams 入门

本节介绍了如何通过 Amazon DynamoDB 控制台、Amazon Command Line Interface(Amazon CLI)和 API 将 Kinesis Data Streams 用于 Amazon DynamoDB 表。

所有这些示例都使用 DynamoDB 入门教程中创建的 Music DynamoDB 表。

要了解如何构建使用器并将 Kinesis Data Stream 连接到其他 Amazon 服务,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的从 Kinesis Data Streams 读取数据

注意

当您首次使用 KDS 分片时,建议您将分片设置为根据使用模式横向和纵向扩展。在积累了更多有关使用模式的数据后,您可以调整数据流中的分片以与模式匹配。

Console
  1. 登录 Amazon Web Services Management Console,打开 Kinesis 控制台:https://console.aws.amazon.com/kinesis/。

  2. 选择创建数据流并按照说明创建一个名为 samplestream 的流。

  3. 打开 DynamoDB 控制台:https://console.aws.amazon.com/dynamodb/。

  4. 在控制台左侧的导航窗格中,选择

  5. 选择 Music 表。

  6. 选择导出和流式传输选项卡。

  7. 根据Kinesis Data Streams 详细信息,选择启用按钮。

  8. 从下拉列表选择 samplestream

  9. 选择启用流式传输按钮。

Amazon CLI
  1. 使用 create-stream 命令创建名为 samplestream 的 Kinesis 流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    请参阅Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。

  2. 使用 describe-stream 命令,检查 Kinesis 流是否处于活动状态并准备好使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDB enable-kinesis-streaming-destination 命令,在 DynamoDB 表上启用 Kinesis 数据流。将 stream-arn 值替换为上一步返回的 describe-stream

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令检查是否在表上激活 Kinesis 流。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 DynamoDB 开发人员指南介绍的 put-item 命令,将数据写入 DynamoDB 表。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI 命令检索 Kinesis 流内容。然后使用以下代码片段来反序列化流内容。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 按照 Kinesis Data Streams 开发人员指南中的说明,使用 Java 创建一个名为 samplestreamhttps://docs.amazonaws.cn/streams/latest/dev/kinesis-using-sdk-java-create-stream.html Kinesis 数据流。

    请参阅Kinesis Data Streams 的分片管理注意事项,然后设置 Kinesis 数据流的分片数。

  2. 使用以下代码段在DynamoDB 表上启用 Kinesis 数据流

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 按照 Kinesis Data Streams 开发人员指南中的说明进行操作,从创建的数据流读取

  4. 然后使用以下代码段来反序列化流内容。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }