使用 Amazon SDK for Java 开发具有共享吞吐量的自定义使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon SDK for Java 开发具有共享吞吐量的自定义使用者

开发具有共享吞吐量的自定义 Kinesis Data Streams 使用器的方法之一是使用 Amazon Kinesis Data Streams API。本部分介绍如何将 Kinesis Data Streams API 与适用于 Java 的 Amazon 开发工具包配合使用。此部分中的 Java 示例代码演示如何执行基本的 KDS API 操作,并按照操作类型从逻辑上进行划分。

这些示例并非可直接用于生产的代码。它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。

您可以使用其他不同的编程语言调用 Kinesis Data Streams API。有关所有可用 Amazon 开发工具包的更多信息,请参阅开始使用亚马逊云科技开发

重要

有关开发具有共享吞吐量的自定义 Kinesis Data Streams 使用器的方法,建议使用 Kinesis Client Library(KCL)。KCL 通过处理许多与分布式计算相关的复杂任务,帮助您使用和处理 Kinesis 数据流中的数据。有关更多信息,请参阅 Developing Custom Consumers with Shared Throughput Using KCL

从流中获取数据

Kinesis Data Streams API 包括 getShardIteratorgetRecords 方法,您可以调用此类方法从数据流中检索记录。这是拉取模型,您的代码可以直接从数据流的分片中抽取数据记录。

重要

我们建议您使用由 KCL 提供的记录处理器支持功能,以从数据流中检索记录。这是推送模型,您可以通过实现代码来处理数据。KCL 将从数据流中获取数据记录并将数据记录传送给您的应用程序代码。此外,KCL 还提供失效转移、恢复和负载均衡功能。有关更多信息,请参阅 Developing Custom Consumers with Shared Throughput Using KCL

但是,在某些情况下,您可能会更倾向于使用 Kinesis Data Streams API。例如,在实施自定义工具以监控或调试数据流时。

重要

Kinesis Data Streams 支持更改数据流的数据记录保留期。有关更多信息,请参阅更改数据保留期

使用分片迭代器

可从流中按分片检索记录。对于每个分片以及您从分片中检索的每批记录,您必须获取分片迭代器。可在 getRecordsRequest 对象中使用分片迭代器来指定要从中检索记录的分片。与分片迭代器关联的类型决定了应在分片中检索记录的起点(有关更多信息,请参阅此部分中后面的内容)。您需要先检索分片(DescribeStream API-已弃用中已讨论),然后才能使用分片迭代器。

使用 getShardIterator 方法获取初始分片迭代器。使用 getNextShardIterator 对象(由 getRecordsResult 方法返回)的 getRecords 方法为其他记录批次获取分片迭代器。分片迭代器的有效时间为 5 分钟。如果使用有效期内的分片迭代器,则将获得一个新的迭代器。每个分片迭代器在 5 分钟内一直有效,即使使用过也是如此。

要获取初始分片迭代器,请实例化 GetShardIteratorRequest 并将其传递给 getShardIterator 方法。要配置请求,请指定流和分片 ID。有关如何获取 Amazon 账户中的流的信息,请参阅 列出流。有关如何获取流中分片的信息,请参阅 DescribeStream API-已弃用

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

此示例代码在获取初始分片迭代器时将 TRIM_HORIZON 指定为迭代器类型。此迭代器类型意味着记录应从添加到分片的第一个记录而不是从最近添加的记录(也称为顶端)开始返回。以下是可能的迭代器类型:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

有关更多信息,请参阅 ShardIteratorType

部分迭代器类型除了需要指定类型之外,还需要指定序列号;例如:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

使用 getRecords 获取记录之后,可通过调用记录的 getSequenceNumber 方法来获取记录的序列号。

record.getSequenceNumber()

此外,将记录添加到数据流的代码可通过对 getSequenceNumber 的结果调用 putRecord 获取已添加记录的序列号。

lastSequenceNumber = putRecordResult.getSequenceNumber();

您可使用序列号确保记录的顺序严格递增。有关更多信息,请参阅 PutRecord 示例中的代码示例。

使用 GetRecords

获取分片迭代器之后,请实例化 GetRecordsRequest 对象。使用 setShardIterator 方法为请求指定迭代器。

(可选) 您还可使用 setLimit 方法设置要检索的记录的数量。getRecords 返回的记录数量始终等于或小于此限制。如果您未指定此限制,getRecords 将返回已检索记录的 10MB。以下示例代码将此限制设置为 25 个记录。

如果未返回任何记录,则意味着此分片中当前没有分片迭代器引用的序列号对应的可用数据记录。在这种情况下,您的应用程序应等待流的数据来源所需的时间。然后尝试使用对 getRecords 的上一调用返回的分片迭代器再次从分片获取数据。

getRecordsRequest 传递给 getRecords 方法并捕获返回的值作为 getRecordsResult 对象。要获取数据记录,请对 getRecords 对象调用 getRecordsResult 方法。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

要准备对 getRecords 的另一次调用,请通过 getRecordsResult 获取下一分片迭代器。

shardIterator = getRecordsResult.getNextShardIterator();

为获得最佳效果,请在对 getRecords 的各次调用之间停止至少 1 秒(1000 毫秒)以免超出 getRecords 频率限制。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

通常,您应循环调用 getRecords,甚至当您在测试方案中检索单一记录时也是如此。对 getRecords 的单一调用可能返回空的记录列表,即使分片包含更多具有之后的序列号的记录也是如此。出现此情况时,将返回 NextShardIterator,同时空记录列表将引用分片中之后的序列号,并且后续的 getRecords 调用最终将返回记录。以下示例演示循环的使用。

示例:getRecords

以下代码示例反映了此节中的 getRecords 顶端,包括循环发出调用。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

如果您使用 Kinesis Client Library,则可能在返回数据之前发出多次调用。此行为是设计使然,不代表 KCL 或您的数据存在问题。

适应重新分片

如果 getRecordsResult.getNextShardIterator 返回 null,则表示发生了涉及此分片的分片拆分或合并。此分片现在处于 CLOSED 状态,并且您已从其中读取了所有可用的数据记录。

在这种情况下,您可以使用 getRecordsResult.childShards 来了解正在处理的分片中由拆分或合并创建的新子分片。有关更多信息,请参阅 ChildShard

在拆分中,两个新分片的 parentShardId 都与您之前处理的分片的分片 ID 相同。这两个分片的 adjacentParentShardId 值为 null

在合并中,合并创建的一个新分片的 parentShardId 等于父分片之一的分片 ID,并且 adjacentParentShardId 等于另一父分片的分片 ID。您的应用程序已读取这些分片之一中的所有数据。这是 getRecordsResult.getNextShardIterator 返回 null 的分片。如果数据顺序对于您的应用程序很重要,则应确保它在读取合并创建的子分片中的任何新数据之前,还读取另一父分片中的所有数据。

如果您使用多个处理器从流检索数据(假定一个分片一个处理器),并且出现分片拆分或合并时,您应增加或减少处理器数量以适应分片数量的变化。

有关重新分片的更多信息,包括有关分片状态(如 CLOSED)的讨论,请参阅 对流进行重新分片

使用 Amazon Glue 架构注册表与数据交互

您可以将 Kinesis Data Streams 与 Amazon Glue 架构注册表进行集成。Amazon Glue 架构注册表允许您集中发现、控制和演变架构,同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。通过 Amazon Glue 架构注册表,您能够改善流媒体应用程序中的端到端数据质量和数据治理。有关更多信息,请参阅 Amazon Glue Schema Registry。使用 Amazon Java 开发工具包中提供的 GetRecords Kinesis Data Streams API 可以设置此集成。

有关如何使用 GetRecords Kinesis Data Streams API 设置 Kinesis Data Streams 与 Schema 注册表集成的详细说明,请参阅 Use Case: Integrating Amazon Kinesis Data Streams with the Amazon Glue Schema Registry 中的“Interacting with Data Using the Kinesis Data Streams APIs”部分。