本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon Kinesis Data Streams API 和 Amazon SDK for Java 开发产生器
您可以使用 Amazon Kinesis Data Streams API 和适用于 Java 的 Amazon 开发工具包开发创建器。如果是首次使用 Kinesis Data Streams,请先熟悉 什么是 Amazon Kinesis Data Streams? 和 使用 Amazon CLI 执行 Amazon Kinesis Data Streams 操作 中介绍的概念和术语。
这些示例讨论 Kinesis Data Streams API 并使用适用于 Java 的 Amazon 开发工具包
本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作,并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可使用其他编程语言调用 Kinesis Data Streams API。有关所有可用 Amazon 开发工具包的更多信息,请参阅开始使用亚马逊云科技开发
每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 创建和管理 Kinesis 数据流。
向流添加数据
在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。
Kinesis Data Streams API 中有两种不同操作可向流添加数据:PutRecords
和 PutRecord
。PutRecords
操作将按 HTTP 请求向您的流发送多个记录,并且单个 PutRecord
操作一次可向您的流发送多个记录(每个记录需要单独的 HTTP 请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords
,因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。
请始终记住,在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时,很可能存在一个或多个同时处理离开流的数据的消费端应用程序。有关消费端如何使用 Kinesis Data Streams API 获取数据的信息,请参阅 从流中获取数据。
重要
使用 PutRecords 添加多条记录
PutRecords
操作可在一个请求中向 Kinesis Data Streams 发送多条记录。向 Kinesis 数据流发送数据时,创建器可以使用 PutRecords
实现更高的吞吐量。每个 PutRecords
请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB,整个请求的上限为 5 MB,包括分区键。与下面描述的单个 PutRecord
操作一样,PutRecords
将使用序列号和分区键。但是,PutRecord
参数 SequenceNumberForOrdering
未包含在 PutRecords
调用中。PutRecords
操作将尝试按请求的自然顺序处理所有记录。
每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecords
向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecords
请求之间的时间段越长,序列号变得越大。
注意
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
PutRecords
请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。
PutRecords 示例
以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream
的流中。
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
PutRecords
响应包含响应 Records
的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records
数组包含的记录数量始终与请求数组相同。
处理使用 PutRecords 时的失败问题
默认情况下,请求内的单个记录的失败不会中止对 PutRecords
请求中后续记录的处理。这意味着,响应 Records
数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。
成功的记录包括 SequenceNumber
和 ShardID
值,而不成功的记录包含 ErrorCode
和 ErrorMessage
值。ErrorCode
参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededException
或 InternalFailure
。ErrorMessage
提供有关 ProvisionedThroughputExceededException
异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords
请求中有三个记录。第二个记录失败并反映在响应中。
例 PutRecords 请求语法
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
例 PutRecords 响应语法
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
处理不成功的请求可包含在后续 PutRecords
请求中。首先,查看 putRecordsResult
中的 FailedRecordCount
参数以确认请求中是否存在失败的记录。如果存在,则应将具有 putRecordsEntry
(不是 ErrorCode
)的每个 null
添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。
例 PutRecords 故障处理程序
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
使用 PutRecord 添加单一记录
对 PutRecord
的每次调用对一个记录起作用。应首选 PutRecords
中描述的 使用 PutRecords 添加多条记录 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords
。
每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecord
向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecord
请求之间的时间段越长,序列号变得越大。
在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering
参数,如 PutRecord 示例 代码示例中所示。
无论您是否使用 SequenceNumberForOrdering
,Kinesis Data Streams 通过 GetRecords
调用接收的记录都将按序列号严格进行排序。
注意
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 使用分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。
作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 setShardCount
的 CreateStreamRequest
方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。
PutRecord 示例
以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName
的流中。
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
上一个代码示例使用 setSequenceNumberForOrdering
来确保每个分区键内的顺序严格递增。要有效使用此参数,请将当前记录(记录 n)的 SequenceNumberForOrdering
设置为前一条记录(记录 n-1)的序列号。要获取已添加到流的记录的序列号,请对 putRecord
的结果调用 getSequenceNumber
。
SequenceNumberForOrdering
参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering
不提供跨多个分区键的记录排序。