本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Amazon Kinesis Data Streams API 与结合使用来开发创建者Amazon SDK for Java
您可使用 Amazon Kinesis Data Streams API 与结合使用来开发创建者。AmazonSDK for Java。如果您是首次使用 Kinesis Data Streams,请先熟悉一下中介绍的概念和术语。什么是 Amazon Kinesis Data Streams?和Amazon Kinesis Data Streams 入门.
这些示例讨论Kinesis Data Streams API然后使用Amazon适用于 Java 的开发工具包
本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作,并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可以调用Kinesis Data Streams API使用其他编程语言。有关所有可用的更多信息Amazon开发工具包,请参阅使用 Amazon Web Services 开发
每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 创建和管理流。
向流添加数据
在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。
Kinesis Data Streams API 中有两种不同的向流添加数据的操作:PutRecords
和PutRecord
. PutRecords
操作将按 HTTP 请求向您的流发送多个记录,并且单个 PutRecord
操作一次可向您的流发送多个记录(每个记录需要单独的 HTTP 请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords
,因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。
请始终记住,在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时,很可能存在一个或多个同时处理离开流的数据的使用者应用程序。有关使用者使用 Kinesis Data Streams API 获取数据的信息,请参阅从流中获取数据.
使用添加多个记录PutRecords
这些区域有:PutRecords
操作在一次请求中向 Kinesis Data Streams 发送多个记录。通过使用PutRecords
,创建器可在向 Kinesis Data Streams 发送数据时实现更高的吞吐量。EARDSPutRecords
请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB,整个请求的上限为 5 MB,包括分区键。和单一一样PutRecord
下面描述的操作,PutRecords
使用序列号和分区键。但是,PutRecord
参数 SequenceNumberForOrdering
未包含在 PutRecords
调用中。PutRecords
操作将尝试按请求的自然顺序处理所有记录。
每个数据记录都有一个唯一的序列号。此序列号在您调用之后由 Kinesis Data Streams 分配。client.putRecords
向流添加数据记录。同一分区键的序列号通常会随时间变化增加;PutRecords
请求之间的时间段越长,序列号变得越大。
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
PutRecords
请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。
PutRecords示例
以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream
的流中。
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
PutRecords
响应包含响应 Records
的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records
数组包含的记录数量始终与请求数组相同。
处理使用时的失败PutRecords
默认情况下,请求内的单个记录的失败不会中止对 PutRecords
请求中后续记录的处理。这意味着,响应 Records
数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。
成功的记录包括 SequenceNumber
和 ShardID
值,而不成功的记录包含 ErrorCode
和 ErrorMessage
值。ErrorCode
参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededException
或 InternalFailure
。ErrorMessage
提供有关 ProvisionedThroughputExceededException
异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords
请求中有三个记录。第二个记录失败并反映在响应中。
例 PutRecords请求语法
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
例 PutRecords响应语法
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
处理不成功的请求可包含在后续 PutRecords
请求中。首先,查看 putRecordsResult
中的 FailedRecordCount
参数以确认请求中是否存在失败的记录。如果存在,则应将具有 putRecordsEntry
(不是 ErrorCode
)的每个 null
添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。
例 PutRecords失败处理程
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
使用添加单一记录PutRecord
对 PutRecord
的每次调用对一个记录起作用。应首选 PutRecords
中描述的 使用添加多个记录PutRecords 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords
。
每个数据记录都有一个唯一的序列号。此序列号在您调用之后由 Kinesis Data Streams 分配。client.putRecord
向流添加数据记录。同一分区键的序列号通常会随时间变化增加;PutRecord
请求之间的时间段越长,序列号变得越大。
在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering
参数,如 PutRecord示例 代码示例中所示。
你是否使用SequenceNumberForOrdering
,Kinesis Data Streams 通过GetRecords
呼叫严格按序列号排序。
序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。
分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 将分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。
作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 setShardCount
的 CreateStreamRequest
方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。
PutRecord示例
以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName
的流中。
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
上一个代码示例使用 setSequenceNumberForOrdering
来确保每个分区键内的顺序严格递增。要有效地使用此参数,请将SequenceNumberForOrdering
当前记录的(记录)n) 到前一条记录的序列号(记录)n-1)。要获取已添加到流的记录的序列号,请对 putRecord
的结果调用 getSequenceNumber
。
SequenceNumberForOrdering
参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering
不提供跨多个分区键的记录排序。
使用与数据进行交互Amazon架 Glue 注册表
您可以将 Kinesis 数据流与AmazonGlue 模式注册表。这些区域有:Amazon使用 Glue 架构注册表,您可以集中发现、控制和演变架构,同时确保通过注册表持续验证创建的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。这些区域有:AmazonGlue 模式注册表使你能够改进end-to-end流媒体应用程序中的数据质量和数据治理。有关更多信息,请参阅 。Amazon架 Glue 注册表. 设置此集成的方法之一是通过PutRecords
和PutRecord
Kinesis Data Streams API 可在AmazonJava 开发工具包。
有关如何使用将 Kinesis Data Streams 与架构注册表集成的详细说明PutRecords和PutRecordKinesis Data Streams API,请参阅中的 “使用 Kinesis Data Streams API 与数据交互” 部分使用案例:将 Amazon Kinesis Data Streams 与Amazon架 Glue 注册表.