使用 Amazon SDK for Java 开发具有共享吞吐量的自定义使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon SDK for Java 开发具有共享吞吐量的自定义使用者

开发自定义 Kinesis Data Streams 消费者在整个过程中共享的方法之一是使用 Amazon Kinesis Data Streams API。本部分介绍如何将 Kinesis Data Streams API 与AmazonSDK for Java。本部分中的 Java 示例代码演示如何执行基本的 KDS API 操作,并按照操作类型从逻辑上进行划分。

这些示例并非可直接用于生产的代码。它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。

您可使用其他不同的编程语言调用 Kinesis Data Streams API。有关所有可用的更多信息Amazon开发工具包,请参阅开始使用 Amazon Web Services 开发.

重要

开发自定义 Kinesis Data Streams 使用者的推荐方法是使用 Kinesis 客户端库 (KCL)。KCL 通过处理与分布式计算相关的许多复杂任务来帮助您使用和处理来自 Kinesis 数据流的数据。有关更多信息,请参阅 。使用 KCL 开发具有共享吞吐量的自定义使用者.

从流中获取数据

Kinesis Data Streams API 包括getShardIteratorgetRecords您可以调用这些方法从数据流中检索记录。这是拉模型,您的代码直接从数据流的分片中抽取数据记录。

重要

我们建议您使用 KCL 提供的记录处理器支持从数据流中检索记录。这是推模型,您通过实现代码来处理数据。KCL 从数据流中检索数据记录并将数据记录交付给您的应用程序代码。此外,KCL 还提供故障转移、恢复和负载均衡功能。有关更多信息,请参阅 。使用 KCL 开发具有共享吞吐量的自定义使用者.

但是,在某些情况下,您可能倾向于使用 Kinesis Data Streams API。例如,实施自定义工具以监控或调试数据流。

重要

Kinesis Data Streams 支持更改数据流的数据记录保留期。有关更多信息,请参阅 更改数据保留期

使用分片迭代器

可从流中按分片检索记录。对于每个分片以及您从分片中检索的每批记录,您必须获取分片迭代器。可在 getRecordsRequest 对象中使用分片迭代器来指定要从中检索记录的分片。与分片迭代器关联的类型决定了应在分片中检索记录的起点(有关更多信息,请参阅此部分中后面的内容)。您需要先检索分片(DescribeStreamAPI-已弃用中已讨论),然后才能使用分片迭代器。

使用 getShardIterator 方法获取初始分片迭代器。使用 getNextShardIterator 对象(由 getRecordsResult 方法返回)的 getRecords 方法为其他记录批次获取分片迭代器。分片迭代器的有效时间为 5 分钟。如果使用有效期内的分片迭代器,则将获得一个新的迭代器。每个分片迭代器在 5 分钟内一直有效,即使使用过也是如此。

要获取初始分片迭代器,请实例化 GetShardIteratorRequest 并将其传递给 getShardIterator 方法。要配置请求,请指定流和分片 ID。有关如何获取您的流的信息Amazon账户,请参阅列出流. 有关如何获取流中分片的信息,请参阅 DescribeStreamAPI-已弃用

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

此示例代码在获取初始分片迭代器时将 TRIM_HORIZON 指定为迭代器类型。此迭代器类型意味着记录应从添加到分片的第一个记录而不是从最近添加的记录(也称为小费. 以下是可能的迭代器类型:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

有关更多信息,请参阅 。ShardIterator类型.

部分迭代器类型除了需要指定类型之外,还需要指定序列号;例如:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

使用 getRecords 获取记录之后,可通过调用记录的 getSequenceNumber 方法来获取记录的序列号。

record.getSequenceNumber()

此外,将记录添加到数据流的代码可通过对 getSequenceNumber 的结果调用 putRecord 获取已添加记录的序列号。

lastSequenceNumber = putRecordResult.getSequenceNumber();

您可使用序列号确保记录的顺序严格递增。有关更多信息,请参阅 PutRecord示例中的代码示例。

使用GetRecords

获取分片迭代器之后,请实例化 GetRecordsRequest 对象。使用 setShardIterator 方法为请求指定迭代器。

(可选) 您还可使用 setLimit 方法设置要检索的记录的数量。getRecords 返回的记录数量始终等于或小于此限制。如果您未指定此限制,getRecords返回 10 MB 检索的记录。以下示例代码将此限制设置为 25 个记录。

如果未返回任何记录,则意味着此分片中当前没有分片迭代器引用的序列号对应的可用数据记录。在这种情况下,您的应用程序应等待流的数据源所需的时间,但至少为 1 秒。然后尝试使用对 getRecords 的上一调用返回的分片迭代器再次从分片获取数据。记录添加到流的时间与它在 getRecords 中可用的时间之间约有 3 秒的延迟。

getRecordsRequest 传递给 getRecords 方法并捕获返回的值作为 getRecordsResult 对象。要获取数据记录,请对 getRecords 对象调用 getRecordsResult 方法。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

要准备对 getRecords 的另一次调用,请通过 getRecordsResult 获取下一分片迭代器。

shardIterator = getRecordsResult.getNextShardIterator();

为获得最佳效果,请在对 getRecords 的各次调用之间停止至少 1 秒(1000 毫秒)以免超出 getRecords 频率限制。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

通常,您应循环调用 getRecords,甚至当您在测试方案中检索单一记录时也是如此。对 getRecords 的单一调用可能返回空的记录列表,即使分片包含更多具有之后的序列号的记录也是如此。出现此情况时,将返回 NextShardIterator,同时空记录列表将引用分片中之后的序列号,并且后续的 getRecords 调用最终将返回记录。以下示例演示循环的使用。

示例:getRecords

以下代码示例反映了此节中的 getRecords 顶端,包括循环发出调用。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

如果您使用的是 Kinesis 客户端库,可能在返回数据之前发出多次调用。此行为是设计使然,不代表 KCL 或您的数据存在问题。

适应重新分片

如果getRecordsResult.getNextShardIterator回报null,它表示发生了涉及该分片的分片拆分或合并。这个碎片现在在CLOSED状态,并且您已经从该分片中读取了所有可用的数据记录。

在此方案中,您可以使用getRecordsResult.childShards了解通过拆分或合并创建的正在处理的分片的新子分片。有关更多信息,请参阅 。ChildShard.

在拆分中,两个新分片的 parentShardId 都与您之前处理的分片的分片 ID 相同。这两个分片的 adjacentParentShardId 值为 null

在合并中,合并创建的一个新分片的 parentShardId 等于父分片之一的分片 ID,并且 adjacentParentShardId 等于另一父分片的分片 ID。您的应用程序已读取这些分片之一中的所有数据。这是 getRecordsResult.getNextShardIterator 返回 null 的分片。如果数据顺序对于您的应用程序很重要,则应确保它在读取合并创建的子分片中的任何新数据之前,还读取另一父分片中的所有数据。

如果您使用多个处理器从流检索数据(假定一个分片一个处理器),并且出现分片拆分或合并时,您应增加或减少处理器数量以适应分片数量的变化。

有关重新分片的更多信息,包括有关分片状态(如 CLOSED)的讨论,请参阅 对流进行重新分片

使用Amazon架 Glue 注册表

您可以将 Kinesis 数据流与AmazonGlue 模式注册表。这些区域有:AmazonGlue 架构注册表允许您集中发现、控制和演变架构,同时确保已注册表持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。这些区域有:AmazonGlue 模式注册表使你能够改进end-to-end流媒体应用程序中的数据质量和数据治理。有关更多信息,请参阅 。Amazon架 Glue 注册表. 设置此集成的方法之一是通过GetRecordsKinesis Data Streams API 可在AmazonJava 开发工具包。

有关如何使用设置 Kinesis Data Streams 与架构注册表集成的详细说明。GetRecordsKinesis Data Streams API,请参阅中的 “使用 Kinesis Data Streams API 与数据交互” 部分使用案例:将 Amazon Kinesis Data Streams 与Amazon架 Glue 注册表.