Use PutRecord with an Amazon SDK or CLI - Amazon Kinesis Data Streams
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Use PutRecord with an Amazon SDK or CLI

The following code examples show how to use PutRecord.

Action examples are code excerpts from larger programs and must be run in context. You can see this action in context in the following code example:

CLI
Amazon CLI

To write a record into a data stream

The following put-record example writes a single data record into the specified data stream using the specified partition key.

aws kinesis put-record \ --stream-name samplestream \ --data sampledatarecord \ --partition-key samplepartitionkey

Output:

{ "ShardId": "shardId-000000000009", "SequenceNumber": "49600902273357540915989931256901506243878407835297513618", "EncryptionType": "KMS" }

For more information, see Developing Producers Using the Amazon Kinesis Data Streams API with the Amazon SDK for Java in the Amazon Kinesis Data Streams Developer Guide.

  • For API details, see PutRecord in Amazon CLI Command Reference.

Java
SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }
  • For API details, see PutRecord in Amazon SDK for Java 2.x API Reference.

PowerShell
Tools for PowerShell

Example 1: Writes a record containing the string supplied to the -Text parameter.

Write-KINRecord -Text "test data from string" -StreamName "mystream" -PartitionKey "Key1"

Example 2: Writes a record containing the data contained in the specified file. The file is treated as a sequence of bytes so if it contains text, it should be written with any necessary encoding before using it with this cmdlet.

Write-KINRecord -FilePath "C:\TestData.txt" -StreamName "mystream" -PartitionKey "Key2"
  • For API details, see PutRecord in Amazon Tools for PowerShell Cmdlet Reference.

Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def put_record(self, data, partition_key): """ Puts data into the stream. The data is formatted as JSON before it is passed to the stream. :param data: The data to put in the stream. :param partition_key: The partition key to use for the data. :return: Metadata about the record, including its shard ID and sequence number. """ try: response = self.kinesis_client.put_record( StreamName=self.name, Data=json.dumps(data), PartitionKey=partition_key ) logger.info("Put record in stream %s.", self.name) except ClientError: logger.exception("Couldn't put record in stream %s.", self.name) raise else: return response
  • For API details, see PutRecord in Amazon SDK for Python (Boto3) API Reference.

Rust
SDK for Rust
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • For API details, see PutRecord in Amazon SDK for Rust API reference.

SAP ABAP
SDK for SAP ABAP
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the Amazon Code Examples Repository.

TRY. oo_result = lo_kns->putrecord( " oo_result is returned for testing purposes. " iv_streamname = iv_stream_name iv_data = iv_data iv_partitionkey = iv_partition_key ). MESSAGE 'Record created.' TYPE 'I'. CATCH /aws1/cx_knsinvalidargumentex . MESSAGE 'The specified argument was not valid.' TYPE 'E'. CATCH /aws1/cx_knskmsaccessdeniedex . MESSAGE 'You do not have permission to perform this AWS KMS action.' TYPE 'E'. CATCH /aws1/cx_knskmsdisabledex . MESSAGE 'KMS key used is disabled.' TYPE 'E'. CATCH /aws1/cx_knskmsinvalidstateex . MESSAGE 'KMS key used is in an invalid state. ' TYPE 'E'. CATCH /aws1/cx_knskmsnotfoundex . MESSAGE 'KMS key used is not found.' TYPE 'E'. CATCH /aws1/cx_knskmsoptinrequired . MESSAGE 'KMS key option is required.' TYPE 'E'. CATCH /aws1/cx_knskmsthrottlingex . MESSAGE 'The rate of requests to AWS KMS is exceeding the request quotas.' TYPE 'E'. CATCH /aws1/cx_knsprovthruputexcdex . MESSAGE 'The request rate for the stream is too high, or the requested data is too large for the available throughput.' TYPE 'E'. CATCH /aws1/cx_knsresourcenotfoundex . MESSAGE 'Resource being accessed is not found.' TYPE 'E'. ENDTRY.
  • For API details, see PutRecord in Amazon SDK for SAP ABAP API reference.

For a complete list of Amazon SDK developer guides and code examples, see Using this service with an Amazon SDK. This topic also includes information about getting started and details about previous SDK versions.