Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

通过将 Amazon Kinesis Data Streams API 与适用于 Java 的 AWS 开发工具包结合使用来开发 Amazon Kinesis Data Streams 使用者

您可以将 Amazon Kinesis Data Streams API 与适用于 Java 的 AWS 开发工具包结合使用来开发使用者。如果您是首次使用 Kinesis Data Streams,请先熟悉一下什么是 Amazon Kinesis Data Streams?开始使用 Amazon Kinesis Data Streams中介绍的概念和术语。

这些示例讨论 Kinesis Data Streams API 并使用适用于 Java 的 AWS 开发工具包从流中获取数据。但是,在大多数使用案例中,您应会更喜欢使用 Kinesis Data Streams KCL 库。有关更多信息,请参阅 使用 Kinesis Client Library 开发 Amazon Kinesis Data Streams 使用者

本章中的 Java 示例代码演示如何执行基本的 Kinesis Data Streams API 操作,并按照操作类型从逻辑上进行划分。这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。此外,您可使用其他不同的编程语言调用 Kinesis Data Streams API。有关所有可用 AWS 开发工具包的更多信息,请参阅开始使用 Amazon Web Services 开发

每个任务都有先决条件;例如,您在创建流之后才能向流中添加数据,而创建流需要先创建一个客户端。有关更多信息,请参阅 使用 Java 管理 Kinesis Stream

从流中获取数据

Kinesis Data Streams API 提供了用于从流检索数据的 getShardIteratorgetRecords 方法。这是一个拉模型,您的代码直接从流的分片中抽取数据。

我们建议您使用 Kinesis Client Library (KCL) 提供的记录处理器支持功能,以在使用者应用程序中获取流数据。这是一个推模型,您通过实现代码来处理数据。KCL 将从流中获取数据记录并将数据记录交付给您的应用程序代码。此外,KCL 还提供故障转移、恢复和负载均衡功能。有关更多信息,请参阅 使用 Kinesis Client Library 开发 Amazon Kinesis Data Streams 使用者

但是,在某些情况下,您可能倾向于将 Kinesis Data Streams API 与适用于 Java 的 AWS 开发工具包结合使用。例如,在实施自定义工具以监控或调试流时。

使用分片迭代器

可从流中按分片检索记录。对于每个分片以及您从分片中检索的每批记录,您需要获取分片迭代器。可在 getRecordsRequest 对象中使用分片迭代器来指定要从中检索记录的分片。与分片迭代器关联的类型决定了应在分片中检索记录的起点(有关更多详细信息,请参阅以下内容)。在可以使用分片迭代器之前,您需要先检索分片(从流中检索分片 中已讨论)。

使用 getShardIterator 方法获取初始分片迭代器。使用 getRecordsResult 对象(由 getRecords 方法返回)的 getNextShardIterator 方法为其他记录批次获取分片迭代器。分片迭代器在五分钟内有效。如果您使用有效期内的分片迭代器,则将获得一个新的迭代器。请注意,每个分片迭代器在五分钟内都保持有效,即使使用过也是如此。

要获取初始分片迭代器,请实例化 GetShardIteratorRequest 并将其传递给 getShardIterator 方法。要配置请求,请指定流和分片 ID。有关如何获取您的 AWS 账户中的流的信息,请参阅 列出流。有关如何获取流中分片的信息,请参阅 从流中检索分片

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

此示例代码在获取初始分片迭代器时将 TRIM_HORIZON 指定为迭代器类型。此迭代器类型意味着记录应从添加到分片的第一个记录而不是从最近添加的记录(也称为顶端)开始返回。以下是可能的迭代器类型:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

有关更多信息,请参阅 ShardIteratorType

部分迭代器类型除了需要指定类型之外,还需要指定序列号。例如:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

在您使用 getRecords 获取记录之后,您可通过调用记录的 getSequenceNumber 方法获取记录的序列号。

record.getSequenceNumber()

此外,将记录添加到数据流的代码可通过对 putRecord 的结果调用 getSequenceNumber 获取已添加记录的序列号。

lastSequenceNumber = putRecordResult.getSequenceNumber();

您可使用序列号确保记录的顺序严格递增。有关更多信息,请参阅PutRecord 示例中的代码示例。

使用 GetRecords

在获取分片迭代器之后,实例化 GetRecordsRequest 对象。使用 setShardIterator 方法为请求指定迭代器。

(可选) 您还可使用 setLimit 方法设置要检索的记录的数量。getRecords 返回的记录数量始终等于或小于此限制。如果您未指定此限制,getRecords 将返回已检索记录的 10 MB。以下示例代码将此限制设置为 25 个记录。

如果未返回任何记录,则意味着此分片中当前没有分片迭代器引用的序列号对应的可用数据记录。出现此情况时,您的应用程序应等待流的数据源所需的时间,但至少为 1 秒。然后尝试使用对 getRecords 的上一调用返回的分片迭代器再次从分片获取数据。请注意,记录添加到流的时间与它在 getRecords 中可用的时间之间约有 3 秒的延迟。

getRecordsRequest 传递给 getRecords 方法并捕获返回的值作为 getRecordsResult 对象。要获取数据记录,请对 getRecordsResult 对象调用 getRecords 方法。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

要准备对 getRecords 的另一次调用,请通过 getRecordsResult 获取下一分片迭代器。

shardIterator = getRecordsResult.getNextShardIterator();

为获得最佳效果,请在对 getRecords 的各次调用之间停止至少 1 秒(1000 毫秒)以免超出 getRecords 频率限制。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

通常,您应循环调用 getRecords,甚至当您在测试方案中检索单一记录时也是如此。对 getRecords 的单一调用可能返回空的记录列表,即使分片包含更多具有之后的序列号的记录也是如此。出现此情况时,将返回 NextShardIterator,同时空记录列表将引用分片中之后的序列号,并且后续的 getRecords 调用最终将返回记录。以下示例演示循环的使用。

示例:getRecords

以下代码示例反映了此节中的 getRecords 顶端,包括循环发出调用。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

如果您使用的是 Kinesis Client Library,请注意,KCL 可能在返回数据之前发出多次调用。此行为是设计使然,不代表 KCL 或您的数据存在问题。

适应重新分片

如果 getRecordsResult.getNextShardIterator 返回 null,则指示以下内容:出现了涉及分片的分片拆分或合并,此分片现在处于 CLOSED 状态,并且您已读取此分片中的所有可用数据记录。

在此方案中,您应重新枚举流中的分片以选取通过拆分或合并创建的新分片。

在拆分中,两个新分片的 parentShardId 都与您之前处理的分片的分片 ID 相同。这两个分片的 adjacentParentShardId 值为 null

在合并中,合并创建的一个新分片的 parentShardId 等于父分片之一的分片 ID,并且 adjacentParentShardId 等于另一父分片的分片 ID。您的应用程序已读取这些分片之一中的所有数据;这是 getRecordsResult.getNextShardIterator 返回 null 的分片。如果数据顺序对于您的应用程序很重要,则应确保它在读取合并创建的子分片中的任何新数据之前,读取另一父分片中的所有数据。

如果您使用多个处理器从流检索数据,假定一个分片一个处理器,并且出现分片拆分或合并时,您应增加或减少处理器数量以适应分片数量的变化。

有关重新分片的更多信息,包括有关分片状态(如 CLOSED)的讨论,请参阅 对流进行重新分片