使用 Kinesis 客户端库 - Amazon Kinesis Data Streams
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis 客户端库

开发可以处理来自 KDS 数据流的数据的自定义消费者应用程序的方法之一是使用 Kinesis 客户端库 (KCL)。

什么是 Kinesis 客户端库?

KCL 通过处理与分布式计算相关的许多复杂任务,帮助您使用和处理 Kinesis 数据流中的数据。这些包括跨多个使用者应用程序实例执行负载均衡、响应使用者应用程序实例故障、对已处理记录执行检查点操作和对重新分片做出反应。KCL 负责处理所有这些子任务,以便您可以集中精力编写自定义记录处理逻辑。

KCL 与 AWS 开发工具包中提供的 Kinesis Data Streams API 不同。Kinesis API 可帮助您管理 Kinesis 数据流的许多方面,包括创建流、重新分片以及放置并获取记录。KCL 针对所有这些子任务提供了一个抽象层,这样您就可以专注于消费者应用程序的自定义数据处理逻辑。有关 Kinesis Data Streams API 的信息,请参阅Amazon Kinesis API 参考

重要

KCL 是一个 Java 库。使用称为多语言守护进程的多语言界面提供了对 Java 以外的语言的 Support。此守护进程是基于 Java 的,当您使用的是 Java 以外的 KCL 语言时,它会在后台运行。例如,如果您安装了适用于 Python 的 KCL 并完全在 Python 中编写使用者应用程序,则由于 MultiLangDaemon,您仍需要在您的系统中安装 Java。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例自定义这些设置,例如,所连接到的 AWS 区域。有关 GitHub 上的 MultiLangDaemon 的更多信息,请参阅KCL MultiLangDaemon 项目

KCL 充当您的记录处理逻辑和 Kinesis Data Streams 之间的中介。KCL 执行以下任务:

  • 连接到数据流

  • 枚举数据流中的分片

  • 使用租赁协调与其工作人员的分片关联

  • 为其管理的每个分片实例化记录处理器

  • 从数据流中提取数据记录

  • 将记录推送到对应的记录处理器

  • 对已处理记录进行检查点操作

  • 当工作人员实例计数发生变化或重新分片时(分片被拆分或合并)平衡分片-工作人员关联(租用)

KCL 可用版本

目前,您可以使用以下受支持的 KCL 版本之一来构建自定义使用者应用程序:

您可以使用 KCL 1.x 或 KCL 2.x 构建使用共享吞吐量的使用者应用程序。有关更多信息,请参阅 使用 KCL 开发具有共享吞吐量的自定义使用者

要构建使用专用吞吐量(增强的扇出使用者)的消费者应用程序,您只能使用 KCL 2.x。有关更多信息,请参阅 开发具有专用吞吐量的自定义使用者(增强扇出功能)

有关 KCL 1.x 和 KCL 2.x 之间的区别的信息,以及如何从 KCL 1.x 迁移到 KCL 2.x 的说明,请参阅将使用者从 KCL 1.x 迁移到 KCL 2.x

KCL 概念

  • KCL 使用者应用程序— 一种使用 KCL 自定义构建的应用程序,旨在读取和处理数据流中的记录。

  • 使用者应用程序例-KCL 使用者应用程序通常是分布式的,一个或多个应用程序实例同时运行,以便在故障时进行协调并动态负载平衡数据记录处理。

  • 工作线程— KCL 使用者应用程序实例用于开始处理数据的高级别类。

    重要

    每个 KCL 使用者应用程序实例有一个工作程序。

    工作程序初始化并监督各种任务,包括同步分片和租赁信息、跟踪分片分配以及处理分片中的数据。工作程序向 KCL 提供使用者应用程序的配置信息,例如此 KCL 使用者应用程序将处理其数据记录的数据流的名称以及访问此数据流所需的 AWS 证书。工作程序还启动特定的 KCL 使用者应用程序实例,以将数据记录从数据流传送到记录处理器。

    重要

    在 KCL 1.x 中,这个类被称为工作线程。有关详细信息(这些是 Java KCL 存储库),请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java。在 KCL 2.x 中,这个类被称为计划程序。KCL 2.x 中的调度程序的目的与 KCL 1.x 中的工作人员的目的相同。有关 KCL 2.x 中的调度程序类的更多信息,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java

  • 租赁― 定义工作线程和分片之间绑定的数据。分布式 KCL 使用者应用程序使用租约对工作线程的数据记录处理进行分区。在任何给定的时间,每个数据记录分片都通过leaseKey变量。

    默认情况下,工作人员可以持有一个或多个租赁(取决于适用于工作者的最大租赁变量)。

    重要

    每个工作程序都会争取保留数据流中所有可用分片的所有可用租赁。但在任何时候,只有一名工作人员才能成功地持有每个租约。

    例如,如果您的使用者应用程序实例 A 具有工作程序 A,该实例正在处理具有 4 个分片的数据流,则工作程序 A 可以同时保留分片 1、2、3 和 4 的租约。但是,如果您有两个消费者应用程序实例:A 和 B,并且这些实例正在处理具有 4 个分片的数据流,工作程序 A 和工作程序 B 不能同时持有分片 1 的租约。一个工作程序持有特定分片的租约,直到它准备好停止处理此分片的数据记录或直到它失败为止。当一名工作人员停止持有租约时,另一名工作人员占用并持有租赁。

    有关详细信息(这些是 Java KCL 存储库),请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java适用于 KCL 1.x 和https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.javaKCL 2.x。

  • 租赁表-一个唯一的 Amazon DynamoDB 表,用于跟踪 KDS 数据流中正由 KCL 消费者应用程序的工作人员租用和处理的分片。在 KCL 使用者应用程序运行时,租用表必须保持与来自数据流的最新分片信息同步(工作线程内部和所有工作线程之间)。有关更多信息,请参阅 使用租赁表跟踪 KCL 使用者应用程序处理的分片

  • 记录处理器— 定义 KCL 使用者应用程序如何处理从数据流中获取的数据的逻辑。在运行时,KCL 使用者应用程序实例化一个工作程序,并且此工作程序为其拥有租约的每个分片实例化一个记录处理器。

使用租赁表跟踪 KCL 使用者应用程序处理的分片

什么是租赁表

对于每个 Amazon Kinesis Data Streams 应用程序,KCL 使用唯一的租用表(存储在 Amazon DynamoDB 表中)来跟踪 KDS 数据流中的分片,这些分片正由 KCL 消费者应用程序的工作人员租用和处理。

重要

KCL 使用者应用程序的名称来创建此使用者应用程序使用的租赁表的名称,因此每个使用者应用程序名称都必须是唯一的。

您可以使用Amazon DynamoDB 控制台,而使用者应用程序正在运行。

如果您的 KCL 使用者应用程序的租赁表在应用程序启动时不存在,则其中一个工作程序会为此应用程序创建租赁表。

重要

您的账户将被收取与 DynamoDB 表关联的费用(除开与 Kinesis Data Streams 本身关联的费用)。

租赁表中的每行表示您的使用者应用程序的工作程序正在处理的分片。如果您的 KCL 使用者应用程序只处理一个数据流,则leaseKey这是租赁表的哈希键是分片 ID。如果您是针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流,那么 leaseKey 的结构如下所示:account-id:StreamName:streamCreationTimestamp:ShardId。例如,111111111:multiStreamTest-1:12345:shardId-000000000336

除了分片 ID 以外,每行还包含以下数据:

  • 检查点: 分片的最新检查点序号。此值在数据流中的所有分片中都是唯一的。

  • checkpointSubSequenceNumber: 使用 Kinesis 的聚合功能时,这是检查点,用于跟踪 Kinesis 记录中的各个用户记录。

  • leaseCounter: 用于租赁版本控制,这样工作程序可以检测其租赁已由其他工作程序获取。

  • leaseKey: 租赁的唯一标识符。每个租赁特定于数据流中的一个分片,一次由一个工作程序持有。

  • leaseOwner: 持有此租赁的工作程序。

  • ownerSwitchesSinceCheckpoint: 自上次写入检查点以来,此租赁更改了工作程序的次数。

  • parentShardId: 用于确保在开始子分片上的处理之前已经完全处理了父分片。这可以确保记录按照放入流中的相同顺序处理。

  • 散列范围: 使用PeriodicShardSyncManager运行定期同步以查找租用表中缺少的分片,并根据需要为它们创建租用。

    注意

    以 KCL 1.14 和 KCL 2.3 开始的每个分片的租赁表中都有这些数据。有关 的更多信息PeriodicShardSyncManager和租赁和分片之间的定期同步,请参阅租赁表如何与 KDS 数据流中的分片同步

  • 子分片: 使用LeaseCleanupManager查看子分片的处理状态,并决定是否可以从租用表中删除父分片。

    注意

    以 KCL 1.14 和 KCL 2.3 开始的每个分片的租赁表中都有这些数据。

  • SharDID: 分片的 ID。

    注意

    此数据仅出现在租赁表中,如果针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流。这仅在 Java 的 KCL 2.x 中受支持,从 Java 及更高版本的 KCL 2.3 开始。

  • stream name数据流的标识符,采用以下格式:account-id:StreamName:streamCreationTimestamp

    注意

    此数据仅出现在租赁表中,如果针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流。这仅在 Java 的 KCL 2.x 中受支持,从 Java 及更高版本的 KCL 2.3 开始。

Throughput

如果 Amazon Kinesis 应用程序收到了预置的吞吐量异常,您应为 DynamoDB 表增加预置的吞吐量。KCL 会创建预置吞吐量为 10 次读取/秒和 10 次写入/秒的表,但这对于您的应用程序可能不够。例如,如果 Amazon Kinesis 应用程序执行频繁的检查点操作或对由很多分片组成的流执行操作,您可能需要更多吞吐量。

有关 DynamoDB 中预配置吞吐量的信息,请参阅读/写容量模式处理表和数据中的Amazon DynamoDB 开发人员指南

租赁表如何与 KDS 数据流中的分片同步

KCL 使用者应用程序中的工作人员使用租约来处理给定数据流中的分片。有关哪个工作人员在任何给定时间租用哪些分片的信息存储在租赁表中。在 KCL 使用者应用程序运行时,租用表必须与来自数据流的最新分片信息保持同步。KCL 在使用者应用程序引导期间(初始化或重新启动使用者应用程序时)以及正在处理的分片到达结束时(重新分片),将租用表与从 Kinesis Data Streams 服务获取的分片信息同步。换句话说,工作程序或 KCL 使用者应用程序在初始使用者应用程序引导期间以及每当使用者应用程序遇到数据流 reshard 事件时,它们正在处理的数据流同步。

同步在 KCL 1.0-1.13 和 KCL 2.0-2.2

在 KCL 1.0-1.13 和 KCL 2.0-2.2 中,在使用者应用程序的引导过程中以及在每个数据流重新分割事件期间,KCL 将租用表与从 Kinesis Data Streams 服务获取的分片信息同步,方法是调用ListShardsDescribeStream发现 API。在上面列出的所有 KCL 版本中,KCL 使用者应用程序的每个工作程序都完成以下步骤,以便在使用者应用程序的引导和每个流重新分片事件期间执行租用/分片同步过程:

  • 获取正在处理的流的数据的所有分片

  • 从租用表中获取所有分片租约

  • 过滤掉租用表中没有租约的每个打开的分片

  • 迭代所有找到的打开的分片以及每个没有打开父项的打开分片:

    • 遍历层次结构树的祖先路径以确定分片是否为子体。如果正在处理祖先分片(租用表中存在祖先分片的租用条目)或者应处理祖先分片(例如,如果初始位置为TRIM_HORIZON或者AT_TIMESTAMP)

    • 如果上下文中的打开分片是子体,KCL 会根据初始位置检查分片,并根据需要为其父项创建租约

在 KCL 2.x 中进行同步,从 KCL 2.3 开始及更高版本

从最新支持的 KCL 2.x (KCL 2.3) 及更高版本开始,库现在支持对同步过程进行以下更改。这些租用/分片同步更改显著减少了 KCL 使用者应用程序对 Kinesis 数据流服务进行的 API 调用次数,并优化了 KCL 消费者应用程序中的租赁管理。

  • 在应用程序引导过程中,如果租用表为空,KCL 将使用ListShardAPI 的过滤选项(ShardFilter可选请求参数),以便仅检索和创建在ShardFilter参数。这些区域有:ShardFilter参数允许您过滤掉ListShardsAPI。唯一必需的属性ShardFilter参数为Type。KCL 使用Typefilter 属性及其下列有效值来标识和返回可能需要新租约的打开分片的快照:

    • AT_TRIM_HORIZON-响应包括所有在TRIM_HORIZON

    • AT_LATEST-响应仅包含数据流当前打开的分片。

    • AT_TIMESTAMP-响应包括开始时间戳小于或等于给定时间戳且结束时间戳大于或等于给定时间戳或仍然打开的所有分片。

    ShardFilter用于为空租赁表创建租赁,以初始化RetrievalConfig#initialPositionInStreamExtended

    有关 的更多信息ShardFilter,请参阅https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 而不是执行租用/分片同步以使租用表与数据流中的最新分片保持最新状态的所有工作程序,而是由单个选定的工作线程负责人执行租用/分片同步。

  • KCL 2.3 使用ChildShards返回参数GetRecordsSubscribeToShard用于执行在发生的租用/分片同步的 APISHARD_END,允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。对于在整个使用者应用程序中共享,租用/分片同步的此优化使用ChildShards的参数GetRecordsAPI。对于专用吞吐量(增强扇出)消费者应用程序,租用/分片同步的此优化使用ChildShards的参数SubscribeToShardAPI。有关更多信息,请参阅 。GetRecords订阅分片, 和儿童碎片

  • 随着上述变化,KCL 的行为正在从所有工作人员学习所有现有分片的模型转移到工作人员仅了解每个工作人员拥有的分片的子分片的模型。因此,除了在使用者应用程序引导和重新分片事件期间发生的同步之外,KCL 现在还会执行额外的定期分片/租赁扫描,以确定租用表中的任何潜在漏洞(换句话说,了解所有新分片),以确保完整哈希范围正在处理,并根据需要为它们创建租约。PeriodicShardSyncManager是负责运行定期租用/分片扫描的组件。

    有关 的更多信息PeriodicShardSyncManager,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213

    在 KCL 2.3 中,新的配置选项可用于配置PeriodicShardSyncManagerinLeaseManagementConfig

    名称 默认值 描述
    租赁恢复审计费用频率

    2 万 (2 分钟)

    审计员作业扫描租赁表中部分租约的频率(以毫秒为单位)。如果审计员检测到流的租赁中的任何漏洞,那么它会根据leasesRecoveryAuditorInconsistencyConfidenceThreshold

    租赁恢复审计不一致信任阈值

    3

    定期审计员作业的置信阈值,用于确定租用表中数据流的租约是否不一致。如果审计员连续发现数据流的同一组不一致,那么它会触发分片同步。

    现在还会发出新的 CloudWatch 指标,以监控PeriodicShardSyncManager。有关更多信息,请参阅 PeriodicShardSyncManager

  • 包括优化HierarchicalShardSyncer仅为一层分片创建租约。

在 KCL 1.x 中进行同步,从 KCL 1.14 及以上开始

从最新支持的 KCL 1.x (KCL 1.14) 及以上版本开始,库现在支持对同步过程进行以下更改。这些租用/分片同步更改显著减少了 KCL 使用者应用程序对 Kinesis 数据流服务进行的 API 调用次数,并优化了 KCL 消费者应用程序中的租赁管理。

  • 在应用程序引导过程中,如果租用表为空,KCL 将使用ListShardAPI 的过滤选项(ShardFilter可选请求参数),以便仅检索和创建在ShardFilter参数。这些区域有:ShardFilter参数允许您过滤掉ListShardsAPI。唯一必需的属性ShardFilter参数为Type。KCL 使用Typefilter 属性及其下列有效值来标识和返回可能需要新租约的打开分片的快照:

    • AT_TRIM_HORIZON-响应包括所有在TRIM_HORIZON

    • AT_LATEST-响应仅包含数据流当前打开的分片。

    • AT_TIMESTAMP-响应包括开始时间戳小于或等于给定时间戳且结束时间戳大于或等于给定时间戳或仍然打开的所有分片。

    ShardFilter用于为空租赁表创建租赁,以初始化KinesisClientLibConfiguration#initialPositionInStreamExtended

    有关 的更多信息ShardFilter,请参阅https://docs.amazonaws.cn/kinesis/latest/APIReference/API_ShardFilter.html

  • 而不是执行租用/分片同步以使租用表与数据流中的最新分片保持最新状态的所有工作程序,而是由单个选定的工作线程负责人执行租用/分片同步。

  • KCL 1.14 使用ChildShards返回参数GetRecordsSubscribeToShard用于执行在发生的租用/分片同步的 APISHARD_END,允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。有关更多信息,请参阅 。GetRecords儿童碎片

  • 随着上述变化,KCL 的行为正在从所有工作人员学习所有现有分片的模型转移到工作人员仅了解每个工作人员拥有的分片的子分片的模型。因此,除了在使用者应用程序引导和重新分片事件期间发生的同步之外,KCL 现在还会执行额外的定期分片/租赁扫描,以确定租用表中的任何潜在漏洞(换句话说,了解所有新分片),以确保完整哈希范围正在处理,并根据需要为它们创建租约。PeriodicShardSyncManager是负责运行定期租用/分片扫描的组件。

    何时KinesisClientLibConfiguration#shardSyncStrategyType设置为ShardSyncStrategyType.SHARD_ENDPeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold用于确定包含租用表中漏洞的连续扫描次数的阈值,之后将强制执行分片同步。何时KinesisClientLibConfiguration#shardSyncStrategyType设置为ShardSyncStrategyType.PERIODICleasesRecoveryAuditorInconsistencyConfidenceThreshold将被忽略。

    有关 的更多信息PeriodicShardSyncManager在 KCL 1.14 中,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999

    在 KCL 1.14 中,新的配置选项可用于配置PeriodicShardSyncManagerinLeaseManagementConfig

    名称 默认值 描述
    租赁恢复审计不一致信任阈值

    3

    定期审计员作业的置信阈值,用于确定租用表中数据流的租约是否不一致。如果审计员连续发现数据流的同一组不一致,那么它会触发分片同步。

    现在还会发出新的 CloudWatch 指标,以监控PeriodicShardSyncManager。有关更多信息,请参阅 PeriodicShardSyncManager

  • KCL 1.14 现在还支持延迟租赁清理。租赁将通过LeaseCleanupManager达到SHARD_END,当分片已过期超过数据流的保留期或由于重新分片操作而关闭时。

    可以使用新的配置选项配置LeaseCleanupManager

    名称 默认值 描述
    瓦尔米利斯租赁公司

    1 minute

    运行租约清理线程的时间间隔。

    完整的外部瓦尔米利斯 5 分钟

    检查租约是否已完成的时间间隔。

    瓦尔米利斯 30 分钟

    检查租约是否为垃圾(即修剪过数据流的保留期)的时间间隔。

  • 包括优化KinesisShardSyncer仅为一层分片创建租约。

针对 Java 消费者应用程序使用相同的 KCL 2.x 处理多个数据流

本节介绍了 KCL 2.x 适用于 Java 中的以下更改,这些更改使您能够创建可以同时处理多个数据流的 KCL 使用者应用程序。

重要

仅在 Java 的 KCL 2.x 中支持多流处理,从 Java 及更高版本的 KCL 2.3 开始。

可以实现 KCL 2.x 的任何其他语言都不支持多流处理。

任何版本的 KCL 1.x 都不支持多流处理。

  • Multi流跟踪接口

    要构建可同时处理多个流的使用者应用程序,您必须实现一个名为多流跟踪。此接口包括streamConfigList方法,该方法返回要由 KCL 使用者应用程序处理的数据流及其配置的列表。请注意,正在处理的数据流可以在使用者应用程序运行时更改。streamConfigList由 KCL 定期调用,以了解要处理的数据流中的更改。

    这些区域有:streamConfigList方法填充流配置列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    请注意,StreamIdentifierInitialPositionInStreamExtended是必填字段,而consumerArn为可选项。您必须提供consumerArn仅当您使用 KCL 2.x 来实现增强的扇出消费者应用程序时。

    有关 的更多信息StreamIdentifier,请参阅https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L29。您可以创建一个多流实例StreamIdentifier来自序列化流标识符。序列化流标识符应采用以下格式:account-id:StreamName:streamCreationTimestamp

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker还包括一个用于删除租赁表中旧流租赁的策略(formerStreamsLeasesDeletionStrategy)。请注意,不能在使用者应用程序运行时更改策略。有关更多信息,请参阅 。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilder是一个应用程序范围的类,您可以使用它来指定构建 KCL 使用者应用程序时要使用的所有 KCL 2.x 配置设置。ConfigsBuilder类现在支持MultistreamTracker接口。您可以使用一个数据流的名称来初始化 ConfigsBuilder,以便使用来自以下内容的记录:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    或者,您可以使用MultiStreamTracker如果您想要实现同时处理多个流的 KCL 使用者应用程序。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 通过为 KCL 使用者应用程序实施多流支持,应用程序租用表的每一行现在都包含分片 ID 和此应用程序处理的多个数据流的流名称。

  • 当对您的 KCL 消费者应用程序实施多流支持时,leaseKey 采用以下结构:account-id:StreamName:streamCreationTimestamp:ShardId。例如,111111111:multiStreamTest-1:12345:shardId-000000000336

    重要

    当您的现有 KCL 使用者应用程序配置为仅处理一个数据流时,leaseKey(即租用表的哈希键)就是分片 ID。如果将此现有 KCL 使用者应用程序重新配置为处理多个数据流,则会破坏租用表,因为使用多流支持,leaseKey 结构必须如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId

将 Kinesis 客户端库与 AWS Glue Schema 注册表一起使用

您可以将 Kinesis 数据流与 AWS Glue Schema 注册表集成。AWS Glue 架构注册表允许您集中发现、控制和发展模式,同时确保生成的数据经注册模式持续验证。架构定义数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。AWS Glue 架构注册表使您能够改进流应用程序中的端到端数据质量和数据治理。有关更多信息,请参阅 。AWS Glue 架构注册表。设置此集成的方法之一是通过 Java 中的 KCL。

重要

目前,Kinesis Data Streams 和 AWS Glue 架构注册表集成仅适用于使用 Java 中实现的 KCL 2.3 使用者的 Kinesis 数据流。不提供多语言支持。不支持 KCL 1.0 使用者。不支持 KCL 2.3 之前的 KCL 2.x 消费者。

有关如何使用 KCL 设置 Kinesis 数据流与模式注册表集成的详细说明,请参阅使用案例:将 Amazon Kinesis Data Streams 与 AWS Glue Schema 注册表集成