从 KCL 2.x 迁移到 KCL 3.x - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

从 KCL 2.x 迁移到 KCL 3.x

本主题提供将使用器从 KCL 2.x 迁移到 KCL 3.x 的 step-by-step说明。 KCL3.x 支持 KCL 2.x 使用者的就地迁移。您可以继续使用 Kinesis 数据流中的数据,同时以滚动方式迁移工作人员。

重要

KCL3.x 与 KCL 2.x 保持相同的接口和方法。因此,在迁移过程中,您不必更新记录处理代码。但是,您必须设置正确的配置并检查迁移所需的步骤。我们强烈建议您按照以下迁移步骤进行操作,以获得顺畅的迁移体验。

步骤 1:先决条件

在开始使用 KCL 3.x 之前,请确保您具备以下条件:

  • Java 开发套件 (JDK) 8 或更高版本

  • Amazon SDK for Java 2.x

  • 用于依赖管理的 Maven 或 Gradle

步骤 2:添加依赖关系

如果您使用的是 Maven,请将以下依赖项添加到您的pom.xml文件中。确保将 3.x.x 替换为最新版本。KCL

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

如果您使用的是 Gradle,请将以下内容添加到您的build.gradle文件中。确保将 3.x.x 替换为最新版本。KCL

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

你可以在 Maven 中央存储库中查看最新版本的。KCL

步骤 3:设置与迁移相关的配置

要从 KCL 2.x 迁移到 KCL 3.x,必须设置以下配置参数:

  • CoordinatorConfig。 clientVersionConfig:此配置决定了应用程序将在哪个KCL版本兼容模式下运行。从 KCL 2.x 迁移到 3.x 时,需要将此配置设置为。CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X要设置此配置,请在创建调度器对象时添加以下行:

configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)

以下是如何设置从 KCL 2.x 迁移到 3.x CoordinatorConfig.clientVersionConfig 的示例。您可以根据自己的具体要求根据需要调整其他配置:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

由于 KCL 2.x 和 3.x 使用不同的负载平衡算法,因此在给定时间使用相同的负载平衡算法,因此使用者应用程序中的所有工作程序都必须使用相同的负载平衡算法。使用不同的负载平衡算法运行 worker 可能会导致负载分布不理想,因为这两种算法是独立运行的。

此 KCL 2.x 兼容性设置允许您的 KCL 3.x 应用程序在与 2.x 兼容的模式下运行,并对 KCL 2.x 使用负载平衡算法,直到使用者应用程序中的所有工作程序都升级到 3. KCL x。KCL迁移完成后,KCL将自动切换到完整的 KCL 3.x 功能模式,并开始为所有正在运行的 worker 使用新的 KCL 3.x 负载平衡算法。

重要

如果您不是在使用ConfigsBuilder而是创建LeaseManagementConfig对象来设置配置,则必须再添加一个在 3.x 或更高KCL版本applicationName中调用的参数。有关详细信息,请参阅 LeaseManagementConfig 构造函数的编译错误。我们建议使用ConfigsBuilder来设置KCL配置。 ConfigsBuilder提供了一种更灵活、更易于维护的方式来配置您的KCL应用程序。

步骤 4:遵循 shutdownRequested () 方法实现的最佳实践

KCL3.x 引入了一项名为 “优雅租约移交” 的功能,以最大限度地减少在租约重新分配过程中将租约移交给另一名工作人员时对数据的重新处理。这是通过在租约移交之前检查租赁表中最后处理的序列号来实现的。为确保优雅的租约移交正常运行,必须确保在类的shutdownRequested方法中调用该checkpointer对象。RecordProcessor如果您没有在shutdownRequested方法中调用checkpointer对象,则可以按照以下示例所示实现该对象。

重要
  • 以下实现示例是优雅租约移交的最低要求。如果需要,您可以将其扩展为包括与检查点相关的其他逻辑。如果您正在执行任何异步处理,请确保在调用检查点检查点之前已处理所有传送到下游的记录。

  • 虽然优雅的租赁移交可以显著降低租赁转让期间重新处理数据的可能性,但它并不能完全消除这种可能性。为了保持数据的完整性和一致性,请将下游消费者应用程序设计为等性。这意味着他们应该能够处理潜在的重复记录处理,而不会对整个系统产生不利影响。

/** * Invoked when either Scheduler has been requested to gracefully shutdown * or lease ownership is being transferred gracefully so the current owner * gets one last chance to checkpoint. * * Checkpoints and logs the data a final time. * * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint * before the shutdown is completed. */ public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { // Ensure that all delivered records are processed // and has been successfully flushed to the downstream before calling // checkpoint // If you are performing any asynchronous processing or flushing to // downstream, you must wait for its completion before invoking // the below checkpoint method. log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } }

步骤 5:检查收集工作人员KCL指标的 3.x 先决条件

KCL3.x 收集CPU利用率指标,例如工作人员的CPU利用率,以均匀地平衡工作人员之间的负载。消费者应用程序工作人员可以在亚马逊EC2、亚马逊ECSEKS、亚马逊或上运行 Amazon Fargate。 KCL仅当满足以下先决条件时,3.x 才能从工作人员那里收集CPU利用率指标:

Amazon Elastic Compute Cloud(亚马逊EC2)

  • 您的操作系统必须是 Linux 操作系统。

  • 您必须在您的EC2实例IMDSv2中启用。

亚马逊上的亚马逊弹性容器服务(亚马逊ECS)EC2

Amazon ECS on Amazon Fargate

  • 您必须启用 Fargate 任务元数据端点版本 4。如果您使用 Fargate 平台版本 1.4.0 或更高版本,则默认启用此功能。

  • Fargate 平台版本 1.4.0 或更高版本。

亚马逊上的亚马逊 Elastic Kubernetes Service(亚马逊)EKSEC2

  • 您的操作系统必须是 Linux 操作系统。

Amazon EKS on Amazon Fargate

  • Fargate 平台 1.3.0 或更高版本。

重要

如果 KCL 3.x 由于未满足先决条件而无法从工作人员那里收集CPU利用率指标,则它将重新平衡每个租约的吞吐量级别的负载。这种后备再平衡机制将确保所有工作人员都能从分配给每个工作人员的租约中获得相似的总吞吐量水平。有关更多信息,请参阅 如何将租约KCL分配给员工并平衡负担

步骤 6:更新 KCL 3. IAM x 的权限

您必须向与 KCL 3.x 使用者应用程序关联的IAM角色或策略添加以下权限。这包括更新KCL应用程序使用的现有IAM策略。有关更多信息,请参阅 IAM使用KCL者应用程序所需的权限

重要

您的现有KCL应用程序可能未在IAM策略中添加以下IAM操作和资源,因为在 KCL 2.x 中不需要这些操作和资源。在运行 KCL 3.x 应用程序之前,请确保已添加它们:

  • 行动:UpdateTable

    • 资源 (ARNs): arn:aws:dynamodb:region:account:table/KCLApplicationName

  • 行动:Query

    • 资源 (ARNs): arn:aws:dynamodb:region:account:table/KCLApplicationName/Index/*

  • 操作:CreateTableDescribeTableScanGetItemPutItemUpdateItemDeleteItem

    • 资源 (ARNs):arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats, arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState

    将中的 “区域”、“账户” 和 “” KCLApplicationName 分别替换为你自己的名称 Amazon Web Services 区域、 Amazon Web Services 账户 号码和KCL应用程序名称。ARNs如果您使用配置来自定义由创建的元数据表的名称KCL,请使用这些指定的表名而不是KCL应用程序名称。

第 7 步:向KCL工作人员部署 3.x 代码

在设置了迁移所需的配置并完成了之前的所有迁移清单后,您可以构建代码并将其部署到您的工作线程中。

注意

如果您看到构造函数出现编译错误,请参阅LeaseManagementConfig构造函数的编译错误以获取疑难解答信息。 LeaseManagementConfig

步骤 8:完成迁移

在部署 KCL 3.x 代码期间,KCL继续使用 KCL 2.x 中的租赁分配算法。成功将 KCL 3.x 代码部署到所有工作人员后,KCL会自动检测到这一点,并根据工作人员的资源利用率切换到新的租赁分配算法。有关新的租赁分配算法的更多详细信息,请参阅如何将租约KCL分配给员工并平衡负担

在部署期间,您可以使用向其发送以下指标来 CloudWatch监控迁移过程。您可以监控Migration操作下的指标。所有指标均为 per-KCL-application指标,并设置为SUMMARY指标级别。如果该CurrentState:3xWorker指标的Sum统计数据与KCL应用程序中的工作人员总数相匹配,则表示已成功完成向 KCL 3.x 的迁移。

重要

在所有工作人员都准备好运行新的租赁人分配算法之后,至少需要 10 分钟才能切换到该算法。KCL

CloudWatch KCL迁移过程的指标
Metrics 描述
CurrentState:3xWorker

成功迁移到 KCL 3.x 并运行新的租赁分配算法KCL的工作人员数量。如果此指标的Sum计数与您的工作人员总数相匹配,则表示已成功完成向 KCL 3.x 的迁移。

  • 指标级别:汇总

  • 单位:计数

  • 统计:最有用的统计数据是 Sum

CurrentState:2xCompatibleWorker

迁移过程中在 KCL 2.x 兼容模式下运行KCL的工作器数量。此指标的非零值表示迁移仍在进行中。

  • 指标级别:汇总

  • 单位:计数

  • 统计:最有用的统计数据是 Sum

Fault

迁移过程中遇到的异常数。这些异常大多数是暂时性错误,KCL3.x 将自动重试以完成迁移。如果您观察到永久性Fault指标值,请查看迁移期间的日志以进行进一步的故障排除。如果问题仍然存在,请联系 Amazon Web Services 支持。

  • 指标级别:汇总

  • 单位:计数

  • 统计:最有用的统计数据是 Sum

GsiStatusReady

租赁表上全局二级索引 (GSI) 的创建状态。此指标表明是否已创建租赁表GSI上的,这是运行 KCL 3.x 的先决条件。值为 0 或 1,其中 1 表示成功创建。在回滚状态下,不会发出此指标。再次向前滚动后,您可以继续监控此指标。

  • 指标级别:汇总

  • 单位:计数

  • 统计:最有用的统计数据是 Sum

workerMetricsReady

所有工作人员排放的员工指标的状态。这些指标表明是否所有工作人员都在发布诸如CPU利用率之类的指标。该值为 0 或 1,其中 1 表示所有工作人员都已成功发出指标并准备好使用新的租赁分配算法。在回滚状态下,不会发出此指标。再次向前滚动后,您可以继续监控此指标。

  • 指标级别:汇总

  • 单位:计数

  • 统计:最有用的统计数据是 Sum

KCL在迁移期间为 2.x 兼容模式提供回滚功能。成功迁移到 KCL 3.x 后,我们建议您删除 “CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X如果不再需要回滚” 的CoordinatorConfig.clientVersionConfig设置。移除此配置会阻止应用程序发布与迁移相关的指标。KCL

注意

我们建议您在迁移期间和完成迁移后的一段时间内监控应用程序的性能和稳定性。如果您发现任何问题,则可以使用KCL迁移工具回滚工作线程以使用与 KCL 2.x 兼容的功能。