Amazon Kinesis Data Streams
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

从 Kinesis Client Library 1.x 迁移到 2.x

本主题介绍 Kinesis Client Library (KCL) 的版本 1.x 和 2.x 之间的区别。它还向您展示如何将使用者从 KCL 的版本 1.x 迁移到版本 2.x。在迁移您的客户端后,它将从最后一个检查点位置开始处理记录。

KCL 的版本 2 引入了以下接口更改:

KCL 接口更改

KCL 1.x 接口 KCL 2.0 接口
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 折叠为 software.amazon.kinesis.processor.ShardRecordProcessor

迁移记录处理器

以下示例显示了为 KCL 1.x 实施的记录处理器:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }

迁移记录处理器类

  1. 将接口从 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 更改为 software.amazon.kinesis.processor.ShardRecordProcessor,如下所示:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. 更新 importinitialize 方法的 processRecords 语句。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. 使用以下新方法替换 shutdown 方法:leaseLostshardEndedshutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

下面是记录处理器类的更新版本。

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

迁移记录处理器工厂

记录处理器工厂负责在获得租赁时创建记录处理器。下面是 KCL 1.x 工厂的示例。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }

迁移记录处理器工厂

  1. 将已实施的接口从 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory 更改为 software.amazon.kinesis.processor.RecordProcessorFactory,如下所示。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. 更改 createProcessor 的返回签名。

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

下面是 2.0 中的记录处理器工厂的示例:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

迁移工作程序

在 KCL 版本 2.0 中,名为 Scheduler 的新类取代了 Worker 类。下面是 KCL 1.x 工作程序的示例。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

迁移工作程序

  1. Worker 类的 import 语句更改为 SchedulerConfigsBuilder 类的导入语句。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 创建 ConfigsBuilderScheduler,如以下示例所示。

    建议您使用 KinesisClientUtil 创建 KinesisAsyncClient,并在 KinesisAsyncClient 中配置 maxConcurrency

    重要

    Amazon Kinesis 户端可能会看到延迟大幅增加,除非您将 KinesisAsyncClient 配置为具有足够高的 maxConcurrency,以允许所有租期以及额外使用 KinesisAsyncClient

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

配置 Amazon Kinesis 客户端

使用 2.0 版本的 Kinesis Client Library,客户端的配置已从单个配置类 (KinesisClientLibConfiguration) 移至 6 个配置类。下表描述了迁移。

配置字段及其新类

原始字段 新配置类 描述
applicationName ConfigsBuilder 此 KCL 应用程序的名称。用作 tableNameconsumerName 的默认名称。
tableName ConfigsBuilder 允许覆盖用于 the Amazon DynamoDB 租赁表的表名称。
streamName ConfigsBuilder 此应用程序从其中处理记录的流的名称。
kinesisEndpoint ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBEndpoint ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
initialPositionInStreamExtended RetrievalConfig 分片中 KCL 开始获取记录的位置,从应用程序的初始运行开始。
kinesisCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
cloudWatchCredentialsProvider ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
failoverTimeMillis LeaseManagementConfig 在您可以将租赁所有者视为已失败之前必须经过的毫秒数。
workerIdentifier ConfigsBuilder 表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。
shardSyncIntervalMillis LeaseManagementConfig 分片同步调用之间的时间。
maxRecords PollingConfig 允许设置 Kinesis 返回的最大记录数。
idleTimeBetweenReadsInMillis CoordinatorConfig 此选项已删除。请参阅“闲置时间删除”。
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 如果设置,即使在 Kinesis 中未提供任何记录时也将调用记录处理器。
parentShardPollIntervalMillis CoordinatorConfig 记录处理器应轮询多少时间才能查看是否已完成父分片。
cleanupLeasesUponShardCompletion LeaseManagementConfig 如果设置,只要子租赁已开始处理,即可删除租赁。
ignoreUnexpectedChildShards LeaseManagementConfig 如果设置,将忽略具有打开的分片的子分片。这主要用于 DynamoDB 流。
kinesisClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
dynamoDBClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
cloudWatchClientConfig ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
taskBackoffTimeMillis LifecycleConfig 等待重试失败任务的时间。
metricsBufferTimeMillis MetricsConfig 控制 CloudWatch 指标发布。
metricsMaxQueueSize MetricsConfig 控制 CloudWatch 指标发布。
metricsLevel MetricsConfig 控制 CloudWatch 指标发布。
metricsEnabledDimensions MetricsConfig 控制 CloudWatch 指标发布。
validateSequenceNumberBeforeCheckpointing CheckpointConfig 此选项已删除。请参阅“检查点序列号验证”。
regionName ConfigsBuilder 此选项已删除。请参阅“客户端配置删除”。
maxLeasesForWorker LeaseManagementConfig 应用程序的单个实例应接受的最大租赁数量。
maxLeasesToStealAtOneTime LeaseManagementConfig 应用程序一次应尝试窃取的最大租赁数量。
initialLeaseTableReadCapacity LeaseManagementConfig 如果 Kinesis Client Library 需要创建新的 DynamoDB 租赁表,DynamoDB 将读取使用的 IOP。
initialLeaseTableWriteCapacity LeaseManagementConfig 如果 Kinesis Client Library 需要创建新的 DynamoDB 租赁表,DynamoDB 将读取使用的 IOP。
initialPositionInStreamExtended ConfigsBuilder 应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig 如果租赁表包含现有租赁,请禁用同步的分片数据。TODO:KinesisEco-438
shardPrioritization CoordinatorConfig 要使用的分片优先级。
shutdownGraceMillis 不适用 此选项已删除。请参阅“MultiLang 删除”。
timeoutInSeconds 不适用 此选项已删除。请参阅“MultiLang 删除”。
retryGetRecordsInSeconds PollingConfig 配置 GetRecords 失败尝试之间的延迟。
maxGetRecordsThreadPool PollingConfig 用于 GetRecords 的线程池大小。
maxLeaseRenewalThreads LeaseManagementConfig 控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多,此池应该就越大。
recordsFetcherFactory PollingConfig 允许替换用于创建从流中检索的提取程序的工厂。
logWarningForTaskAfterMillis LifecycleConfig 任务尚未完成的情况下在记录警告之前要等待的时长。
listShardsBackoffTimeInMillis RetrievalConfig 发生故障时在调用 ListShards 之间要等待的时间(以毫秒为单位)。
maxListShardsRetryAttempts RetrievalConfig ListShards 在放弃之前重试的最长时间。

闲置时间删除

在 1.x 版本的 KCL 中,idleTimeBetweenReadsInMillis 对应于两个数量:

  • 任务分派检查之间的时间量。您现在可以通过设置 CoordinatorConfig#shardConsumerDispatchPollIntervalMillis 来在任务之间配置此时间。

  • 从 Kinesis Data Streams 中未返回任何记录时的睡眠时间量。在版本 2.0 中,带增强型扇出功能的记录是从其各自的检索器中推送的。分片使用器上的活动仅发生在推送的请求到达时。

客户端配置删除

在 2.0 版本中,KCL 不再创建客户端。这取决于用户提供有效的客户端。进行此更改后,已删除控制客户端创建的所有配置参数。如果需要这些参数,您可以在向 ConfigsBuilder 提供客户端之前在客户端上设置它们。

已删除字段 等效配置
kinesisEndpoint 使用以下首选终端节点配置开发工具包 KinesisAsyncClientKinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()
dynamoDBEndpoint 使用以下首选终端节点配置开发工具包 DynamoDbAsyncClientDynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig 使用以下所需配置来配置开发工具包 KinesisAsyncClientKinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()
dynamoDBClientConfig 使用以下所需配置来配置开发工具包 DynamoDbAsyncClientDynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()
cloudWatchClientConfig 使用以下所需配置来配置开发工具包 CloudWatchAsyncClientCloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()
regionName 使用首选区域配置开发工具包。这对所有开发工具包客户端均相同。例如:KinesisAsyncClient.builder().region(Region.US_WEST_2).build()