本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理重复记录
有两个主要原因可能导致多次向您的 Amazon Kinesis Data Streams 应用程序传送记录:创建器重试和消费端重试。您的应用程序必须预计并适当地应对多次处理单个记录的问题。
产生器重试
假设某个创建器已经对 PutRecord
进行调用但仍在从 Amazon Kinesis Data Streams 接收确认前遇到了网络相关的超时。创建器无法确定记录是否已传输到 Kinesis Data Streams。假定每个记录对应用程序都很重要,创建者应该已被写入以重试对相同数据的调用。如果对相同数据的两次 PutRecord
调用已成功提交到 Kinesis Data Streams,则会有两个 Kinesis Data Streams 记录。尽管这两个记录具有相同的数据,但它们各具有唯一的序号。需要严格保证的应用程序应在记录中嵌入一个主键,以便在随后的处理过程中删除重复项。请注意,由创建者重试产生的重复项的数量通常低于由消费端重试产生的重复项的数量。
注意
如果您要使用 Amazon SDK PutRecord
,请在《Amazon SDKs and Tools user guide》中了解 SDK 重试行为。
消费端重试
消费端(数据处理应用程序)重试在记录处理器重新启动时发生。相同分片的记录处理器在以下情况下重新启动:
-
工作程序意外终止
-
已添加或删除工作程序实例
-
已拆分或合并分片
-
已部署应用程序
在所有这些情况下,分片到工作程序再到记录处理器的映射将持续更新到负载均衡处理。已迁移到其他实例的分片处理器将从上一个检查点开始重新启动处理记录。这导致了重复的记录处理,如以下示例所示。有关负载均衡的更多信息,请参阅使用重新分片、扩展和并行处理更改分片数量。
示例:导致重新传送记录的消费端重试
在此示例中,您有一个持续从流中读取记录、将记录聚合到本地文件并将文件上传到 Amazon S3 的应用程序。为简便起见,假定只有 1 个分片和 1 个处理该分片的工作程序。考虑以下示例顺序的事件,假定上一个检查点位于记录编号 10000 处:
-
工作程序从该分片中读取下一批记录,即从 10001 到 20000 的记录。
-
工作程序随后将这批记录传递到关联的记录处理器。
-
记录处理器聚合数据、创建 Amazon S3 文件并将文件成功上传到 Amazon S3。
-
工作程序在新的检查点出现之前意外终止。
-
应用程序、工作程序和记录处理器重新启动。
-
工作程序现在开始从上次成功的检查点(在本案例中为 10001)开始读取。
因此,记录 10001-20000 使用了多次。
实现对消费端重试的弹性
尽管记录可被处理多次,但您的应用程序可能会带来副作用,就像记录只能处理一次一样(幂等处理)。有关此问题的解决方案因复杂性和准确性而异。如果最终数据的目标可以很好地处理重复,我们建议依靠最终目标来实现幂等处理。例如,利用 Opensearch
在上一节的示例应用程序中,它持续从流中读取记录、将记录聚合到本地文件并将文件上传到 Amazon S3。如图所示,记录 10001 - 20000 使用了多次,从而生成了具有相同数据的多个 Amazon S3 文件。减少此示例中的重复的一种方法是确保步骤 3 使用了以下方案:
-
记录处理器对每个 Amazon S3 文件使用了固定数量的记录,如 5000。
-
文件名使用以下架构:Amazon S3 前缀、分片 ID 和
First-Sequence-Num
。在本例中,它可以是类似于sample-shard000001-10001
的形式。 -
在上传 Amazon S3 文件后,通过指定
Last-Sequence-Num
进行检查点操作。在本例中,您将在记录编号 15000 处进行检查点操作。
利用此方案,即使记录被处理了多次,生成的 Amazon S3 文件也会具有相同的名称和数据。重试只会导致将相同的数据多次写入到相同的文件。
对于重新分片操作,分片中剩余的记录的数量可能少于您需要的固定数量。在本例中,您的 shutdown()
方法必须将文件刷新到 Amazon S3 并对上一个序列号进行检查点操作。以上方案也与重新分片操作兼容。