Getting started with Kinesis Data Streams for Amazon DynamoDB - Amazon DynamoDB
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China.

Getting started with Kinesis Data Streams for Amazon DynamoDB

This section describes how to use Kinesis Data Streams for Amazon DynamoDB tables with the Amazon DynamoDB console, the Amazon Command Line Interface (Amazon CLI), and the API.

All of these examples use the Music DynamoDB table that was created as part of the Getting started with DynamoDB tutorial.

Console
  1. Sign in to the Amazon Web Services Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis/.

  2. Choose Create data stream and follow the instructions to create a stream called samplestream.

  3. Open the DynamoDB console at https://console.aws.amazon.com/dynamodb/.

  4. In the navigation pane on the left side of the console, choose Tables.

  5. Choose the Music table.

  6. Choose the Exports and streams tab.

  7. Under Kinesis data stream details, choose the Enable button.

  8. Choose samplestream from the dropdown list.

  9. Choose the Enable stream button.

Amazon CLI
  1. Create a Kinesis Data Streams named samplestream by using the create-stream command.

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    See Shard management considerations for Kinesis Data Streams before setting the number of shards for the Kinesis data stream.

  2. Check that the Kinesis stream is active and ready for use by using the describe-stream command.

    aws kinesis describe-stream --stream-name samplestream
  3. Enable Kinesis streaming on the DynamoDB table by using the DynamoDB enable-kinesis-streaming-destination command. Replace the stream-arn value with the one that was returned by describe-stream in the previous step.

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
  4. Check if Kinesis streaming is active on the table by using the DynamoDB describe-kinesis-streaming-destination command.

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. Write data to the DynamoDB table by using the put-item command, as described in the DynamoDB Developer Guide.

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Use the Kinesis get-records CLI command to retrieve the Kinesis stream contents. Then use the following code snippet to deserialize the stream content.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. Follow the instructions in the Kinesis Data Streams developer guide to create a Kinesis data stream named samplestream using Java.

    See Shard management considerations for Kinesis Data Streams before setting the number of shards for the Kinesis data stream.

  2. Use the following code snippet to enable Kinesis streaming on the DynamoDB table

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. Follow the instructions in the Kinesis Data Streams developer guide to read from the created data stream.

  4. Use the following code snippet to deserialize the stream content

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }

To learn more about how to build consumers and connect your Kinesis data stream to other Amazon services, see Reading data from Kinesis Data Streams in theAmazon Kinesis Data Streams developer guide.