Migrate from KCL 2.x to KCL 3.x
This topic provides step-by-step instructions to migrate your consumer from KCL 2.x to KCL 3.x. KCL 3.x supports in-place migration of KCL 2.x consumers. You can continue consuming the data from your Kinesis data stream while migrating your workers in a rolling manner.
Important
KCL 3.x maintains the same interfaces and methods as KCL 2.x. Therefore, you don't have to update your record processing code during the migration. However, you must set the proper configuration and complete the required migration steps. We strongly recommend that you follow the steps in this topic for a smooth migration experience.
Step 1: Prerequisites
Before you start using KCL 3.x, make sure that you have the following:
-
Java Development Kit (JDK) 8 or later
-
Amazon SDK for Java 2.x
-
Maven or Gradle for dependency management
Step 2: Add dependencies
If you're using Maven, add the following dependency to your pom.xml file. Make sure to replace 3.x.x with the latest KCL version.

    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>3.x.x</version> <!-- Use the latest version -->
    </dependency>
If you're using Gradle, add the following to your build.gradle file. Make sure to replace 3.x.x with the latest KCL version.

    implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

You can check for the latest version of the KCL on the Maven Central Repository.
Step 3: Set up the migration-related configuration
To migrate from KCL 2.x to KCL 3.x, you must set the following configuration parameter:
-
CoordinatorConfig.clientVersionConfig: This configuration determines which KCL version compatibility mode the application runs in. When migrating from KCL 2.x to 3.x, you need to set this configuration to CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X. To set this configuration, add the following line when creating your scheduler object:

    configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X)
The following is an example of how to set the CoordinatorConfig.clientVersionConfig for migrating from KCL 2.x to 3.x. You can adjust other configurations as needed based on your specific requirements:

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
    );
It's important that all workers in your consumer application use the same load balancing algorithm at a given time because KCL 2.x and 3.x use different load balancing algorithms. Running workers with different load balancing algorithms can cause suboptimal load distribution as the two algorithms operate independently.
This KCL 2.x compatibility setting allows your KCL 3.x application to run in a mode compatible with KCL 2.x and use the load balancing algorithm for KCL 2.x until all workers in your consumer application have been upgraded to KCL 3.x. When the migration is complete, KCL will automatically switch to full KCL 3.x functionality mode and start using a new KCL 3.x load balancing algorithm for all running workers.
Important
If you are not using ConfigsBuilder but are creating a LeaseManagementConfig object to set configurations, you must add one more parameter called applicationName in KCL version 3.x or later. For details, see Compilation error with the LeaseManagementConfig constructor. We recommend using ConfigsBuilder to set KCL configurations. ConfigsBuilder provides a more flexible and maintainable way to configure your KCL application.
Step 4: Follow best practices for the shutdownRequested() method implementation
KCL 3.x introduces a feature called graceful lease handoff to minimize the reprocessing of data when a lease is handed over to another worker as part of the lease reassignment process. This is achieved by checkpointing the last processed sequence number in the lease table before the lease handoff. To ensure that graceful lease handoff works properly, you must invoke the checkpointer object within the shutdownRequested method in your RecordProcessor class. If you're not invoking the checkpointer object within the shutdownRequested method, you can implement it as illustrated in the following example.
Important
-
The following implementation example is the minimal requirement for graceful lease handoff. You can extend it to include additional checkpointing logic if needed. If you are performing any asynchronous processing, make sure that all records delivered to the downstream have been processed before you invoke checkpointing.
-
While graceful lease handoff significantly reduces the likelihood of data reprocessing during lease transfers, it does not entirely eliminate this possibility. To preserve data integrity and consistency, design your downstream consumer applications to be idempotent. This means they should be able to handle potential duplicate record processing without adverse effects on the overall system.
    /**
     * Invoked when either Scheduler has been requested to gracefully shutdown
     * or lease ownership is being transferred gracefully so the current owner
     * gets one last chance to checkpoint.
     *
     * Checkpoints and logs the data a final time.
     *
     * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
     *                               before the shutdown is completed.
     */
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            // Ensure that all delivered records have been processed
            // and successfully flushed to the downstream before calling
            // checkpoint.
            // If you are performing any asynchronous processing or flushing to
            // downstream, you must wait for its completion before invoking
            // the checkpoint method below.
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        }
    }
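If your record processor hands records to a downstream system asynchronously, the checkpoint call should wait until those writes have finished. The following is a minimal sketch of one way to do that using only the JDK. InFlightTracker and its methods are hypothetical names introduced for illustration and are not part of KCL; the Runnable passed to awaitThenCheckpoint stands in for the call to shutdownRequestedInput.checkpointer().checkpoint().

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not part of KCL): tracks asynchronous downstream writes. */
class InFlightTracker {
    private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

    /** Registers an asynchronous write. Successful writes are forgotten; failed ones stay tracked. */
    <T> CompletableFuture<T> track(CompletableFuture<T> write) {
        inFlight.add(write);
        write.whenComplete((result, error) -> {
            if (error == null) {
                inFlight.remove(write);
            }
        });
        return write;
    }

    /**
     * Waits up to the timeout for all tracked writes, then runs the checkpoint action
     * only if none of them failed or is still pending. Returns true if it checkpointed.
     */
    boolean awaitThenCheckpoint(Runnable checkpointAction, long timeout, TimeUnit unit) {
        CompletableFuture<?>[] pending = inFlight.toArray(new CompletableFuture<?>[0]);
        try {
            CompletableFuture.allOf(pending).get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // A write failed or did not finish in time: skip the checkpoint
            // so that the affected records can be processed again.
            return false;
        }
        checkpointAction.run();
        return true;
    }
}
```

Skipping the checkpoint on failure or timeout trades possible reprocessing for not losing records, which is consistent with designing the downstream to be idempotent.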
Step 5: Check the KCL 3.x prerequisites for collecting worker metrics
KCL 3.x collects worker metrics such as CPU utilization to balance the load across workers evenly. Consumer application workers can run on Amazon EC2, Amazon ECS, Amazon EKS, or Amazon Fargate. KCL 3.x can collect CPU utilization metrics from workers only when the following prerequisites are met:
Amazon Elastic Compute Cloud (Amazon EC2)
-
Your operating system must be Linux OS.
-
You must enable IMDSv2 on your EC2 instance.
Amazon Elastic Container Service (Amazon ECS) on Amazon EC2
-
Your operating system must be Linux OS.
-
You must enable ECS task metadata endpoint version 4.
-
Your Amazon ECS container agent version must be 1.39.0 or later.
Amazon ECS on Amazon Fargate
-
You must enable Fargate task metadata endpoint version 4. If you use Fargate platform version 1.4.0 or later, this is enabled by default.
-
You must use Fargate platform version 1.4.0 or later.
Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2
-
Your operating system must be Linux OS.
Amazon EKS on Amazon Fargate
-
Fargate platform 1.3.0 or later.
Important
If KCL 3.x cannot collect CPU utilization metrics from workers because the prerequisites are not met, it rebalances the load based on the throughput level per lease. This fallback rebalancing mechanism makes sure that all workers get similar total throughput levels from the leases assigned to them. For more information, see How KCL assigns leases to workers and balances the load.
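Some of these prerequisites can be checked from inside the worker process before startup. The following is a minimal sketch, not part of KCL: WorkerMetricsPreflight and its Platform enum are hypothetical names, and the caller is assumed to know which platform the worker runs on. It relies on the ECS_CONTAINER_METADATA_URI_V4 environment variable, which Amazon ECS injects into containers when task metadata endpoint version 4 is available. It does not verify IMDSv2, the container agent version, or the Fargate platform version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical preflight check (not part of KCL) mirroring the prerequisites listed above. */
class WorkerMetricsPreflight {

    enum Platform { EC2, ECS_ON_EC2, ECS_ON_FARGATE, EKS_ON_EC2, EKS_ON_FARGATE }

    /** Returns human-readable warnings; an empty list means no problem was detected. */
    static List<String> check(String osName, Map<String, String> env, Platform platform) {
        List<String> warnings = new ArrayList<>();

        // Linux is listed as a prerequisite for the EC2-backed platforms.
        boolean needsLinux = platform == Platform.EC2
                || platform == Platform.ECS_ON_EC2
                || platform == Platform.EKS_ON_EC2;
        boolean isLinux = osName != null && osName.toLowerCase(Locale.ROOT).contains("linux");
        if (needsLinux && !isLinux) {
            warnings.add("Operating system must be Linux but is: " + osName);
        }

        // Task metadata endpoint version 4 is listed for both ECS launch types.
        boolean needsMetadataV4 = platform == Platform.ECS_ON_EC2
                || platform == Platform.ECS_ON_FARGATE;
        if (needsMetadataV4 && !env.containsKey("ECS_CONTAINER_METADATA_URI_V4")) {
            warnings.add("Task metadata endpoint version 4 was not detected");
        }
        return warnings;
    }
}
```

A worker could call check(System.getProperty("os.name"), System.getenv(), platform) at startup and log the returned warnings.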
Step 6: Update IAM permissions for KCL 3.x
You must add the following permissions to the IAM role or policy associated with your KCL 3.x consumer application. This involves updating the existing IAM policy used by the KCL application. For more information, see IAM permissions required for KCL consumer applications.
Important
Your existing KCL applications might not have the following IAM actions and resources added in the IAM policy because they were not needed in KCL 2.x. Make sure that you have added them before running your KCL 3.x application:
-
Actions: UpdateTable
Resources (ARNs): arn:aws:dynamodb:region:account:table/KCLApplicationName
-
Actions: Query
Resources (ARNs): arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*
-
Actions: CreateTable, DescribeTable, Scan, GetItem, PutItem, UpdateItem, DeleteItem
Resources (ARNs): arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats, arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState

Replace "region", "account", and "KCLApplicationName" in the ARNs with your own Amazon Web Services Region, Amazon Web Services account number, and KCL application name, respectively. If you use configurations to customize the names of the metadata tables created by KCL, use those table names instead of the KCL application name.
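To avoid typing mistakes when filling in the ARN placeholders, the resource list can be generated. The following is a minimal sketch, assuming the default table names derived from the KCL application name. KclIamResources is a hypothetical helper, not part of KCL or the Amazon SDK, and it only formats strings. Note that DynamoDB index ARNs use a lowercase index path segment.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of KCL or the SDK) that expands the ARN patterns above. */
class KclIamResources {

    /** Returns the resource ARNs for the given Region, account number, and KCL application name. */
    static List<String> arns(String region, String account, String appName) {
        String leaseTable = "arn:aws:dynamodb:" + region + ":" + account + ":table/" + appName;
        return Arrays.asList(
                leaseTable,                         // lease table: UpdateTable
                leaseTable + "/index/*",            // indexes on the lease table: Query
                leaseTable + "-WorkerMetricStats",  // worker metrics table: CreateTable, DescribeTable, ...
                leaseTable + "-CoordinatorState");  // coordinator state table: same actions
    }
}
```

For example, arns("us-east-1", "123456789012", "MyApp") returns four ARNs that begin with arn:aws:dynamodb:us-east-1:123456789012:table/MyApp.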
Step 7: Deploy KCL 3.x code to your workers
After you have set the configuration required for the migration and completed all of the previous migration steps, you can build and deploy your code to your workers.
Note
If you see a compilation error with the LeaseManagementConfig constructor, see Compilation error with the LeaseManagementConfig constructor.
Step 8: Complete the migration
During the deployment of KCL 3.x code, KCL continues using the lease assignment algorithm from KCL 2.x. When you have successfully deployed KCL 3.x code to all of your workers, KCL automatically detects this and switches to the new lease assignment algorithm based on resource utilization of the workers. For more details about the new lease assignment algorithm, see How KCL assigns leases to workers and balances the load.
During the deployment, you can monitor the migration process with the following metrics emitted to CloudWatch. You can monitor metrics under the Migration operation. All metrics are per-KCL-application metrics and set to the SUMMARY metric level. If the Sum statistic of the CurrentState:3xWorker metric matches the total number of workers in your KCL application, it indicates that the migration to KCL 3.x has completed successfully.
Important
It takes at least 10 minutes for KCL to switch to the new lease assignment algorithm after all workers are ready to run it.
Metrics | Description |
---|---|
CurrentState:3xWorker | The number of KCL workers successfully migrated to KCL 3.x and running the new lease assignment algorithm. If the |
CurrentState:2xCompatibleWorker | The number of KCL workers running in KCL 2.x compatible mode during the migration process. A non-zero value for this metric indicates that the migration is still in progress. |
Fault | The number of exceptions encountered during the migration process. Most of these exceptions are transient errors, and KCL 3.x will automatically retry to complete the migration. If you observe a persistent |
GsiStatusReady | The status of the global secondary index (GSI) creation on the lease table. This metric indicates whether the GSI on the lease table has been created, which is a prerequisite to run KCL 3.x. The value is 0 or 1, with 1 indicating successful creation. During a rollback state, this metric is not emitted. After you roll forward again, you can resume monitoring this metric. |
workerMetricsReady | The status of worker metrics emission from all workers. This metric indicates whether all workers are emitting metrics such as CPU utilization. The value is 0 or 1, with 1 indicating that all workers are successfully emitting metrics and are ready for the new lease assignment algorithm. During a rollback state, this metric is not emitted. After you roll forward again, you can resume monitoring this metric. |
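The completion check described above can be expressed as a small predicate. The following is a minimal sketch, not part of KCL: MigrationProgress is a hypothetical helper, and the caller is assumed to have already read the Sum statistics of the two CurrentState metrics from CloudWatch for a single reporting period and to know the total number of workers.

```java
/** Hypothetical helper (not part of KCL): interprets the migration metrics described above. */
class MigrationProgress {

    enum Status { IN_PROGRESS, COMPLETE }

    /**
     * @param sum3xWorkers           Sum statistic of CurrentState:3xWorker
     * @param sum2xCompatibleWorkers Sum statistic of CurrentState:2xCompatibleWorker
     * @param totalWorkers           number of workers in the KCL application
     */
    static Status evaluate(long sum3xWorkers, long sum2xCompatibleWorkers, int totalWorkers) {
        // Complete only when every worker reports the 3.x state...
        boolean allOn3x = totalWorkers > 0 && sum3xWorkers == totalWorkers;
        // ...and no worker still reports the 2.x compatible state.
        boolean noneOn2xCompatible = sum2xCompatibleWorkers == 0;
        return (allOn3x && noneOn2xCompatible) ? Status.COMPLETE : Status.IN_PROGRESS;
    }
}
```

For example, with three workers, evaluate(3, 0, 3) returns COMPLETE, while evaluate(2, 1, 3) returns IN_PROGRESS.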
KCL provides rollback capability to the 2.x compatible mode during migration. After the migration to KCL 3.x is successful, we recommend that you remove the CoordinatorConfig.clientVersionConfig setting of CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X if rollback is no longer needed. Removing this configuration stops the emission of migration-related metrics from the KCL application.
Note
We recommend that you monitor your application's performance and stability for a period during the migration and after completing the migration. If you observe any issues, you can roll back workers to use KCL 2.x compatible functionality using the KCL Migration Tool.