在 Java 中使用 KCL 开发消费者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Java 中使用 KCL 开发消费者

先决条件

在开始使用 KCL 3.x 之前,请确保您具备以下条件:

  • Java 开发套件 (JDK) 8 或更高版本

  • Amazon SDK for Java 2.x

  • 用于依赖管理的 Maven 或 Gradle

KCL 从运行工作程序的计算主机上收集 CPU 利用率指标(例如 CPU 利用率),以平衡负载,从而在各工作节点之间实现均衡的资源利用率水平。要使 KCL 能够从工作人员那里收集 CPU 利用率指标,您必须满足以下先决条件:

Amazon Elastic Compute Cloud(亚马逊 EC2)

  • 您的操作系统必须是 Linux 操作系统。

  • 您必须在您的 EC2 实例IMDSv2中启用。

亚马逊上的亚马逊弹性容器服务 (Amazon ECS) Container Service EC2

Amazon ECS 已开启 Amazon Fargate

  • 您必须启用 Fargate 任务元数据端点版本 4。如果您使用的是 Fargate 平台版本 1.4.0 或更高版本,则默认启用此功能。

  • Fargate 平台版本 1.4.0 或更高版本。

亚马逊上的亚马逊 Elastic Kubernetes Service(亚马逊 EKS) EC2

  • 您的操作系统必须是 Linux 操作系统。

亚马逊 EKS 开启 Amazon Fargate

  • Fargate 平台 1.3.0 或更高版本。

重要

如果 KCL 无法从工作人员那里收集 CPU 利用率指标,KCL 将回退到使用每个工作人员的吞吐量来分配租约,并在队列中的工作人员之间平衡负载。有关更多信息,请参阅 KCL 如何将租约分配给员工并平衡负荷

安装并添加依赖关系

如果您使用的是 Maven,请将以下依赖项添加到您的pom.xml文件中。确保将 3.x.x 替换为最新的 KCL 版本。

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

如果您使用的是 Gradle,请将以下内容添加到您的build.gradle文件中。确保将 3.x.x 替换为最新的 KCL 版本。

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

你可以在 Maven 中央存储库中查看最新版本的 KCL。

实现消费端

KCL 消费者应用程序由以下关键组件组成:

RecordProcessor

RecordProcessor 是处理 Kinesis 数据流记录的业务逻辑所在的核心组件。它定义了您的应用程序如何处理从 Kinesis 流接收的数据。

主要职责:

  • 初始化分片的处理

  • 处理来自 Kinesis 数据流的批量记录

  • 关闭分片的处理(例如,当分片拆分或合并时,或者租约移交给另一台主机时)

  • 处理检查点操作以跟踪进度

下面显示了一个实现示例:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

以下是示例中使用的每种方法的详细说明:

初始化(初InitializationInput始化输入)

  • 目的:为处理记录设置任何必要的资源或状态。

  • 何时调用:Once,当 KCL 为该记录处理器分配分片时。

  • 关键点:

    • initializationInput.shardId(): 此处理器将处理的分片的 ID。

    • initializationInput.extendedSequenceNumber():开始处理的序列号。

流程记录 () ProcessRecordsInput processRecordsInput

  • 目的:处理传入的记录和可选的检查点进度。

  • 何时调用:重复调用,只要记录处理器持有分片的租约。

  • 关键点:

    • processRecordsInput.records():要处理的记录清单。

    • processRecordsInput.checkpointer():用于检查进度。

    • 确保在处理过程中处理了所有异常,以防止 KCL 失败。

    • 此方法应该是等效的,因为在某些情况下,同一条记录可能会被多次处理,例如在工作程序意外崩溃或重启之前尚未进行检查的数据。

    • 务必在检查点检查之前刷新所有缓冲的数据,以确保数据一致性。

LeaseLost () LeaseLostInput leaseLostInput

  • 目的:清理专门用于处理此分片的所有资源。

  • 何时被调用:当另一个调度器接管此分片的租约时。

  • 关键点:

    • 此方法不允许使用检查点操作。

ShardEnded () ShardEndedInput shardEndedInput

  • 目的:完成此分片和检查点的处理。

  • 何时调用:当分片拆分或合并时,表示该分片的所有数据都已处理完毕。

  • 关键点:

    • shardEndedInput.checkpointer():用于执行最后的检查点操作。

    • 要完成处理,必须使用此方法进行检查点检查。

    • 未能在此处刷新数据和检查点可能会导致分片重新打开时数据丢失或重复处理。

已请求关机 () ShutdownRequestedInput shutdownRequestedInput

  • 目的:在 KCL 关闭时检查和清理资源。

  • 何时被调用:当 KCL 关闭时,例如,当应用程序终止时)。

  • 关键点:

    • shutdownRequestedInput.checkpointer():用于在关机前执行检查点操作。

    • 确保在方法中实现了检查点操作,以便在应用程序停止之前保存进度。

    • 未能在此处刷新数据和检查点可能会导致数据丢失或在应用程序重新启动时重新处理记录。

重要

KCL 3.x 在关闭前一个工作人员之前通过检查点将租约从一个工作人员移交给另一个工作人员时,可确保减少数据重新处理的次数。如果你没有在shutdownRequested()方法中实现检查点逻辑,你就看不到这个好处。确保您已在shutdownRequested()方法中实现了检查点逻辑。

RecordProcessorFactory

RecordProcessorFactory 负责创建新 RecordProcessor实例。KCL 使用此工厂 RecordProcessor 为应用程序需要处理的每个分片创建一个新分片。

主要职责:

  • 按需创建新 RecordProcessor 实例

  • 确保每个都已 RecordProcessor 正确初始化

以下是一个实现示例:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

在此示例中, SampleRecordProcessor 每次调用 shardRecordProcessor () 时,工厂都会创建一个新的。您可以将其扩展为包括任何必要的初始化逻辑。

调度器

调度器是一个高级组件,用于协调 KCL 应用程序的所有活动。它负责数据处理的总体编排。

主要职责:

  • 管理生命周期 RecordProcessors

  • 处理分片的租赁管理

  • 坐标检查点

  • 在应用程序的多个工作程序之间平衡分片处理负载

  • 处理优雅的关机信号和应用程序终止信号

调度程序通常是在主应用程序中创建和启动的。你可以在下一节 “主使用者应用程序” 中查看 Scheduler 的实现示例。

主要消费类应用

主要消费者应用程序将所有组件连接在一起。它负责设置 KCL 使用者、创建必要的客户端、配置调度程序和管理应用程序的生命周期。

主要职责:

  • 设置 Amazon 服务客户端(Kinesis、DynamoDB 等) CloudWatch

  • 配置 KCL 应用程序

  • 创建并启动调度器

  • 处理应用程序关闭

以下是一个实现示例:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

默认情况下,KCL 会创建具有专用吞吐量的增强型扇出 (EFO) 使用器。有关增强型扇出功能的更多信息,请参阅。开发具有专用吞吐量的增强型扇出消费者如果您的使用者少于 2 个,或者不需要低于 200 毫秒的读取传播延迟,则必须在调度器对象中将以下配置设置为使用共享吞吐量使用者:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

以下代码是创建使用共享吞吐量使用者的调度程序对象的示例:

进口

import software.amazon.kinesis.retrieval.polling.PollingConfig;

代码

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/