使用 KCL 进行多流处理 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 KCL 进行多流处理

本节介绍了 KCL 中需要进行的更改,这些更改允许您创建可以同时处理多个数据流的 KCL 使用者应用程序。

重要
  • 只有 KCL 2.3 或更高版本支持多流处理。

  • 使用运行的非 Java 语言编写的 KCL 使用者支持多流处理。multilangdaemon

  • 任何版本的 KCL 1.x 都支持多流处理。

  • MultistreamTracker 接口

    • 要构建可以同时处理多个流的使用者应用程序,必须实现一个名为的新接口MultistreamTracker。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 streamConfigList 方法。请注意,正在处理的数据流可以在使用者应用程序运行时进行更改。 streamConfigList由 KCL 定期调用,以了解要处理的数据流的变化。

    • streamConfigList填充StreamConfig列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

或者,MultiStreamTracker如果要实现同时处理多个流的 KCL 使用者应用程序,也可以使用进行初始化 ConfigsBuilder 。

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 通过为您的 KCL 使用者应用程序实现多流支持,应用程序租赁表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。

  • 当您的 KCL 使用者应用程序实现多流支持时,LeaseKey 采用以下结构:。account-id:StreamName:streamCreationTimestamp:ShardId例如,111111111:multiStreamTest-1:12345:shardId-000000000336

重要

当您的现有 KCL 使用者应用程序配置为仅处理一个数据流时,leaseKey(这是租赁表的分区键)就是分片 ID。如果您将现有 KCL 使用者应用程序重新配置为处理多个数据流,则会破坏您的租赁表,因为leaseKey结构必须如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId以支持多流。