通过将 Amazon Kinesis Data Streams API 与结合使用来开发创建器Amazon适用于 Java 的 开发工具包 - Amazon Kinesis Data Streams
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过将 Amazon Kinesis Data Streams API 与结合使用来开发创建器Amazon适用于 Java 的 开发工具包

您可使用 Amazon Kinesis Data Streams API 与结合使用来开发创建者AmazonSDK for Java。如果您是首次使用 Kinesis Data Streams,请先熟悉一下什么是 Amazon Kinesis Data Streams?Amazon Kinesis Data Streams 入门

这些示例讨论Kinesis Data Streams API并使用Amazon适用于 Java 的 开发工具包向流添加(放置)数据。但是,在大多数使用案例中,您应会更喜欢 Kinesis Data Streams KPL 库。有关更多信息,请参阅 使用 Amazon Kinesis 创建器库开发创建器

本章中的 Java 示例演示如何执行基本的 Kinesis Data Streams API 操作,并按操作类型从逻辑上划分这些操作。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可以调用Kinesis Data Streams API使用其他编程语言。有关的更多信息Amazon开发工具包,请参阅开始使用 Amazon Web Services 开发

每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 创建和管理流

向流添加数据

在创建流之后,您可以记录的形式向其中添加数据。记录是一种数据结构,其中包含要处理的数据(采用数据 Blob 形式)。在您将数据存储到记录中之后,Kinesis Data Streams 不会以任何形式检查、解释或更改数据。每个记录还有一个关联的序列号和分区键。

Kinesis Data Streams API 中有两种不同的向流添加数据的操作:PutRecordsPutRecordPutRecords 操作将按 HTTP 请求向您的流发送多个记录,并且单个 PutRecord 操作一次可向您的流发送多个记录(每个记录需要单独的 HTTP 请求)。对于大多数应用程序,您应会更喜欢使用 PutRecords,因为这将使每个数据创建者实现更高的吞吐量。有关每种操作的更多信息,请参阅以下各小节。

请始终记住,在您的源应用程序使用 Kinesis Data Streams API 向流添加数据时,很可能存在一个或多个同时处理离开流的数据的使用者应用程序。有关使用者使用 Kinesis Data Streams API 获取数据的信息,请参阅从流中获取数据

使用 PutRecords 添加多个记录

这些区域有:PutRecords操作可在一次请求中向 Kinesis Data Streams 发送多个记录。通过使用PutRecords,创建者可在向 Kinesis 数据流发送数据时实现更高的吞吐量。EACHPutRecords请求最多可以支持 500 条记录。请求中的每一个记录最大可以为 1 MB,整个请求的上限为 5 MB,包括分区键。与单PutRecord操作如下所述,PutRecords使用序列号和分区键。但是,PutRecord 参数 SequenceNumberForOrdering 未包含在 PutRecords 调用中。PutRecords 操作将尝试按请求的自然顺序处理所有记录。

每个数据记录都有一个唯一的序列号。此序列号在您调用之后由 Kinesis Data Streams 分配client.putRecords向流添加数据记录。同一分区键的序列号通常会随时间变化增加;PutRecords 请求之间的时间段越长,序列号变得越大。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

PutRecords 请求可包含具有不同分区键的记录。请求的应用范围是一个流;每个请求可包含分区键和记录的任何组合,直到达到请求限制。使用许多不同的分区键对具有许多不同分片的流进行的请求一般快于使用少量分区键对少量分片进行的请求。分区键的数量应远大于分片的数量以减少延迟并最大程度提高吞吐量。

PutRecords 示例

以下代码创建 100 个使用连续分区键的数据记录并将其放入名为 DataStream 的流中。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

PutRecords 响应包含响应 Records 的数组。响应数组中的每个记录按自然顺序(从请求和响应的顶部到底部)直接与请求数组中的一个记录关联。响应 Records 数组包含的记录数量始终与请求数组相同。

处理使用 PutRecords 时的失败

默认情况下,请求内的单个记录的失败不会中止对 PutRecords 请求中后续记录的处理。这意味着,响应 Records 数组包含处理成功和不成功的记录。您必须删除处理不成功的记录并在后续调用中包括它们。

成功的记录包括 SequenceNumberShardID 值,而不成功的记录包含 ErrorCodeErrorMessage 值。ErrorCode 参数反映了错误类型,可能为下列值之一:ProvisionedThroughputExceededExceptionInternalFailureErrorMessage 提供有关 ProvisionedThroughputExceededException 异常的更多详细信息,包括账户 ID、流名称和已阻止的记录的分片 ID。以下示例在 PutRecords 请求中有三个记录。第二个记录失败并反映在响应中。

例 PutRecords 请求语法

{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }

例 PutRecords 响应语法

{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

处理不成功的请求可包含在后续 PutRecords 请求中。首先,查看 putRecordsResult 中的 FailedRecordCount 参数以确认请求中是否存在失败的记录。如果存在,则应将具有 putRecordsEntry(不是 ErrorCode)的每个 null 添加到后续请求中。有关此类处理程序的示例,请参阅以下代码。

例 PutRecords 故障处理程序

PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

使用 PutRecord 添加单一记录

PutRecord 的每次调用对一个记录起作用。应首选 PutRecords 中描述的 使用 PutRecords 添加多个记录 操作,除非您的应用程序明确需要每一请求始终发送一条记录,或因某种其他原因无法使用 PutRecords

每个数据记录都有一个唯一的序列号。此序列号在您调用之后由 Kinesis Data Streams 分配client.putRecord向流添加数据记录。同一分区键的序列号通常会随时间变化增加;PutRecord 请求之间的时间段越长,序列号变得越大。

在快速连续进行放置时,不保证返回的序列号会递增,因为放置操作的发生对于 Kinesis Data Streams 来说是同步进行的。要确保相同分区键的序列号严格递增,请使用 SequenceNumberForOrdering 参数,如 PutRecord 示例 代码示例中所示。

无论您是否使用SequenceNumberForOrdering,记录 Kinesis Data Streams 通过GetRecords调用是严格按序列号排序的。

注意

序列号不能用作相同流中的数据集的索引。为了在逻辑上分隔数据集,请使用分区键或者为每个数据集创建单独的流。

分区键用于对流中的数据进行分组。数据记录将基于其分区键分配给流中的分片。具体来说,Kinesis Data Streams 使用分区键作为将分区键(和关联数据)映射到特定分片的哈希函数的输入。

作为此哈希机制的结果,具有相同分区键的所有数据记录将映射到流中的同一分片。但是,如果分区键的数量超出分片的数量,则一些分片必定会包含具有不同分区键的记录。从设计的角度看,要确保您的所有分片得到充分利用,分片的数量(由 setShardCountCreateStreamRequest 方法指定)应远少于唯一分区键的数量,并且流至单一分区键的数据量应远少于分片容量。

PutRecord 示例

以下代码创建跨两个分区键分配的 10 条数据记录,并将它们放入名为 myStreamName 的流中。

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

上一个代码示例使用 setSequenceNumberForOrdering 来确保每个分区键内的顺序严格递增。要有效地使用此参数,请将SequenceNumberForOrdering当前记录(记录n)设置为前面记录的序列号(记录n-1)。要获取已添加到流的记录的序列号,请对 putRecord 的结果调用 getSequenceNumber

SequenceNumberForOrdering 参数可确保相同分区键的序列号严格递增。SequenceNumberForOrdering 不提供跨多个分区键的记录排序。

与数据交互使用AmazonGlue 架构注册表

您可以将 Kinesis 数据流与AmazonGlue 模式注册表。这些区域有:AmazonGlue 模式注册表允许您集中发现、控制和发展模式,同时确保生成的数据通过注册模式持续验证。架构定义数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。这些区域有:AmazonGlue 架构注册表使您能够改进流应用程序中的端到端数据质量和数据治理。有关更多信息,请参阅 。AmazonGlue 架构注册表。设置此集成的方法之一是通过PutRecordsPutRecordKinesis Data Streams API 可在AmazonJava 开发工具包。

有关如何使用 PutRecords 和 PutRecord 动态数据流 API 设置 Kinesis 数据流与架构注册表集成的详细说明,请参阅使用案例:将 Amazon Kinesis Data Streams 与AmazonGlue 架构注册表