使用者取消聚合 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用者取消聚合

从版本 1.4.0 开始,KCL 支持自动取消聚合 KPL 用户记录。在更新 KCL 后,将编译利用早期版本的 KCL 编写的使用者应用程序代码而不做任何修改。不过,如果正在创建者端使用 KPL 聚合,则有一个与检查点操作相关的细微之处:已聚合记录中的所有子记录都具有相同的序列号,因此如果您需要区分子记录,则必须利用检查点存储额外数据。此额外数据称作子序列号

从早期版本的 KCL 迁移

您无需更改现有调用来同时执行检查点操作与聚合。仍确保您能够成功检索 Kinesis Data Streams 中存储的所有记录。KCL 现在提供两个新的检查点操作来支持特定的使用案例,如下所述。

如果您的现有代码先于 KPL 支持之前已为 KCL 写入,并且调用了没有参数的检查点操作,则它等同于对批处理中的上个 KPL 用户记录的序列号进行检查点操作。如果调用带序列号字符串的检查点操作,则它等同于对批处理的指定序列号以及隐式子序列号 0(零)进行检查点操作。

调用新的 KCL 检查点操作checkpoint()没有任何参数在语义上等同于检查最后一个序列号Record调用批处理中调用,以及隐式子序列号 0(零)。

调用新的 KCL 检查点操作checkpoint(Record record)在语义上等同于给定的检查点Record的序列号以及隐式子序列号 0(零)。如果 Record 调用实际为 UserRecord,则对 UserRecord 序列号和子序列号进行检查点操作。

调用新的 KCL 检查点操作checkpoint(String sequenceNumber, long subSequenceNumber)对给定的序列号以及子序列号进行显式检查点操作。

在上述任意情况下,在将检查点存储在 Amazon DynamoDB 检查点表中后,KCL 可正确地恢复检索记录,即使是在应用程序发生崩溃并重新启动时也是如此。如果在序列中包含多条记录,则检索会从具有已进行检查点操作的最新序列号的记录中的下个子序列号记录开始。如果最新检查点包括上一条序列号记录的最新子序列号,则检索会从具有下个序列号的记录开始。

以下部分将讨论需要避免跳过和重复记录的使用者的序列和子序列检查点操作的详细信息。如果在停止并重新启动使用器的记录处理时跳过(或重复)记录并不重要,则可运行您的现有代码而无需进行修改。

KPL 反聚合的 KCL 扩展

如上述讨论,KPL 去聚合会涉及子序列检查点操作。为了帮助使用子序列检查点操作,UserRecord已添加到 KCL:

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

现在使用的是此类而不是 Record。这不会破坏现有代码,因为它是 Record 的子类。UserRecord 类同时表示实际子记录和标准非聚合记录。非聚合记录可被视为刚好具有一条子记录的聚合记录。

此外,将两个新的操作添加到 IRecordProcessorCheckpointer

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

要开始使用子序列号检查点操作,您可执行以下转换。更改以下形式代码:

checkpointer.checkpoint(record.getSequenceNumber());

新的形式代码:

checkpointer.checkpoint(record);

建议您使用 checkpoint(Record record) 形式来进行子序列检查点操作。不过,如果您已以字符串形式存储 sequenceNumbers 以用于检查点操作,则您现在也应存储 subSequenceNumber,如以下示例所示:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

RecordUserRecord 的转换始终成功实施,因为实施始终在后台使用 UserRecord。除非需要对序列号进行算术运算,否则建议不要采用此方法。

在处理 KPL 用户记录时,KCL 会将子序列号写入 Amazon DynamoDB 中,作为每行的额外字段。使用过的 KCL 的以前版本AFTER_SEQUENCE_NUMBER在恢复检查点操作时获取记录。当前拥有 KPL 支持的 KCL 使用AT_SEQUENCE_NUMBER相反。在检索带已进行检查点操作的序列号的记录时,会检查已进行检查点操作的子序列号,而且会根据需要删除子记录(如果上一条子记录为已进行检查点操作的记录,则可能删除所有子记录)。同样,非聚合记录可被视为具有单条子记录的聚合记录,因此相同的算法同时适用于聚合和非聚合记录。

使用GetRecords直接

您也可以选择不使用 KCL,而是调用 API 操作。GetRecords直接检索 Kinesis Data Streams 记录。要将已检索到的记录提取到原始 KPL 用户记录中,可在中调用下列静态操作之一:UserRecord.java

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

第一个操作使用 0 的默认值 startingHashKey(零)和 2^128 -1 的默认值 endingHashKey

每个操作都会将 Kinesis Data Streams 记录的给定列表聚合到 KPL 用户记录的列表中。其显式哈希键或分区键位于的范围之外的任何 KPL 用户记录startingHashKey(含)和endingHashKey(含)将从返回的记录列表中丢弃。