适用于 Amazon DynamoDB 的 Amazon Kinesis Data Streams 入门 - Amazon DynamoDB
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Amazon DynamoDB 的 Amazon Kinesis Data Streams 入门

本节介绍如何将 Amazon Kinesis Data Streams for Amazon DynamoDB 与 Amazon DynamoDB 控制台、AWS Command Line Interface (AWS CLI) 和 APIs 结合使用。

所有这些示例都使用作为 Music 入门DynamoDB教程的一部分创建的 DynamoDB 表。

Console
  1. 登录 AWS 管理控制台,并通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesis/.

  2. 选择 Create data stream (创建数据流),然后按照说明创建一个名为 samplestream 的流。

  3. 通过以下网址打开 DynamoDB 控制台:https://console.aws.amazon.com/dynamodb/.

  4. 在控制台左侧的导航窗格中,选择 Tables (表)

  5. 选择 Music 表。

  6. 选择 Overview (概述) 选项卡。

  7. Kinesis data stream details (Kinesis 数据流详细信息) 下,选择 Manage streaming to Kinesis (管理流式传输到 Kinesis)

  8. 从下拉列表中选择 samplestream

  9. 选择 Enable (启用) 按钮。

AWS CLI
  1. 使用 samplestreamcreate-stream 命令创建名为 的 Kinesis 流。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    在设置 Kinesis 数据流的分片数之前,请参阅Amazon Kinesis Data Streams 的分片管理注意事项

  2. 使用 describe-stream 命令检查 Kinesis 流是否处于活动状态并可供使用。

    aws kinesis describe-stream --stream-name samplestream
  3. 使用 DynamoDB DynamoDB 命令在 enable-kinesis-streaming-destination 表上启用 Kinesis 流式处理。将 stream-arn 值替换为上一步中由 describe-stream 返回的值。

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
  4. 使用 DynamoDB describe-kinesis-streaming-destination 命令检查 Kinesis 流式传输是否对表处于活动状态。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. 使用 DynamoDB 命令将数据写入 put-item 表,如 DynamoDB 开发人员指南中所述。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. 使用 Kinesis get-records CLI 命令检索 Kinesis 流内容。然后,使用以下代码段反序列化流内容。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 按照 Kinesis Data Streams 开发人员指南中的说明,使用 Java 创建名为 samplestream 的 Kinesis 数据流。

    在设置 Kinesis 数据流的分片数之前,请参阅Amazon Kinesis Data Streams 的分片管理注意事项

  2. 在表中使用以下代码段启用 Amazon Kinesis Data Streams for AmazonDynamoDB

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 按照 Kinesis Data Streams 开发人员指南中的说明,从创建的数据流中读取

  4. 使用以下代码段反序列化流内容

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }

要了解有关如何构建使用者并将 Kinesis 数据流连接到其他 AWS 服务的更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的从 Amazon Kinesis Data Streams 读取数据