使用 Kinesis 客户端库 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis 客户端库

开发可以处理来自 KDS 数据流的数据的自定义消费者应用程序的方法之一是使用 Kinesis 客户端库 (KCL)。

注意

对于 KCL 1.x 和 KCL 2.x,建议您根据使用情况升级到最新的 KCL 1.x 版本或 KCL 2.x 版本。KCL 1.x 和 KCL 2.x 都会定期更新新版本,其中包括最新的依赖项和安全补丁、错误修复以及向后兼容的新功能。有关更多信息,请参阅 。https://github.com/awslabs/amazon-kinesis-client/发布.

Kinesis 客户端库是什么?

KCL 通过处理与分布式计算相关的许多复杂任务来帮助您使用和处理来自 Kinesis 数据流的数据。其中包括跨多个使用者应用程序实例执行负载均衡、响应使用者应用程序实例故障、对已处理记录执行检查点操作以及对重新分片 KCL 负责所有这些子任务,以便您可以专注于编写自定义记录处理逻辑。

KCL 与 Kinesis Data Streams API 不同,后者可以在Amazon开发工具包。Kinesis Data Streams API 可帮助您管理 Kinesis Data Streams 的许多方面,包括创建流、重新分片以及放置和获取记录。KCL 围绕所有这些子任务提供了一层抽象,特别是为了让您可以专注于消费者应用程序的自定义数据处理逻辑。有关 Kinesis Data Streams API 的信息,请参阅Amazon Kinesis API 参考.

重要

KCL 是一个 Java 库。使用名为MultiLang守护程序。此守护程序是基于 Java 的,当您使用除 Java 之外的 KCL 语言时,会在后台运行。例如,如果您安装了适用于 Python 的 KCL 并完全在 Python 中编写使用者应用程序,则您仍需要在您的系统中安装 Java,因此MultiLang守护程序。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例自定义这些设置,例如Amazon它连接到的区域。有关的更多信息MultiLang上守护程序GitHub请参阅KCLMultiLang守护程序.

KCL 充当您的记录处理逻辑和 Kinesis Data Streams 之间的中介。KCL 执行以下任务:

  • 连接到数据流

  • 枚举数据流中的分片

  • 使用租约与其工作人员协调分片关联

  • 为其管理的每个分片实例化记录处理器

  • 从数据流中提取数据记录

  • 将记录推送到对应的记录处理器

  • 对已处理记录进行检查点操作

  • 当工作线程实例计数发生变化或重新分片数据流时(分片被拆分或合并)时,平衡分片工作人员关联(租赁)

KCL 可用的版本

目前,您可以使用以下支持版本之一来构建自定义使用者应用程序:

您可以使用 KCL 1.x 或 KCL 2.x 来构建使用共享吞吐量的消费者应用程序。有关更多信息,请参阅 使用 KCL 开发具有共享吞吐量的自定义使用者

要构建使用专用吞吐量(增强的扇出消费者)的消费者应用程序,您只能使用 KCL 2.x。有关更多信息,请参阅 开发具有专用吞吐量的自定义使用者(增强扇出功能)

有关 KCL 1.x 和 KCL 2.x 之间的区别的信息,以及有关如何从 KCL 1.x 迁移到 KCL 2.x 的说明,请参阅将使用者从 KCL 1.x 迁移到 KCL 2.x.

KCL 概念

  • KCL 使用者应用— 使用 KCL 自定义构建并旨在读取和处理数据流中记录的应用程序。

  • 消费者应用实例-KCL 消费者应用程序通常是分布式的,一个或多个应用程序实例同时运行,以便协调故障并动态负载平衡数据记录处理。

  • 工作线程— KCL 消费者应用程序实例用于开始处理数据的高级别类。

    重要

    每个 KCL 消费者应用程序实例都有一个 Worker。

    工作人员初始化并监督各种任务,包括同步分片和租赁信息、跟踪分片分配以及处理分片中的数据。工作程序为 KCL 提供了使用者应用程序的配置信息,例如将记录此 KCL 使用者应用程序要处理的数据流的名称以及Amazon访问此数据流所需的凭据。工作人员还启动该特定的 KCL 消费者应用程序实例,将数据记录从数据流传送到记录处理器。

    重要

    在 KCL 1.x 中,这个课被称为工作线程. 有关更多信息(这些是 Java KCL 存储库),请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/SRC/Main/java/com/亚马逊/服务/Kinesis/客户端库/Lib/工作人员/工作人.Java. 在 KCL 2.x 中,这个课被称为计划程序. KCL 2.x 中的调度程序的用途与 KCL 1.x 中的工作人员的目的相同。有关 KCL 2.x 中的调度器类的更多信息,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • 租赁— 定义工作器和分片之间绑定的数据。分布式 KCL 消费者应用程序使用租赁将数据记录处理跨工作人员队列进行分区。在任何给定时间,每个数据记录分片都由leaseKey变量。

    默认情况下,工作人员可以持有一个或多个租约(取决于maxLeasesFor工作线程同时提交。

    重要

    每个工作人员都会争夺持有数据流中所有可用分片的所有可用租约。但是在任何时候只有一名员工可以成功持有每份租约。

    例如,如果您有一个具有工作器 A 的消费者应用程序实例 A 正在处理包含 4 个分片的数据流,则工作器 A 可以同时保存分片 1、2、3 和 4 的租约。但是如果你有两个消费者应用程序实例:A 和 B 使用工作器 A 和工作器 B,并且这些实例正在处理包含 4 个分片、工作器 A 和工作器 B 的数据流不能同时持有分片 1 的租约。一名工作人员持有特定分片的租约,直到准备停止处理此分片的数据记录或失败为止。当一名工作人员停止持有租约时,另一名员工接管并持有租约。

    有关更多信息(这些是 Java KCL 存储库),请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/SRC/main/java/com/亚马逊/服务/Kinesis/租赁/impl/租赁.java对于 KCL 1.x 和https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java对于 KCL 2.x。

  • 租赁表-一个独特的 Amazon DynamoDB 表,用于跟踪 KCL 消费者应用程序工作人员租用和处理的 KDS 数据流中的分片。在运行 KCL 使用者应用程序时,租赁表必须与数据流中的最新分片信息保持同步(在工作程序内部和所有工作线程之间)。有关更多信息,请参阅 使用租赁表跟踪 KCL 消费者应用程序处理的碎片

  • 录制处理器— 定义 KCL 消费者应用程序如何处理从数据流中获取的数据的逻辑。在运行时,KCL 消费者应用程序实例化一个工作器,此工作程序为其持有租约的每个分片实例化一个记录处理器。

使用租赁表跟踪 KCL 消费者应用程序处理的碎片

什么是租赁表

对于每个 Amazon Kinesis Data Streams 应用程序,KCL 都使用唯一的租赁表(存储在 Amazon DynamoDB 表中)来跟踪 KDS 数据流中由 KCL 消费者应用程序的工作人员租用和处理的分片。

重要

KCL 使用者应用程序的名称来创建此使用者应用程序使用的租赁表的名称,因此每个使用者应用程序名称都必须是唯一的。

您可以使用Amazon DynamoDB 控制台在消费者应用程序运行时。

如果您的 KCL 使用者应用程序的租赁表在应用程序启动时不存在,则其中一个工作程序会为此应用程序创建租赁表。

重要

您的账户将被收取与 DynamoDB 表关联的费用(除开与 Kinesis Data Streams)本身关联的费用。

租赁表中的每行表示使用者应用程序的工作程序正在处理的分片。如果你的 KCL 消费者应用程序只处理一个数据流,那么leaseKey这是租赁表的哈希键是分片 ID。如果您是针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流,那么 leaseKey 的结构看起来像这样:account-id:StreamName:streamCreationTimestamp:ShardId. 例如,111111111:multiStreamTest-1:12345:shardId-000000000336.

除了分片 ID 以外,每行还包含以下数据:

  • 检查点:分片的最新检查点序号。此值在数据流中的所有分片中都是唯一的。

  • checkpointSubSequence数量:使用 Kinesis Producer Library 的聚合功能时,这是一个扩展检查点它跟踪 Kinesis 记录中的单个用户记录。

  • leaseCounter:用于租赁版本控制,以便工作程序可以检测其租赁已由其他工作程序获取。

  • leaseKey:租赁的唯一标识符。每个租赁特定于数据流中的一个分片,一次由一个工作程序持有。

  • leaseOwner:持有此租赁的工作程序。

  • ownerSwitchesSince检查点:自上次写入检查点以来,此租赁更改了工作程序的次数。

  • parentShardId:用于确保在开始子分片的处理之前已经完全处理了父分片。这可以确保记录按照放入流中的相同顺序处理。

  • hashrange:PeriodicShardSyncManager运行定期同步以查找租赁表中缺少的分片,并根据需要为它们创建租约。

    注意

    这些数据出现在从 KCL 1.14 和 KCL 2.3 开始的每个分片的租赁表中。有关 的更多信息PeriodicShardSyncManager以及租赁和分片之间的定期同步,请参阅租赁表如何与 KDS 数据流中的分片同步.

  • 孩子碎片:LeaseCleanupManager以查看子分片的处理状态,并决定是否可以从租赁表中删除父分片。

    注意

    这些数据出现在从 KCL 1.14 和 KCL 2.3 开始的每个分片的租赁表中。

  • ShharDID:分片的 ID。

    注意

    这些数据仅出现在租赁表中,如果你是针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流. 这仅在 KCL 2.x 中对于 Java 提供支持,从 KCL 2.3 开始,对于 Java 及更高版本。

  • stream name数据流的标识符采用以下格式:account-id:StreamName:streamCreationTimestamp.

    注意

    这些数据仅出现在租赁表中,如果你是针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流. 这仅在 KCL 2.x 中对于 Java 提供支持,从 KCL 2.3 开始,对于 Java 及更高版本。

吞吐量

如果您的 Amazon Kinesis Data Streams 应用程序收到了预置吞吐量异常,您应增加 DynamoDB 表的预置吞吐量。KCL 会创建预置吞吐量为 10 次读取/秒和 10 次写入/秒的表,但这对于您的应用程序可能不够。例如,如果您的 Amazon Kinesis Data Streams 应用程序频繁执行检查点操作或对由许多分片组成的流执行操作,您可能需要更多吞吐量。

有关 DynamoDB 中预配置吞吐量的信息,请参阅读/写容量模式处理表和数据中的Amazon DynamoDB 开发人员指南.

租赁表如何与 KDS 数据流中的分片同步

KCL 消费者应用程序中的工作人员使用租赁处理来自给定数据流的分片。有关哪些员工在任何给定时间租赁什么分片的信息存储在租赁表中。在 KCL 使用者应用程序运行期间,租用表必须与数据流中的最新分片信息保持同步。KCL 将租赁表与从 Kinesis Data Streams 服务获取的分片信息同步在使用者应用程序引导期间(无论是在初始化或重新启动使用者应用程序时)以及在处理的分片到达结束时(重新分片)。换句话说,工作人员或 KCL 使用者应用程序应用程序将与他们在初始使用者应用程序引导期间以及每当使用者应用程序遇到数据流重新分片事件时所处理的数据流同步。

KCL 1.0-1.13 和 KCL 2.0-2.2 中的同步

在 KCL 1.0-1.13 和 KCL 2.0-2.2 中,在消费者应用程序的引导期间以及在每个数 Kinesis Data Streams 重新分片事件期间,KCL 通过调用ListShards或者DescribeStream发现 API。在上面列出的所有 KCL 版本中,KCL 使用者应用程序的每个工作程序都完成以下步骤,以便在使用者应用程序的引导期间和每个流重新分片事件中执行租赁/分片同步过程:

  • 获取正在处理的流的数据的所有分片

  • 从租赁表中获取所有分片租约

  • 筛选出租赁表中没有租约的每个打开的分片

  • 迭代所有找到的打开分片以及每个没有打开父级的打开分片:

    • 遍历层次结构树的祖先路径,以确定分片是否为后代。如果正在处理祖先分片(租赁表中存在祖先分片的租赁条目),或者是否应该处理祖先分片(例如,如果初始位置为TRIM_HORIZON要么AT_TIMESTAMP

    • 如果上下文中的开放分片是后代,KCL 会根据初始位置检查分片并根据需要为其父级创建租约

在 KCL 2.x 中进行同步,从 KCL 2.3 开始及以后

从 KCL 2.x (KCL 2.3) 及更高版本开始,库现在支持对同步过程进行以下更改。这些租赁/分片同步更改显著减少了 KCL 消费者应用程序对 Kinesis Data Streams 服务发出的 API 调用数量,并优化了 KCL 消费者应用程序中的租赁管理。

  • 在应用程序引导过程中,如果租赁表为空,KCL 会使用ListShardAPI 的过滤选项(ShardFilter可选请求参数),用于仅针对在指定的时间打开的分片的快照检索和创建租赁ShardFilter参数。这些区域有:ShardFilter参数使您能够过滤掉ListShardsAPI。唯一必需的财产ShardFilter参数为Type. KCL 使用Typefilter 属性及其有效值的以下有效值,用于识别并返回可能需要新租用的打开分片的快照:

    • AT_TRIM_HORIZON-回复包括所有打开的碎片TRIM_HORIZON.

    • AT_LATEST-响应仅包括当前打开的数据流分片。

    • AT_TIMESTAMP-响应包括开始时间戳小于或等于给定时间戳且结束时间戳大于或等于给定时间戳或仍然打开的所有分片。

    ShardFilter用于为空租赁表创建租赁,以便为指定的分片快照初始化租赁RetrievalConfig#initialPositionInStreamExtended.

    有关 ShardFilter 的更多信息,请参阅 https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 由一名当选的工作人员负责人执行租赁/分片同步以使租赁表与数据流中的最新分片保持最新状态,而不是执行租赁/分片同步的所有工作人员,而是执行租赁/分片同步。

  • KCL 2.3 使用ChildShards返回的参数GetRecordsSubscribeToShard用于执行租赁/分片同步的 APISHARD_END对于封闭的分片,允许 KCL 工作人员只能为已完成处理的分片的子分片创建租约。对于在整个消费者应用程序中共享,这种租赁/分片同步的优化使用ChildShards的参数GetRecordsAPI。对于专用吞吐量(增强的扇出)消费者应用程序,这种租赁/分片同步的优化使用ChildShards的参数SubscribeToShardAPI。有关更多信息,请参阅 。GetRecordsSubscribeTo分片, 和ChildShard.

  • 随着上述变化,KCL 的行为正从所有员工了解所有现有碎片的模式转变为工人只学习每个员工拥有的碎片的子级碎片的模式。因此,除了在消费者应用程序引导和重新分片事件期间发生的同步之外,KCL 现在还执行额外的定期分片/租赁扫描,以便识别租赁表中的任何潜在漏洞(换句话说,了解所有新分片),以确保完整正在处理数据流的哈希范围,并根据需要为它们创建租赁。PeriodicShardSyncManager是负责运行定期租赁/分片扫描的组件。

    有关 的更多信息PeriodicShardSyncManager在 KCL 2.3 中,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/软件/亚马逊/kinesis/租赁/LeaseManagementConfig.java #L201-L213.

    在 KCL 2.3 中,可以配置新的配置选项PeriodicShardSyncManagerLeaseManagementConfig

    名称 默认值 说明
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000(2 分钟)

    在租赁表中扫描部分租约的审计员作业的频率(以毫秒为单位)。如果审计员检测到流的租约中有任何漏洞,那么它将根据以下方式触发分片同步leasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期审计员作业的可信度阈值,用于确定租赁表中数据流的租约是否不一致。如果审计员多次为数据流连续发现同一组不一致之处,那么它将触发分片同步。

    NewCloudWatch现在还可以发出指标来监控PeriodicShardSyncManager. 有关更多信息,请参阅 周期 SHARDSYNC 管理器

  • 包括优化HierarchicalShardSyncer只为一层碎片创建租约。

KCL 1.x 中的同步,从 KCL 1.14 及以后开始

从 KCL 1.x (KCL 1.14) 及更高版本开始,库现在支持对同步过程进行以下更改。这些租赁/分片同步更改显著减少了 KCL 消费者应用程序对 Kinesis Data Streams 服务发出的 API 调用数量,并优化了 KCL 消费者应用程序中的租赁管理。

  • 在应用程序引导过程中,如果租赁表为空,KCL 会使用ListShardAPI 的过滤选项(ShardFilter可选请求参数),用于仅针对在指定的时间打开的分片的快照检索和创建租赁ShardFilter参数。这些区域有:ShardFilter参数使您能够过滤掉ListShardsAPI。唯一必需的财产ShardFilter参数为Type. KCL 使用Typefilter 属性及其有效值的以下有效值,用于识别并返回可能需要新租用的打开分片的快照:

    • AT_TRIM_HORIZON-回复包括所有打开的碎片TRIM_HORIZON.

    • AT_LATEST-响应仅包括当前打开的数据流分片。

    • AT_TIMESTAMP-响应包括开始时间戳小于或等于给定时间戳且结束时间戳大于或等于给定时间戳或仍然打开的所有分片。

    ShardFilter用于为空租赁表创建租赁,以便为指定的分片快照初始化租赁KinesisClientLibConfiguration#initialPositionInStreamExtended.

    有关 ShardFilter 的更多信息,请参阅 https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 由一名当选的工作人员负责人执行租赁/分片同步以使租赁表与数据流中的最新分片保持最新状态,而不是执行租赁/分片同步的所有工作人员,而是执行租赁/分片同步。

  • KCL 1.14 使用ChildShards返回的参数GetRecordsSubscribeToShard用于执行租赁/分片同步的 APISHARD_END对于封闭的分片,允许 KCL 工作人员只能为已完成处理的分片的子分片创建租约。有关更多信息,请参阅 。GetRecordsChildShard.

  • 随着上述变化,KCL 的行为正从所有员工了解所有现有碎片的模式转变为工人只学习每个员工拥有的碎片的子级碎片的模式。因此,除了在消费者应用程序引导和重新分片事件期间发生的同步之外,KCL 现在还执行额外的定期分片/租赁扫描,以便识别租赁表中的任何潜在漏洞(换句话说,了解所有新分片),以确保完整正在处理数据流的哈希范围,并根据需要为它们创建租赁。PeriodicShardSyncManager是负责运行定期租赁/分片扫描的组件。

    何时KinesisClientLibConfiguration#shardSyncStrategyType设置为ShardSyncStrategyType.SHARD_ENDPeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold用于确定租赁表中包含漏洞的连续扫描次数的阈值,之后要强制执行分片同步。何时KinesisClientLibConfiguration#shardSyncStrategyType设置为ShardSyncStrategyType.PERIODICleasesRecoveryAuditorInconsistencyConfidenceThreshold已忽略。

    有关 的更多信息PeriodicShardSyncManager在 KCL 1.14 中,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/服务/kinesis/客户库/lib/工作人员/KinesisClientLibConfiguration.java #L987-L999.

    在 KCL 1.14 中,新的配置选项可用于配置PeriodicShardSyncManagerLeaseManagementConfig

    名称 默认值 说明
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期审计员作业的可信度阈值,用于确定租赁表中数据流的租约是否不一致。如果审计员多次为数据流连续发现同一组不一致之处,那么它将触发分片同步。

    NewCloudWatch现在还可以发出指标来监控PeriodicShardSyncManager. 有关更多信息,请参阅 周期 SHARDSYNC 管理器

  • KCL 1.14 现在还支持延期租约清理。通过以下方式异步删除租赁LeaseCleanupManager到达后SHARD_END,当分片已超过数据流的保留期之后过期或者因重新分片操作而被关闭时。

    新的配置选项如下:LeaseCleanupManager.

    名称 默认值 说明
    leaseCleanupIntervalMillis

    1 minute

    运行租赁清理线程的时间间隔。

    completedLeaseCleanupIntervalMillis 5 分钟

    检查租赁是否完成的时间间隔。

    garbageLeaseCleanupIntervalMillis 30 分钟

    检查租约是否为垃圾的时间间隔(即已超过数据流的保留期限)与否。

  • 包括优化KinesisShardSyncer只为一层碎片创建租约。

针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流

本节介绍了 KCL 2.x for Java 中的以下更改,这些更改使您能够创建可以同时处理多个数据流的 KCL 使用者应用程序。

重要

只有针对 Java 的 KCL 2.x 支持多流处理,对于 Java 及更高版本,从 KCL 2.3 开始。

任何其他可以实现 KCL 2.x 的语言都不支持多流处理。

KCL 1.x 的任何版本都不支持多流处理。

  • MultistreamTracker 界面

    要构建可以同时处理多个流的消费者应用程序,必须实现一个名为MultistreamTracker. 此界面包括streamConfigList方法,返回要由 KCL 使用者应用程序处理的数据流及其配置的列表。请注意,正在处理的数据流可以在使用者应用程序运行时期间进行更改。streamConfigListKCL 定期调用,以了解要处理的数据流的变化。

    这些区域有:streamConfigList填充StreamConfig列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    请注意,StreamIdentifierInitialPositionInStreamExtended是必填字段,而consumerArn为可选项。您必须提供consumerArn仅当您使用 KCL 2.x 来实施增强型扇出消费者应用程序时才能使用。

    有关 的更多信息StreamIdentifier请参阅https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/软件/亚马逊/kinesis/常见/StreamIdentifier.java #L29. 您可以为StreamIdentifier来自序列化的流标识符。序列化的流标识符应采用以下格式:account-id:StreamName:streamCreationTimestamp.

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker还包括在租赁表中删除旧流的租赁的策略(formerStreamsLeasesDeletionStrategy)。请注意,在消费者应用程序运行时期间无法更改策略。有关更多信息,请参阅 。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/软件/亚马逊/kinesis/处理器/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilder是一个应用程序范围的类,您可以用它来指定在构建 KCL 消费者应用程序时要使用的所有 KCL 2.x 配置设置。ConfigsBuilder课程现在支持MultistreamTracker接口。你可以初始化ConfigsBuilder要么是使用要从中消耗记录的一个数据流的名称:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    或者你可以初始化ConfigsBuilder和MultiStreamTracker如果你想实现一个同时处理多个流的 KCL 消费者应用程序。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 随着为 KCL 消费者应用程序实施了多流支持,应用程序的租赁表中的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。

  • 当实施对 KCL 消费者应用程序的多流支持时,leaseKey 将采用以下结构:account-id:StreamName:streamCreationTimestamp:ShardId. 例如,111111111:multiStreamTest-1:12345:shardId-000000000336.

    重要

    当您现有的 KCL 消费者应用程序配置为仅处理一个数据流时,leaseKey(即租赁表的哈希键)是分片 ID。如果您重新配置此现有的 KCL 消费者应用程序以处理多个数据流,它会打破租赁表,因为对于多流支持,leaseKey 结构必须如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId.

将 Kinesis 客户端库与AmazonGlue 架构注册

您可以将 Kinesis 数据流与AmazonGlue 模式注册表。这些区域有:AmazonGlue 架构注册表允许您集中发现、控制和演变架构,同时确保已注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。这些区域有:AmazonGlue 模式注册表使你能够改进end-to-end流媒体应用程序中的数据质量和数据治理。有关更多信息,请参阅 。AmazonGlue 架构注册. 设置此集成的方法之一是通过 Java 中的 KCL。

重要

目前,Kinesis Data StreamsAmazon只有使用在 Java 中实现的 KCL 2.3 消费者的 Kinesis 数据流才支持 Glue 模式注册表集成。不提供多语言支持。不支持 KCL 1.0 消费者。不支持 KCL 2.3 之前的 KCL 2.x 消费者。

有关如何使用 KCL 将 Kinesis Data Streams 与模式注册表集成的详细说明,请参阅中的 “使用 KPL/KCL 库与数据交互” 部分使用案例:将 Amazon Kinesis Data Streams 与AmazonGlue 架构注册.