使用 Amazon Kinesis Data Streams API 和 Amazon SDK for Java 开发创建器 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon Kinesis Data Streams API 和 Amazon SDK for Java 开发创建器

您可以使用 Amazon Kinesis Data Streams API 和适用于 Java 的 Amazon 开发工具包开发创建器。如果是首次使用 Kinesis Data Streams,请先熟悉 什么是 Amazon Kinesis Data Streams?开始使用 Amazon Kinesis Data Streams 中介绍的概念和术语。

这些示例讨论 Kinesis Data Streams API 并使用适用于 Java 的 Amazon 开发工具包向流中添加(放置)数据。但是,在大多数使用案例中,您应会更喜欢使用 Kinesis Data Streams KPL 库。有关更多信息,请参阅使用 Amazon Kinesis Producer Library 开发创建器

本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作,并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可使用其他编程语言调用 Kinesis Data Streams API。有关所有可用 Amazon 开发工具包的更多信息,请参阅开始使用亚马逊云科技开发

每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅创建和管理流

向流添加数据

在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。

Kinesis Data Streams API 中有两种不同操作可向流添加数据:PutRecordsPutRecordPutRecords 操作将按 HTTP 请求向您的流发送多个记录,并且单个 PutRecord 操作一次可向您的流发送多个记录(每个记录需要单独的 HTTP 请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords,因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。

请始终记住,在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时,很可能存在一个或多个同时处理离开流的数据的使用器应用程序。有关使用器如何使用 Kinesis Data Streams API 获取数据的信息,请参阅 从流中获取数据

使用 PutRecords 添加多个记录

PutRecords 操作可在一个请求中向 Kinesis Data Streams 发送多条记录。向 Kinesis 数据流发送数据时,创建器可以使用 PutRecords 实现更高的吞吐量。每个 PutRecords 请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB,整个请求的上限为 5 MB,包括分区键。与下面描述的单个 PutRecord 操作一样,PutRecords 将使用序列号和分区键。但是,PutRecord 参数 SequenceNumberForOrdering 未包含在 PutRecords 调用中。PutRecords 操作将尝试按请求的自然顺序处理所有记录。

每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecords 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecords 请求之间的时间段越长,序列号变得越大。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

PutRecords 请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。

PutRecords 示例

以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream 的流中。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

PutRecords 响应包含响应 Records 的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records 数组包含的记录数量始终与请求数组相同。

处理使用 PutRecords 时的失败

默认情况下,请求内的单个记录的失败不会中止对 PutRecords 请求中后续记录的处理。这意味着,响应 Records 数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。

成功的记录包括 SequenceNumberShardID 值,而不成功的记录包含 ErrorCodeErrorMessage 值。ErrorCode 参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededExceptionInternalFailureErrorMessage 提供有关 ProvisionedThroughputExceededException 异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords 请求中有三个记录。第二个记录失败并反映在响应中。

例 PutRecords 请求语法
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
例 PutRecords 响应语法
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

处理不成功的请求可包含在后续 PutRecords 请求中。首先,查看 putRecordsResult 中的 FailedRecordCount 参数以确认请求中是否存在失败的记录。如果存在,则应将具有 putRecordsEntry(不是 ErrorCode)的每个 null 添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。

例 PutRecords 故障处理程序
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

使用 PutRecord 添加单一记录

PutRecord 的每次调用对一个记录起作用。应首选 PutRecords 中描述的 使用 PutRecords 添加多个记录 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords

每个数据记录都有一个唯一的序列号。此序列号在您调用 client.putRecord 向流添加数据记录之后由 Kinesis Data Streams 分配。同一分区键的序列号通常会随时间变化增加;PutRecord 请求之间的时间段越长,序列号变得越大。

在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 实际上是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering 参数,如 PutRecord 示例 代码示例中所示。

无论您是否使用 SequenceNumberForOrdering,Kinesis Data Streams 通过 GetRecords 调用接收的记录都将按序列号严格进行排序。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 使用分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。

作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 setShardCountCreateStreamRequest 方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。

PutRecord 示例

以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName 的流中。

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

上一个代码示例使用 setSequenceNumberForOrdering 来确保每个分区键内的顺序严格递增。要有效使用此参数,请将当前记录(记录 n)的 SequenceNumberForOrdering 设置为前一条记录(记录 n-1)的序列号。要获取已添加到流的记录的序列号,请对 putRecord 的结果调用 getSequenceNumber

SequenceNumberForOrdering 参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering 不提供跨多个分区键的记录排序。

使用 Amazon Glue 架构注册表与数据交互

您可以将 Kinesis Data Streams 与 Amazon Glue 架构注册表进行集成。Amazon Glue 架构注册表允许您集中发现、控制和演变架构,同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。通过 Amazon Glue 架构注册表,您能够改善流媒体应用程序中的端到端数据质量和数据治理。有关更多信息,请参阅 Amazon Glue Schema Registry。使用 Amazon Java 开发工具包中提供的 PutRecordsPutRecord Kinesis Data Streams API 可以设置此集成。

有关如何使用 PutRecords 和 PutRecord Kinesis Data Streams API 设置 Kinesis Data Streams 与 Schema 注册表集成的详细说明,请参阅 Use Case: Integrating Amazon Kinesis Data Streams with the Amazon Glue Schema Registry 中的“Interacting with Data Using the Kinesis Data Streams APIs”部分。