使用开发吞吐量共享的消费者 Amazon SDK for Java - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用开发吞吐量共享的消费者 Amazon SDK for Java

开发自定义 Kinesis Data Streams 使用者的方法之一是将 Amazon Kinesis Data APIs Streams 与 Amazon SDK for Java本节介绍如何将 Kinesis Data APIs Streams 与配合 Amazon SDK for Java使用。你可以使用其他不同的编程语言调用 Kinesis Data APIs Streams。有关所有可用内容的更多信息 Amazon SDKs,请参阅开始使用 Amazon Web Services 进行开发

本节中的 Java 示例代码演示了如何执行 Kinesis Data API Streams 的基本操作,并按操作类型进行了逻辑划分。这些示例并非可直接用于生产的代码。它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。

从流中获取数据

Kinesis Data APIs Streams 包括getShardIteratorgetRecords和方法,您可以调用这些方法从数据流中检索记录。这是拉取模型,您的代码可以直接从数据流的分片中抽取数据记录。

重要

我们建议您使用提供的KCL记录处理器支持从数据流中检索记录。这是推送模型,您可以通过实现代码来处理数据。从数据流中KCL检索数据记录并将其传送到您的应用程序代码。此外,还KCL提供故障转移、恢复和负载平衡功能。有关更多信息,请参阅使用共享吞吐量开发自定义使用者KCL

但是,在某些情况下,你可能更喜欢使用 Kinesis Dat APIs a Streams。例如,在实施自定义工具以监控或调试数据流时。

重要

Kinesis Data Streams 支持更改数据流的数据记录保留期。有关更多信息,请参阅 更改数据留存期

使用分片迭代器

可从流中按分片检索记录。对于每个分片以及您从分片中检索的每批记录,您必须获取分片迭代器。可在 getRecordsRequest 对象中使用分片迭代器来指定要从中检索记录的分片。与分片迭代器关联的类型决定了应在分片中检索记录的起点(有关更多信息,请参阅此部分中后面的内容)。您必须先检索分片,然后才能使用分片迭代器。有关更多信息,请参阅 列出分片

使用 getShardIterator 方法获取初始分片迭代器。使用 getNextShardIterator 对象(由 getRecordsResult 方法返回)的 getRecords 方法为其他记录批次获取分片迭代器。分片迭代器的有效时间为 5 分钟。如果使用有效期内的分片迭代器,则将获得一个新的迭代器。每个分片迭代器在 5 分钟内一直有效,即使使用过也是如此。

要获取初始分片迭代器,请实例化 GetShardIteratorRequest 并将其传递给 getShardIterator 方法。要配置请求,请指定流和分片 ID。有关如何在您的 Amazon 账户中获取直播的信息,请参阅列出流。有关如何获取流中分片的信息,请参阅 列出分片

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

此示例代码在获取初始分片迭代器时将 TRIM_HORIZON 指定为迭代器类型。此迭代器类型意味着记录应从添加到分片的第一个记录而不是从最近添加的记录(也称为顶端)开始返回。以下是可能的迭代器类型:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

有关更多信息,请参阅ShardIteratorType

部分迭代器类型除了需要指定类型之外,还需要指定序列号;例如:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

使用 getRecords 获取记录之后,可通过调用记录的 getSequenceNumber 方法来获取记录的序列号。

record.getSequenceNumber()

此外,将记录添加到数据流的代码可通过对 getSequenceNumber 的结果调用 putRecord 获取已添加记录的序列号。

lastSequenceNumber = putRecordResult.getSequenceNumber();

您可使用序列号确保记录的顺序严格递增。有关更多信息,请参阅 PutRecord 示例中的代码示例。

使用 GetRecords

获取分片迭代器之后,请实例化 GetRecordsRequest 对象。使用 setShardIterator 方法为请求指定迭代器。

(可选) 您还可使用 setLimit 方法设置要检索的记录的数量。getRecords 返回的记录数量始终等于或小于此限制。如果您未指定此限制,getRecords 将返回已检索记录的 10MB。以下示例代码将此限制设置为 25 个记录。

如果未返回任何记录,则意味着此分片中当前没有分片迭代器引用的序列号对应的可用数据记录。在这种情况下,您的应用程序应等待流的数据来源所需的时间。然后尝试使用对 getRecords 的上一调用返回的分片迭代器再次从分片获取数据。

getRecordsRequest 传递给 getRecords 方法并捕获返回的值作为 getRecordsResult 对象。要获取数据记录,请对 getRecords 对象调用 getRecordsResult 方法。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

要准备对 getRecords 的另一次调用,请通过 getRecordsResult 获取下一分片迭代器。

shardIterator = getRecordsResult.getNextShardIterator();

为获得最佳效果,请在对 getRecords 的各次调用之间停止至少 1 秒(1000 毫秒)以免超出 getRecords 频率限制。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

通常,您应循环调用 getRecords,甚至当您在测试方案中检索单一记录时也是如此。对 getRecords 的单一调用可能返回空的记录列表,即使分片包含更多具有之后的序列号的记录也是如此。出现此情况时,将返回 NextShardIterator,同时空记录列表将引用分片中之后的序列号,并且后续的 getRecords 调用最终将返回记录。以下示例演示循环的使用。

示例:getRecords

以下代码示例反映了此节中的 getRecords 顶端,包括循环发出调用。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

如果您使用 Kinesis Client Library,则可能在返回数据之前发出多次调用。此行为是设计使然,并不表示KCL或您的数据存在问题。

适应重新分片

如果 getRecordsResult.getNextShardIterator 返回 null,则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 CLOSED 状态,并且您已从其中读取了所有可用的数据记录。

在这种情况下,您可以使用 getRecordsResult.childShards 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息,请参阅ChildShard

在拆分中,两个新分片的 parentShardId 都与您之前处理的分片的分片 ID 相同。这两个分片的 adjacentParentShardId 值为 null

在合并中,合并创建的一个新分片的 parentShardId 等于父分片之一的分片 ID,并且 adjacentParentShardId 等于另一父分片的分片 ID。您的应用程序已读取这些分片之一中的所有数据。这是 getRecordsResult.getNextShardIterator 返回 null 的分片。如果数据顺序对于您的应用程序很重要,则应确保它在读取合并创建的子分片中的任何新数据之前,还读取另一父分片中的所有数据。

如果您使用多个处理器从流检索数据(假定一个分片一个处理器),并且出现分片拆分或合并时,您应增加或减少处理器数量以适应分片数量的变化。

有关重新分片的更多信息,包括有关分片状态(如 CLOSED)的讨论,请参阅 对流进行重新分片