Amazon Kinesis Data Streams
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 Amazon AWS 入门

使用者取消聚合

从版本 1.4.0 开始,KCL 支持自动取消聚合 KPL 用户记录。在更新 KCL 后,将编译利用早期版本的 KCL 编写的使用者应用程序代码而不做任何修改。不过,如果正在创建器端使用 KPL 聚合,则有一个与检查点操作相关的细微之处:已聚合记录中的所有子记录都具有相同的序列号,因此如果您需要区分子记录,则必须利用检查点存储额外数据。此额外数据称作子序列号

从早期版本的 KCL 进行迁移

您无需更改现有调用来同时执行检查点操作与聚合。仍确保您能够成功检索 Kinesis Data Streams 中存储的所有记录。KCL 现在提供两个新的检查点操作来支持特定的使用案例,如下所述。

如果您的现有代码先于 KPL 支持为 KCL 写入,并且调用了不带参数的检查点操作,则它等同于对批处理中的上一条 KPL 用户记录的序列号进行检查点操作。如果调用带序列号字符串的检查点操作,则它等同于对批处理的指定序列号以及隐式子序列号 0(零)进行检查点操作。

调用没有任何参数的新的 KCL 检查点操作 checkpoint() 在语义上等同于对批处理中的上个 Record 调用的序列号以及隐式子序列号 0(零)进行检查点操作。

调用新的 KCL 检查点操作 checkpoint(Record record) 在语义上等同于对指定 Record 的序列号以及隐式子序列号 0(零)进行检查点操作。如果 Record 调用实际为 UserRecord,则对 UserRecord 序列号和子序列号进行检查点操作。

调用新的 KCL 检查点操作 checkpoint(String sequenceNumber, long subSequenceNumber) 会对给定的序列号以及子序列号进行显式检查点操作。

在上述任意情况下,在将检查点存储在 Amazon DynamoDB 检查点表中后,KCL 可正确地恢复检索记录,即使是在应用程序发生崩溃并重新启动时也是如此。如果在序列中包含多条记录,则检索会从具有已进行检查点操作的最新序列号的记录中的下个子序列号记录开始。如果最新检查点包括上一条序列号记录的最新子序列号,则检索会从具有下个序列号的记录开始。

以下部分将讨论需要避免跳过和重复记录的使用者的序列和子序列检查点操作的详细信息。如果在停止并重新启动使用者的记录处理时跳过(或重复)记录并不重要,则可运行您的现有代码而无需进行修改。

KPL 取消聚合的 KCL 扩展

如上述讨论,KPL 取消聚合会涉及子序列检查点操作。为了有助于使用子序列检查点操作,已将 UserRecord 类添加到 KCL:

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

现在使用的是此类而不是 Record。这不会破坏现有代码,因为它是 Record 的子类。UserRecord 类同时表示实际子记录和标准非聚合记录。非聚合记录可被视为刚好具有一条子记录的聚合记录。

此外,将两个新的操作添加到 IRecordProcessorCheckpointer

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

要开始使用子序列号检查点操作,您可执行以下转换。更改以下形式代码:

checkpointer.checkpoint(record.getSequenceNumber());

新的形式代码:

checkpointer.checkpoint(record);

建议您使用 checkpoint(Record record) 形式来进行子序列检查点操作。不过,如果您已以字符串形式存储 sequenceNumbers 以用于检查点操作,则您现在也应存储 subSequenceNumber,如以下示例所示:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

RecordUserRecord 的转换始终成功实施,因为实施始终在后台使用 UserRecord。除非需要对序列号进行算术运算,否则建议不要采用此方法。

在处理 KPL 用户记录时,KCL 会将子序列号写入 Amazon DynamoDB 作为每行的额外字段。在恢复检查点操作时,早期版本的 KCL 使用 AFTER_SEQUENCE_NUMBER 提取记录。而具有 KPL 支持的当前 KCL 使用的是 AT_SEQUENCE_NUMBER。在检索带已进行检查点操作的序列号的记录时,会检查已进行检查点操作的子序列号,而且会根据需要删除子记录(如果上一条子记录为已进行检查点操作的记录,则可能删除所有子记录)。同样,非聚合记录可被视为具有单条子记录的聚合记录,因此相同的算法同时适用于聚合和非聚合记录。

直接使用 GetRecords

您也可以选择不使用 KCL 而是直接调用 API 操作 GetRecords 来检索 Kinesis Data Streams 记录。要将已检索到的记录提取到原始 KPL 用户记录中,可在 UserRecord.java 中调用下列静态操作之一:

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

第一个操作使用 startingHashKey 的默认值 0(零)和 endingHashKey 的默认值 2^128 -1

每个操作都会取消将 Kinesis Data Streams 记录的给定列表聚合到 KPL 用户记录的列表中。将从返回的记录列表中删除其显式哈希键或分区键位于 startingHashKey(包含)和 endingHashKey(包含)的范围之外的任何 KPL 用户记录。