本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适用于 Amazon DynamoDB 的 Amazon Kinesis Data Streams 入门
本节介绍如何将 Amazon Kinesis Data Streams for Amazon DynamoDB 与 Amazon DynamoDB 控制台、AWS Command Line Interface (AWS CLI) 和 APIs 结合使用。
所有这些示例都使用作为 Music
入门DynamoDB教程的一部分创建的 DynamoDB 表。
- Console
-
-
登录 AWS 管理控制台,并通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesis/.
-
选择 Create data stream (创建数据流),然后按照说明创建一个名为
samplestream
的流。 -
通过以下网址打开 DynamoDB 控制台:https://console.aws.amazon.com/dynamodb/.
-
在控制台左侧的导航窗格中,选择 Tables (表)。
-
选择 Music 表。
-
选择 Overview (概述) 选项卡。
-
在 Kinesis data stream details (Kinesis 数据流详细信息) 下,选择 Manage streaming to Kinesis (管理流式传输到 Kinesis)。
-
从下拉列表中选择 samplestream。
-
选择 Enable (启用) 按钮。
-
- AWS CLI
-
-
使用
samplestream
create-stream 命令创建名为 的 Kinesis 流。aws kinesis create-stream --stream-name samplestream --shard-count 3
在设置 Kinesis 数据流的分片数之前,请参阅Amazon Kinesis Data Streams 的分片管理注意事项 。
-
使用 describe-stream 命令检查 Kinesis 流是否处于活动状态并可供使用。
aws kinesis describe-stream --stream-name samplestream
-
使用 DynamoDB DynamoDB 命令在
enable-kinesis-streaming-destination
表上启用 Kinesis 流式处理。将 stream-arn 值替换为上一步中由describe-stream
返回的值。aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
-
使用 DynamoDB
describe-kinesis-streaming-destination
命令检查 Kinesis 流式传输是否对表处于活动状态。aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
使用 DynamoDB 命令将数据写入
put-item
表,如 DynamoDB 开发人员指南中所述。aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
使用 Kinesis get-records CLI 命令检索 Kinesis 流内容。然后,使用以下代码段反序列化流内容。
/** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
-
- Java
-
-
按照 Kinesis Data Streams 开发人员指南中的说明,使用 Java 创建名为
samplestream
的 Kinesis 数据流。在设置 Kinesis 数据流的分片数之前,请参阅Amazon Kinesis Data Streams 的分片管理注意事项 。
-
在表中使用以下代码段启用 Amazon Kinesis Data Streams for AmazonDynamoDB
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
按照 Kinesis Data Streams 开发人员指南中的说明,从创建的数据流中读取。
-
使用以下代码段反序列化流内容
/** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
-
要了解有关如何构建使用者并将 Kinesis 数据流连接到其他 AWS 服务的更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的从 Amazon Kinesis Data Streams 读取数据。