使用 Kinesis Client Library - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Client Library

开发可以处理来自 KDS 数据流的数据的自定义消费端应用程序的一种方法,是使用 Kinesis Client Library(KCL)。

注意

建议您根据使用场景将 KCL 1.x 和 KCL 2.x 升级到最新的 KCL 1.x 版本或 KCL 2.x 版本。KCL 1.x 和 KCL 2.x 会定期更新至最新版本,包括最新依赖项和安全补丁、错误修复以及向后兼容的新功能。欲了解更多信息,请参阅 https://github.com/awslabs/ amazon-kinesis-client /releases。

什么是 Kinesis Client Library?

KCL 通过处理许多与分布式计算相关的复杂任务,帮助您使用和处理 Kinesis 数据流中的数据。这些任务包括跨多个消费端应用程序实例的负载平衡、对消费端应用程序实例故障的响应、已处理记录的检查点操作以及对重新分片的反应。KCL 负责所有这些子任务,让您可以将精力集中在编写自定义记录处理逻辑上。

请注意,KCL 与 Amazon 开发工具包中可用的 Kinesis Data Streams API 不同。Kinesis Data Streams API 可帮助您管理 Kinesis Data Streams 的许多方面,包括创建流、重新分片以及放置和获取记录。KCL 围绕这些子任务提供了一个抽象层,让您可以专注于消费端应用程序的自定义数据处理逻辑工作。有关 Kinesis Data Streams API 的信息,请参阅 Amazon Kinesis API Reference

重要

KCL 属于 Java 库,使用名为 “” 的多语言接口提供对 Java 以外其他语言的 MultiLangDaemon支持。此进程守护程序基于 Java,当您使用 Java 以外的 KCL 语言时,该程序会在后台运行。例如,如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序,则仍然需要在系统上安装 Java,这是因为。 MultiLangDaemon此外, MultiLangDaemon 还有一些您可能需要根据自己的用例自定义的默认设置,例如它所连接的Amazon区域。有关 MultiLangDaemon on 的更多信息 GitHub,请参阅 KCL MultiLangDaemon 项目

KCL 充当记录处理逻辑和 Kinesis Data Streams 之间的中介。KCL 将执行以下任务:

  • 连接到数据流

  • 枚举数据流中的分片

  • 使用租约来协调分片与其工作程序的关联

  • 为其管理的每个分片实例化记录处理器

  • 从数据流中提取数据记录

  • 将记录推送到对应的记录处理器

  • 对已处理记录进行检查点操作

  • 当工作程序实例计数发生变化或数据流被重新分片(分片被拆分或合并)时,平衡分片与工作程序的关联(租约)

KCL 可用版本

目前,您可以使用以下任一支持的 KCL 版本来构建自定义消费端应用程序:

您可以使用 KCL 1.x 或 KCL 2.x 来构建使用共享吞吐量的消费端应用程序。有关更多信息,请参阅 使用 KCL 开发具有共享吞吐量的自定义使用者

要构建使用专用吞吐量的消费端应用程序(增强型扇出消费端应用程序),只能使用 KCL 2.x。有关更多信息,请参阅 开发具有专用吞吐量的自定义使用者(增强扇出功能)

有关 KCL 1.x 和 KCL 2.x 之间差异的信息,以及如何从 KCL 1.x 迁移到 KCL 2.x 的说明,请参阅 将使用者从 KCL 1.x 迁移到 KCL 2.x

KCL 概念

  • KCL 消费端应用程序 – 使用 KCL 自定义构建的应用程序,旨在读取和处理数据流中的记录。

  • 消费端应用程序实例 – KCL 消费端应用程序通常是分布式应用程序,即一个或多个应用程序实例同时运行,以便协调故障和以动态方式实现数据记录处理负载平衡。

  • 工作程序 – KCL 消费端应用程序实例用来开始处理数据的高级类。

    重要

    每个 KCL 消费端应用程序实例都有一个工作程序。

    工作程序负责初始化和监督各种任务,包括同步分片和租约信息、跟踪分片分配以及处理来自分片的数据。工作程序向 KCL 提供消费端应用程序的配置信息,例如此 KCL 消费端应用程序要处理的数据记录的数据流名称以及访问此数据流所需的 Amazon 凭证。工作程序还会启动特定的 KCL 消费端应用程序实例,将数据记录从数据流传输到记录处理器。

    重要

    在 KCL 1.x 中,这个类被称为工作程序。欲了解更多信息(这些是 Java KCL 存储库),请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/Services/kinesis/clientLibrary/lib/worker/ worker.java。在 KCL 2.x 中,这个类被称为计划程序。KCL 2.x 中计划程序的用途与 KCL 1.x 中工作程序的用途相同。有关 KCL 2.x 中调度器类的更多信息,请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java。amazon-kinesis-client

  • 租约 – 定义工作程序和分片之间绑定的数据。分布式 KCL 消费端应用程序使用租约在一组工作程序中对数据记录进行分区。在任何指定时间,每个数据记录分片都通过由 leaseKey 变量标识的租约绑定到特定的工作程序。

    默认情况下,工作人员可以同时持有一份或多份租约(以 maxLeasesForWorker 变量的值为准)。

    重要

    每个工作程序都将争相持有数据流中所有可用分片的所有可用租约。但是,无论何时,只有一个工作程序可以成功持有每份租约。

    例如,如果您的消费端应用程序实例 A 和工作程序 A 正在处理包含 4 个分片的数据流,则工作程序 A 可以同时持有分片 1、2、3 和 4 的租约。但是,如果您有 A 和 B 两个消费端应用程序实例,以及工作程序 A 和工作程序 B,并且这些实例正在处理包含 4 个分片的数据流,则工作程序 A 和工作程序 B 不能同时持有分片 1 的租约。一个工作程序持有特定分片的租约,直到该工作程序准备好停止处理该分片的数据记录或该工作程序失败为止。当一个工作程序停止持有租约时,另一个工作程序会接管并持有租约。

    欲了解更多信息,(这些是 Java KCL 存储库),请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/Leases/impl/lease.java for KCL 1.x 和 KCL 2.x 的 https://github.com/awslabs/ /blob/ma ster/ /src/main/java/software/amazon/kinesis/leases/Lease.java amazon-kinesis-client amazon-kinesis-client

  • 租约表 – 唯一的 Amazon DynamoDB 表,用于跟踪 KDS 数据流中由 KCL 消费端应用程序的工作程序租赁和处理的分片。在 KCL 消费端应用程序运行时,租约表必须与数据流中的最新分片信息保持同步(在工作程序内部和所有工作程序之间)。有关更多信息,请参阅 使用租约表跟踪 KCL 消费端应用程序处理的分片

  • 记录处理器 – 定义 KCL 消费端应用程序如何处理从数据流中获取的数据的逻辑。在运行时,KCL 消费端应用程序实例会实例化一个工作程序,该工作程序会为持有租约的每个分片实例化一个记录处理器。

使用租约表跟踪 KCL 消费端应用程序处理的分片

什么是租约表

对于每个 Amazon Kinesis Data Streams 应用程序,KCL 都使用一份唯一的租约表(存储在 Amazon DynamoDB 表中)来跟踪 KDS 数据流中由 KCL 消费端应用程序的工作程序租赁和处理的分片。

重要

KCL 使用消费端应用程序的名称来创建该消费端应用程序使用的租约表的名称,因此每个消费端应用程序的名称必须是唯一的。

您可在消费端应用程序运行的同时使用 Amazon DynamoDB 控制台查看租约表。

如果应用程序启动时 KCL 消费端应用程序的租约表不存在,则其中一个工作程序会为此应用程序创建租约表。

重要

除开与 Kinesis Data Streams 本身关联的费用,您的账户将被收取与 DynamoDB 表关联的费用。

租约表中的每行表示您消费端应用程序正在处理的分片。如果 KCL 消费端应用程序仅处理一个数据流,则租约表的哈希键 leaseKey 就是分片 ID。如果您采用 Java 版消费端应用程序的相同 KCL 2.x 处理多个数据流,则 leaseKey 的结构如下:account-id:StreamName:streamCreationTimestamp:ShardId。例如,111111111:multiStreamTest-1:12345:shardId-000000000336

除了分片 ID 以外,每行还包含以下数据:

  • checkpoint:分片的最新检查点序号。此值在数据流的所有分片中都是唯一的。

  • checkpointSubSequence数字:使用 Kinesis Producer 库的聚合功能时,这是对检查点的扩展,用于跟踪 Kinesis 记录中的单个用户记录。

  • leaseCounter:用于租赁版本控制,这样工作程序可以检测其租赁已由其他工作程序获取。

  • leaseKey:租赁的唯一标识符。每份租约都是数据流中一个分片所特有的,一份由一个工作程序持有。

  • leaseOwner:持有此租赁的工作程序。

  • ownerSwitchesSince检查点:自上次写检查点以来,这份租约更换了多少次员工。

  • parentShardId:用于确保在子分片上开始处理之前,父分片已得到完全处理。这可以确保记录按照放入流中的相同顺序处理。

  • hashrange:PeriodicShardSyncManager 用来运行定期同步,以查找租约表中缺少的分片,并在需要时为分片创建租约。

    注意

    从 KCL 1.14 和 KCL 2.3 开始的每个分片的租约表中都有此数据。有关租约和分片之间的 PeriodicShardSyncManager 和定期同步的更多信息,请参阅 租约表如何与 KDS 数据流中的分片同步

  • childshards:LeaseCleanupManager 用来查看子分片的处理状态并决定是否可以从租约表中删除父分片。

    注意

    从 KCL 1.14 和 KCL 2.3 开始的每个分片的租约表中都有此数据。

  • shardID:分片的 ID。

    注意

    仅当您采用 Java 版消费端应用程序的相同 KCL 2.x 处理多个数据流时,此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 才支持此功能,且从适用于 Java 的 KCL 2.3 及更高版本开始。

  • stream name 数据流的标识符采用以下格式:account-id:StreamName:streamCreationTimestamp

    注意

    仅当您采用 Java 版消费端应用程序的相同 KCL 2.x 处理多个数据流时,此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 才支持此功能,且从适用于 Java 的 KCL 2.3 及更高版本开始。

吞吐量

如果您的 Amazon Kinesis Data Streams 收到了预置的吞吐量异常,您应为 DynamoDB 表增加预置的吞吐量。KCL 将创建预置吞吐量为 10 次读取/秒和 10 次写入/秒的表,但这可能无法满足您应用程序的需求。例如,如果您的 Amazon Kinesis Data Streams 执行频繁的检查点操作或对由很多分片组成的流执行操作,您可能需要更多吞吐量。

有关 DynamoDB 表中预置吞吐量的信息,请参阅《Amazon DynamoDB 开发人员指南》中的读/写容量模式使用表和数据

租约表如何与 KDS 数据流中的分片同步

KCL 消费端应用程序中的工作程序使用租约来处理指定数据流中的分片。哪个工作程序在指定时间租赁哪个分片的相关信息都存储在租约表中。在 KCL 消费端应用程序运行期间,租约表必须与数据流中的最新分片信息保持同步。在消费端应用程序引导启动期间(初始化或重新启动消费端应用程序时),以及每当正在处理的分片结束时(重新分片),KCL 都会将租约表与从 Kinesis Data Streams 服务获取的分片信息同步。换言之,在消费端应用程序初始引导启动期间,以及每当消费端应用程序遇到数据流重新分片事件时,工作程序或 KCL 消费端应用程序都会与其正在处理的数据流同步。

KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中的同步

在 KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中,在消费端应用程序的引导启动期间以及每个数据流重新分片事件期间,KCL 会将租约表与通过调用 ListShardsDescribeStream 发现 API 从 Kinesis Data Streams 服务获取的分片信息同步。在上面列出的所有 KCL 版本中,KCL 消费端应用程序的每个工作程序都要完成以下步骤,以便在消费端应用程序的引导启动期间和每个流的重新分片事件中执行租约/分片同步过程:

  • 获取正在处理的流中数据的所有分片

  • 从租约表中获取所有分片租约

  • 筛选出租约表中没有租约的所有开放分片

  • 迭代所有找到的开放分片以及没有开放父分片的所有开放分片:

    • 遍历层次结构树的原级路径,确定该分片是否为后代分片。如果正在处理原级分片(租约表中存在原级分片的租约条目)或者应该处理原级分片(例如初始位置为 TRIM_HORIZONAT_TIMESTAMP),则该分片被视为后代分片

    • 如果上下文中的开放分片是后代分片,KCL 会根据初始位置对分片执行检查点操作,并在需要时为其父分片创建租约

KCL 2.x 中的同步从 KCL 2.3 及更高版本开始

从支持的最新版本 KCL 2.x(KCL 2.3)及更高版本开始,该库现在支持对同步过程进行以下更改。这些租约/分片同步更改大幅减少了 KCL 消费端应用程序对 Kinesis Data Streams 服务进行的 API 调用次数,并优化了 KCL 消费端应用程序中的租约管理。

  • 在应用程序的引导启动过程中,如果租约表为空,KCL 将利用 ListShard API 的筛选选项(ShardFilter 可选请求参数),仅针对在 ShardFilter 参数指定时间开放的分片的快照检索和创建租约。ShardFilter 参数可以让您筛选出 ListShards API 的响应。ShardFilter 参数唯一需要的属性是 Type。KCL 使用 Type 筛选属性及其以下有效值来识别并返回可能需要新租约的开放分片的快照:

    • AT_TRIM_HORIZON – 响应包含在 TRIM_HORIZON 时开放的所有分片。

    • AT_LATEST – 响应仅包含数据流中当前开放的分片。

    • AT_TIMESTAMP – 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片,或仍处于开放状态的所有分片。

    ShardFilter 用于为空租约表创建租约,以初始化在 RetrievalConfig#initialPositionInStreamExtended 中指定的分片快照的租约。

    有关 ShardFilter 的更多信息,请参阅https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 要让租约表与数据流中的最新分片保持同步,不是所有工作程序都执行租约/分片同步,而是由一个所选的主工作程序来执行租约/分片同步。

  • KCL 2.3 使用 GetRecordsSubscribeToShard API 的 ChildShards 返回参数在 SHARD_END 时对关闭的分片执行租约/分片同步,从而允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。对于共享吞吐量消费端应用程序,租约/分片同步的这种优化使用了 GetRecords API 的 ChildShards 参数。对于专用吞吐量(增强型扇出)消费端应用程序,租约/分片同步的这种优化使用 SubscribeToShard API 的 ChildShards 参数。有关更多信息,请参阅GetRecordsSubscribeToShards、和ChildShard

  • 经过上述更改,KCL 的行为正在从所有工作程序学习所有现有分片的模式,转变为工作程序只学习每个工作程序拥有的分片的子分片的模式。因此,除了在消费端应用程序引导启动和重新分片事件期间发生的同步外,KCL 现在还会执行额外的定期分片/租约扫描,从而识别租约表中的任何潜在漏洞(也就是了解所有新分片),确保数据流的完整哈希范围得到处理,并在需要时为它们创建租约。PeriodicShardSyncManager 是负责定期运行租约/分片扫描的组件。

    有关 KCL 2.3 PeriodicShardSyncManager 中的更多信息,请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/sroftware/amazon/kinesis/leases/.jav amazon-kinesis-client a #L201-L213。LeaseManagementConfig

    在 KCL 2.3 中,可以使用新的配置选项来配置 LeaseManagementConfig 中的 PeriodicShardSyncManager

    名称 默认值 描述
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000(2 分钟)

    审计程序作业扫描租约表中部分租约的频率(以毫秒为单位)。如果审计程序检测到某个流的租约中存在任何漏洞,则会根据 leasesRecoveryAuditorInconsistencyConfidenceThreshold 触发分片同步。

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期审计程序作业的置信度阈值,用于确定租约表中数据流的租约是否不一致。如果审计程序连续多次发现同一数据流的不一致之处,则会触发分片同步。

    现在还会发布新的 CloudWatch 指标来监控的PeriodicShardSyncManager运行状况。有关更多信息,请参阅 PeriodicShardSyncManager

  • 包括对 HierarchicalShardSyncer 的优化,可仅为一层分片创建租约。

KCL 1.x 中的同步从 KCL 1.14 及更高版本开始

从支持的最新版本 KCL 1.x(KCL 1.14)及更高版本开始,该库现在支持对同步过程进行以下更改。这些租约/分片同步更改大幅减少了 KCL 消费端应用程序对 Kinesis Data Streams 服务进行的 API 调用次数,并优化了 KCL 消费端应用程序中的租约管理。

  • 在应用程序的引导启动过程中,如果租约表为空,KCL 将利用 ListShard API 的筛选选项(ShardFilter 可选请求参数),仅针对在 ShardFilter 参数指定时间开放的分片的快照检索和创建租约。ShardFilter 参数可以让您筛选出 ListShards API 的响应。ShardFilter 参数唯一需要的属性是 Type。KCL 使用 Type 筛选属性及其以下有效值来识别并返回可能需要新租约的开放分片的快照:

    • AT_TRIM_HORIZON – 响应包含在 TRIM_HORIZON 时开放的所有分片。

    • AT_LATEST – 响应仅包含数据流中当前开放的分片。

    • AT_TIMESTAMP – 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片,或仍处于开放状态的所有分片。

    ShardFilter 用于为空租约表创建租约,以初始化在 KinesisClientLibConfiguration#initialPositionInStreamExtended 中指定的分片快照的租约。

    有关 ShardFilter 的更多信息,请参阅https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 要让租约表与数据流中的最新分片保持同步,不是所有工作程序都执行租约/分片同步,而是由一个所选的主工作程序来执行租约/分片同步。

  • KCL 1.14 使用 GetRecordsSubscribeToShard API 的 ChildShards 返回参数在 SHARD_END 时对关闭的分片执行租约/分片同步,从而允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。有关更多信息,请参阅 GetRecordsChildShard

  • 经过上述更改,KCL 的行为正在从所有工作程序学习所有现有分片的模式,转变为工作程序只学习每个工作程序拥有的分片的子分片的模式。因此,除了在消费端应用程序引导启动和重新分片事件期间发生的同步外,KCL 现在还会执行额外的定期分片/租约扫描,从而识别租约表中的任何潜在漏洞(也就是了解所有新分片),确保数据流的完整哈希范围得到处理,并在需要时为它们创建租约。PeriodicShardSyncManager 是负责定期运行租约/分片扫描的组件。

    如果 KinesisClientLibConfiguration#shardSyncStrategyType 设置为 ShardSyncStrategyType.SHARD_END,则 PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold 用于确定租约表中包含漏洞的连续扫描次数的阈值,之后将强制执行分片同步。当 KinesisClientLibConfiguration#shardSyncStrategyType 设置为时 ShardSyncStrategyType.PERIODICleasesRecoveryAuditorInconsistencyConfidenceThreshold 将被忽略。

    有关 KCL 1.14 PeriodicShardSyncManager 中的更多信息,请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987-L999 KinesisClientLibConfiguration

    在 KCL 1.14 中,可以使用新的配置选项来配置 LeaseManagementConfig 中的 PeriodicShardSyncManager

    名称 默认值 描述
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期审计程序作业的置信度阈值,用于确定租约表中数据流的租约是否不一致。如果审计程序连续多次发现同一数据流的不一致之处,则会触发分片同步。

    现在还会发布新的 CloudWatch 指标来监控的PeriodicShardSyncManager运行状况。有关更多信息,请参阅 PeriodicShardSyncManager

  • KCL 1.14 现在还支持延期租约清理。当分片超过数据流的保留期或因重新分片操作而关闭时,LeaseCleanupManager 会在到达 SHARD_END 时异步删除租约。

    可以使用新的配置选项来配置 LeaseCleanupManager

    名称 默认值 描述
    leaseCleanupInterval米利斯

    1 minute

    运行租约清理线程的时间间隔。

    completedLeaseCleanupIntervalMillis 5 分钟

    检查租约是否完成的时间间隔。

    garbageLeaseCleanupIntervalMillis 30 分钟

    检查租约是否为垃圾租约(即超过数据流的保留期)的时间间隔。

  • 包括对 KinesisShardSyncer 的优化,可仅为一层分片创建租约。

采用 Java 版消费端应用程序的相同 KCL 2.x 处理多个数据流

本节介绍了 Java 版 KCL 2.x 中的以下更改,这些更改使您能够创建可以同时处理多个数据流的 KCL 使用者应用程序。

重要

只有适用于 Java 的 KCL 2.x 才支持多流处理功能,且从适用于 Java 的 KCL 2.3 及更高版本开始。

其他可以实现 KCL 2.x 的语言都不支持多流处理功能。

任何版本的 KCL 1.x 都不支持多流处理功能。

  • MultistreamTracker 接口

    要构建可以同时处理多个流的使用者应用程序,必须实现一个名为的新接口MultistreamTracker。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 streamConfigList 方法。请注意,正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用 streamConfigList 来了解要处理的数据流的变化。

    streamConfigList方法填充StreamConfig列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    请注意,StreamIdentifierInitialPositionInStreamExtended 是必填字段,consumerArn 是选填字段。只有在使用 KCL 2.x 实现增强型扇出消费端应用程序时,才必须提供 consumerArn

    有关更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L29 您可以根据序列化的流标识符为 StreamIdentifier 创建多流实例。序列化的流标识符应采用以下格式:account-id:StreamName:streamCreationTimestamp

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker 还包括可删除租约表中旧流租约的策略(formerStreamsLeasesDeletionStrategy)。请注意,在消费端应用程序运行时无法更改策略。欲了解更多信息,请参阅 https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/sortware/am amazon-kinesis-client azon/kinesis/processor/.java FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilder是一个应用程序范围的类,可用于指定构建 KCL 使用者应用程序时要使用的所有 KCL 2.x 配置设置。 ConfigsBuilder类现在支持该MultistreamTracker接口。你可以用一个数据流的名称进行初始化 ConfigsBuilder以使用来自以下内容的记录:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    或者,MultiStreamTracker如果要实现同时处理多个流的 KCL 使用者应用程序,也可以使用进行初始化 ConfigsBuilder 。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 通过为您的 KCL 消费端应用程序实现多流支持功能,应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。

  • 在为 KCL 消费端应用程序实现多流支持功能后,leaseKey 采用以下结构:account-id:StreamName:streamCreationTimestamp:ShardId。例如,111111111:multiStreamTest-1:12345:shardId-000000000336

    重要

    当现有 KCL 消费端应用程序配置为仅处理一个数据流时,leaseKey(租约表的哈希键)就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流,则会破坏租约表,因为支持多流处理功能后,leaseKey 必须采用如下结构:account-id:StreamName:StreamCreationTimestamp:ShardId

将 Kinesis Client Library 与 Amazon Glue 架构注册表结合使用

您可以将 Kinesis Data Streams 与 Amazon Glue 架构注册表进行集成。Amazon Glue 架构注册表允许您集中发现、控制和演变架构,同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。AmazonGlue Schema Registry 使您能够改善流媒体应用程序中的 end-to-end 数据质量和数据管理。有关更多信息,请参阅 Amazon Glue Schema Registry。设置此集成的方法之一是使用适用于 Java 的 KCL。

重要

目前,只有使用在 Java 中实现的 KCL 2.3 消费端的 Kinesis Data Streams 支持 Kinesis Data Streams 和 Amazon Glue 架构注册表集成。不提供多语言支持。不支持 KCL 1.0 消费端。不支持 KCL 2.3 之前的 KCL 2.x 消费端。

有关如何使用 KCL 设置 Kinesis Data Streams 与架构注册表集成的详细说明,请参阅 Use Case: Integrating Amazon Kinesis Data Streams with the Amazon Glue Schema Registry 中的“Interacting with Data Using the KPL/KCL Libraries”部分。