将消费者从 KCL 1.x 迁移到 KCL 2.x - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将消费者从 KCL 1.x 迁移到 KCL 2.x

本主题解释了 Kinesis 客户端库 1.x 和 2.x 版本之间的区别 ()。KCL它还向您展示了如何将消费者从 1.x 版迁移到 2.x 版。KCL在迁移您的客户端后,它将从最后一个检查点位置开始处理记录。

2.0 版KCL引入了以下界面更改:

KCL接口变更
KCL1.x 接口 KCL2.0 接口
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 折叠为 software.amazon.kinesis.processor.ShardRecordProcessor

迁移记录处理器

以下示例显示了为 KCL 1.x 实现的记录处理器:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
迁移记录处理器类
  1. 将接口从 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 更改为 software.amazon.kinesis.processor.ShardRecordProcessor,如下所示:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. 更新 importinitialize 方法的 processRecords 语句。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. 使用以下新方法替换 shutdown 方法:leaseLostshardEndedshutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

下面是记录处理器类的更新版本。

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

迁移记录处理器工厂

记录处理器工厂负责在获得租赁时创建记录处理器。以下是 KCL 1.x 工厂的示例。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
迁移记录处理器工厂
  1. 将已实施的接口从 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory 更改为 software.amazon.kinesis.processor.ShardRecordProcessorFactory,如下所示。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. 更改 createProcessor 的返回签名。

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

下面是 2.0 中的记录处理器工厂的示例:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

迁移工作人员

在 2.0 版中KCL,一个名Scheduler为的新类取代了该Worker类。以下是 KCL 1.x 工作程序的示例。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
迁移工作程序
  1. Worker 类的 import 语句更改为 SchedulerConfigsBuilder 类的导入语句。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 创建 ConfigsBuilderScheduler,如以下示例所示。

    建议您使用 KinesisClientUtil 创建 KinesisAsyncClient,并在 KinesisAsyncClient 中配置 maxConcurrency

    重要

    Amazon Kinesis 户端可能会看到延迟大幅增加,除非您将 KinesisAsyncClient 配置为具有足够高的 maxConcurrency,以允许所有租期以及额外使用 KinesisAsyncClient

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

配置 Amazon Kinesis 客户端

随着 Kinesis Client Library 版本 2.0 的发布,客户端的配置已从单个配置类(KinesisClientLibConfiguration)变为 6 个配置类。下表描述了迁移。

配置字段及其新类
原始字段 新配置类 描述
applicationName ConfigsBuilder 该KCL应用程序的名称。用作 tableNameconsumerName 的默认名称。
tableName ConfigsBuilder 允许覆盖用于 Amazon DynamoDB 租赁表的表名称。
streamName ConfigsBuilder 此应用程序从其中处理记录的流的名称。
kinesisEndpoint ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBEndpoint ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
initialPositionInStreamExtended RetrievalConfig 从应用程序的初始运行KCL开始,从分片中开始获取记录的位置。
kinesisCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
cloudWatchCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
failoverTimeMillis LeaseManagementConfig 在您可以将租赁所有者视为已失败之前必须经过的毫秒数。
workerIdentifier ConfigsBuilder 表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。
shardSyncIntervalMillis LeaseManagementConfig 分片同步调用之间的时间。
maxRecords PollingConfig 允许设置 Kinesis 返回的最大记录数。
idleTimeBetweenReadsInMillis CoordinatorConfig 此选项已删除。请参阅“闲置时间删除”。
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 如果设置,即使 Kinesis 中未提供任何记录,也会调用记录处理器。
parentShardPollIntervalMillis CoordinatorConfig 记录处理器应轮询多少时间才能查看是否已完成父分片。
cleanupLeasesUponShardCompletion LeaseManagementConfig 如果设置,只要子租赁已开始处理,即可删除租赁。
ignoreUnexpectedChildShards LeaseManagementConfig 如果设置,将忽略具有打开的分片的子分片。这主要适用于 DynamoDB Streams。
kinesisClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
cloudWatchClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
taskBackoffTimeMillis LifecycleConfig 等待重试失败任务的时间。
metricsBufferTimeMillis MetricsConfig 控制 CloudWatch 指标发布。
metricsMaxQueueSize MetricsConfig 控制 CloudWatch 指标发布。
metricsLevel MetricsConfig 控制 CloudWatch 指标发布。
metricsEnabledDimensions MetricsConfig 控制 CloudWatch 指标发布。
validateSequenceNumberBeforeCheckpointing CheckpointConfig 此选项已删除。请参阅“检查点序列号验证”。
regionName ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
maxLeasesForWorker LeaseManagementConfig 应用程序的单个实例应接受的最大租赁数量。
maxLeasesToStealAtOneTime LeaseManagementConfig 应用程序一次应尝试窃取的最大租赁数量。
initialLeaseTableReadCapacity LeaseManagementConfig 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。
initialLeaseTableWriteCapacity LeaseManagementConfig 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。
initialPositionInStreamExtended LeaseManagementConfig 应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig 如果租赁表包含现有租赁,请禁用同步的分片数据。TODO: KinesisEco -438
shardPrioritization CoordinatorConfig 要使用的分片优先级。
shutdownGraceMillis 不适用 此选项已删除。参见 “ MultiLang 移除”。
timeoutInSeconds 不适用 此选项已删除。参见 “ MultiLang 移除”。
retryGetRecordsInSeconds PollingConfig 配置 GetRecords 尝试失败之间的延迟。
maxGetRecordsThreadPool PollingConfig 使用的线程池大小 GetRecords。
maxLeaseRenewalThreads LeaseManagementConfig 控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多,此池应该就越大。
recordsFetcherFactory PollingConfig 允许替换用于创建从流中检索的提取程序的工厂。
logWarningForTaskAfterMillis LifecycleConfig 任务尚未完成的情况下在记录警告之前要等待的时长。
listShardsBackoffTimeInMillis RetrievalConfig 发生故障时在调用 ListShards 之间要等待的时间(以毫秒为单位)。
maxListShardsRetryAttempts RetrievalConfig ListShards 在放弃之前重试的最长时间。

空闲时间移除

在 1.x 版本中KCL,对idleTimeBetweenReadsInMillis应于两个数量:

  • 任务分派检查之间的时间量。您现在可以通过设置 CoordinatorConfig#shardConsumerDispatchPollIntervalMillis 来在任务之间配置此时间。

  • 未从 Kinesis Data Streams 中返回任何记录时的睡眠时间量。在版本 2.0 中,带增强型扇出功能的记录是从其各自的检索器中推送的。分片消费端上的活动仅发生在推送的请求到达时。

删除客户机配置

在 2.0 版本中,KCL不再创建客户端。这取决于用户提供有效的客户端。进行此更改后,已删除控制客户端创建的所有配置参数。如果需要这些参数,您可以在向 ConfigsBuilder 提供客户端之前在客户端上设置它们。

已删除字段 等效配置
kinesisEndpoint SDKKinesisAsyncClient使用首选端点配置:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()
dynamoDBEndpoint SDKDynamoDbAsyncClient使用首选端点配置:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig SDKKinesisAsyncClient使用所需的配置配置:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()
dynamoDBClientConfig SDKDynamoDbAsyncClient使用所需的配置配置:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()
cloudWatchClientConfig SDKCloudWatchAsyncClient使用所需的配置配置:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()
regionName SDK使用首选区域配置。这对于所有SDK客户来说都是一样的。例如,KinesisAsyncClient.builder().region(Region.US_WEST_2).build()