Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

通过将 Amazon Kinesis Data Streams API 与适用于 Java 的 AWS 开发工具包结合使用来开发 Amazon Kinesis Data Streams 生产者

您可以将 Amazon Kinesis Data Streams API 与适用于 Java 的 AWS 开发工具包结合使用来开发创建器。如果您是首次使用 Kinesis Data Streams,请先熟悉一下什么是 Amazon Kinesis Data Streams?开始使用 Amazon Kinesis Data Streams中介绍的概念和术语。

这些示例讨论 Kinesis Data Streams API 并使用适用于 Java 的 AWS 开发工具包向流中添加(放置)数据。但是,在大多数使用案例中,您应会更喜欢使用 Kinesis Data Streams KPL 库。有关更多信息,请参阅 使用 Kinesis Producer Library 开发 Amazon Kinesis Data Streams 创建器

本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作,并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可使用其他编程语言调用 Kinesis Data Streams API。有关所有可用 AWS 开发工具包的更多信息,请参阅开始使用 Amazon Web Services 开发

每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 使用 Java 管理 Kinesis Stream

向流添加数据

在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。

Kinesis Data Streams API 中有两种不同的向流添加数据的操作:PutRecordsPutRecordPutRecords 操作将按 HTTP 请求向您的流发送多个记录,并且单个 PutRecord 操作一次可向您的流发送多个记录(每个记录需要单独的 HTTP 请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords,因为这将使每个数据创建器实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。

请始终记住,在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时,很可能存在一个或多个同时处理离开流的数据的使用者应用程序。有关使用者应用程序使用 Kinesis Data Streams API 获取数据的信息,请参阅 从流中获取数据

使用 PutRecords 添加多个记录

PutRecords 操作可在一次请求中向 Kinesis Data Streams 发送多个记录。通过使用 PutRecords,创建器可在向其 Kinesis stream 发送数据时实现更高的吞吐量。每 PutRecords 请求可以支持多达 500 条记录。请求中的每一个记录可以大到 1MB,整个请求的上限为 5 MB,包括分区键。 与下面描述的 PutRecord 操作一样,PutRecords 将使用序列号和分区键。但是,PutRecord 参数 SequenceNumberForOrdering 未包含在 PutRecords 调用中。PutRecords 操作将尝试按请求的自然顺序处理所有记录。

每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecords 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecords 请求之间的时间段越长,序列号变得越大。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

PutRecords 请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。

PutRecords 示例

以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream 的流中。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

PutRecords 响应包含响应 Records 的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records 数组包含的记录数量始终与请求数组相同。

处理使用 PutRecords 时的操作失败

默认情况下,请求内的单个记录的失败不会中止对 PutRecords 请求中后续记录的处理。这意味着,响应 Records 数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。

成功的记录包括 SequenceNumberShardID 值,而不成功的记录包含 ErrorCodeErrorMessage 值。ErrorCode 参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededExceptionInternalFailureErrorMessage 提供有关 ProvisionedThroughputExceededException 异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords 请求中有三个记录。第二个记录失败并反映在响应中。

例 PutRecords 请求语法

{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }

例 PutRecords 响应语法

{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

处理不成功的请求可包含在后续 PutRecords 请求中。首先,查看 putRecordsResult 中的 FailedRecordCount 参数以确认请求中是否存在失败的记录。如果存在,则应将具有 ErrorCode(不是 null)的每个 putRecordsEntry 添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。

例 PutRecords 故障处理程序

PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

使用 PutRecord 添加单一记录

PutRecord 的每次调用对一个记录起作用。应首选 使用 PutRecords 添加多个记录 中描述的 PutRecords 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords

每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecord 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecord 请求之间的时间段越长,序列号变得越大。

在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering 参数,如 PutRecord 示例 代码示例中所示。

无论您是否使用 SequenceNumberForOrdering,Kinesis Data Streams 通过 GetRecords 调用接收的记录都将按序列号严格进行排序。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 使用分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。

作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 CreateStreamRequestsetShardCount 方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。

PutRecord 示例

以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName 的流中。

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

上一个代码示例使用 setSequenceNumberForOrdering 来确保每个分区键内的顺序严格递增。要有效使用此参数,请将当前记录(记录 n)的 SequenceNumberForOrdering 设置为前一记录(记录 n-1)的序列号。要获取已添加到流的记录的序列号,请对 putRecord 的结果调用 getSequenceNumber

同一客户端调用 PutRecord 时,SequenceNumberForOrdering 参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering 不会在从多个并发应用程序添加的记录中或在多个分区键中提供顺序保证。

本页内容: