本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
实现消费端取消聚合
从版本 1.4.0 开始,KCL 支持自动取消聚合 KPL 用户记录。在更新 KCL 后,将编译利用 KCL 早期版本编写的消费端应用程序代码,而无需进行任何修改。不过,如果正在创建器端使用 KPL 聚合,则有一个与检查点操作相关的细微之处:已聚合记录中的所有子记录都具有相同的序列号,因此如果您需要区分子记录,则必须利用检查点存储额外数据。此额外数据称作子序列号。
从以前的 KCL 版本迁移
您无需更改现有调用来同时执行检查点操作与聚合。仍确保您能够成功检索 Kinesis Data Streams 中存储的所有记录。KCL 现在提供两个新检查点操作来支持特定的使用案例,如下所述。
如果您的现有代码先于 KPL 支持之前已为 KCL 写入,并且调用了没有参数的检查点操作,则它等同于对批处理中的上个 KPL 用户记录的序列号进行检查点操作。如果调用带序列号字符串的检查点操作,则它等同于对批处理的指定序列号以及隐式子序列号 0(零)进行检查点操作。
调用没有任何参数的新 KCL 检查点 checkpoint()
操作,在语义上等同于对批处理中的上个 Record
调用的序列号以及隐式子序列号 0(零)进行检查点操作。
调用新 KCL 检查点操作 checkpoint(Record record)
,在语义上等同于对指定 Record
的序列号以及隐式子序列号 0(零)进行检查点操作。如果 Record
调用实际为 UserRecord
,则对 UserRecord
序列号和子序列号进行检查点操作。
调用新 KCL 检查点操作 checkpoint(String sequenceNumber, long
subSequenceNumber)
会对给定的序列号以及子序列号进行显式检查点操作。
在上述任意情况下,在将检查点存储在 Amazon DynamoDB 检查点表中后,KCL 可正确地恢复检索记录,即使是在应用程序发生崩溃并重新启动时也是如此。如果在序列中包含多条记录,则检索会从具有已进行检查点操作的最新序列号的记录中的下个子序列号记录开始。如果最新检查点包括上一条序列号记录的最新子序列号,则检索会从具有下个序列号的记录开始。
以下部分将讨论需要避免跳过和重复记录的消费端的序列和子序列检查点操作的详细信息。如果在停止并重新启动消费端的记录处理时跳过(或重复)记录并不重要,则可运行您的现有代码而无需进行修改。
将 KCL 扩展用于 KPL 取消聚合
KPL 取消聚合会涉及子序列检查点操作。为了帮助使用子序列检查点操作,已将 UserRecord
类添加到 KCL:
public class UserRecord extends Record {
public long getSubSequenceNumber() {
/* ... */
}
@Override
public int hashCode() {
/* contract-satisfying implementation */
}
@Override
public boolean equals(Object obj) {
/* contract-satisfying implementation */
}
}
现在使用的是此类而不是 Record
。这不会破坏现有代码,因为它是 Record
的子类。UserRecord
类同时表示实际子记录和标准非聚合记录。非聚合记录可被视为刚好具有一条子记录的聚合记录。
此外,将两个新的操作添加到 IRecordProcessorCheckpointer
:
public void checkpoint(Record record);
public void checkpoint(String sequenceNumber, long subSequenceNumber);
要开始使用子序列号检查点操作,您可执行以下转换。更改以下形式代码:
checkpointer.checkpoint(record.getSequenceNumber());
新的形式代码:
checkpointer.checkpoint(record);
建议您使用 checkpoint(Record record)
形式来进行子序列检查点操作。不过,如果您已以字符串形式存储 sequenceNumbers
以用于检查点操作,则您现在也应存储 subSequenceNumber
,如以下示例所示:
String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
从 Record
到 UserRecord
的转换始终会成功,因为其实现始终使用 UserRecord
。除非需要对序列号进行算术运算,否则建议不要采用此方法。
在处理 KPL 用户记录时,KCL 会将子序列号作为每行的额外字段写入 Amazon DynamoDB。在恢复检查点操作时,早期版本的 KCL 使用 AFTER_SEQUENCE_NUMBER
提取记录。而具有 KPL 支持的当前 KCL 则使用 AT_SEQUENCE_NUMBER
。在检索带已进行检查点操作的序列号的记录时,会检查已进行检查点操作的子序列号,而且会根据需要删除子记录(如果上一条子记录为已进行检查点操作的记录,则可能删除所有子记录)。同样,非聚合记录可被视为具有单条子记录的聚合记录,因此相同的算法同时适用于聚合和非聚合记录。
直接使用 GetRecords
您也可以选择不使用 KCL 而是直接调用 API 操作 GetRecords
来检索 Kinesis Data Streams 记录。要将已检索到的记录提取到原始 KPL 用户记录中,可在 UserRecord.java
中调用下列静态操作之一:
public static List<Record> deaggregate(List<Record> records)
public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
第一个操作使用 0
的默认值 startingHashKey
(零)和 2^128 -1
的默认值 endingHashKey
。
每个操作都会取消将 Kinesis Data Streams 记录的给定列表聚合到 KPL 用户记录的列表中。将从返回的记录列表中删除其显式哈希键或分区键位于 startingHashKey
(包含)和 endingHashKey
(包含)的范围之外的任何 KPL 用户记录。