本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Java 中开发 Kinesis Client Library 消费端
注意
Kinesis 客户端库 (KCL) 版本 1.x 和 2.x 已过时。我们建议迁移到 KCL 3.x 版本,该版本提供了改进的性能和新功能。有关最新的 KCL 文档和迁移指南,请参阅使用 Kinesis 客户端库。
可以使用 Kinesis Client Library(KCL)构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Java。要查看 Javadoc 参考资料,请参阅类的 Amazon Javadoc 主题。 AmazonKinesisClient
要从中下载 Java KCL GitHub,请前往 K inesis 客户端库 (
该示例应用程序使用 Apache Commons Loggingconfigure
文件中定义的静态 AmazonKinesisApplicationSample.java
方法更改日志记录配置。有关如何在 Log4j 和 Amazon Java 应用程序中使用 Apache 共享日志记录的更多信息,请参阅《开发人员指南》中的使用 Log4j 进行日志记录。Amazon SDK for Java
在 Java 中实现 KCL 消费端应用程序时,您必须完成下列任务:
实现 IRecord处理器方法
KCL 当前支持 IRecordProcessor
接口的两个版本:原始接口可与第一个版本的 KCL 一起可用;而版本 2 从 KCL 1.5.0 版才开始可用。这两个接口都完全受支持。您的选择取决于您的特定方案要求。要查看所有区别,请参阅您在本地构建的 Javadocs 或源代码。以下各节概述了开始使用的最低实施要求。
IRecord处理器版本
原始接口(版本 1)
原始 IRecordProcessor
接口 (package
com.amazonaws.services.kinesis.clientlibrary.interfaces
) 公开了下列记录处理器方法,您的消费端必须实施这些方法。该示例提供了可用作起点的实现(请参阅 AmazonKinesisApplicationSampleRecordProcessor.java
)。
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
初始化
KCL 在实例化记录处理器时调用 initialize
方法,并将特定分片 ID 作为参数传递。此记录处理器仅处理此分片,并且通常情况下反过来说也成立(此分片仅由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录至少会由消费端中的工作程序处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息,请参阅使用重新分片、扩展和并行处理更改分片数量。
public void initialize(String shardId)
processRecords
KCL 调用 processRecords
方法,并传递来自由 initialize(shardId)
方法指定的分片的数据记录的列表。记录处理器根据消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。Record
类公开了以下方法,这些方法提供对记录的数据、序列号和分区键的访问。
record.getData()
record.getSequenceNumber()
record.getPartitionKey()
在该示例中,私有方法 processRecordsWithRetries
具有显示工作程序如何访问记录的数据、序号和分区键的代码。
Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将检查指针(IRecordProcessorCheckpointer
)传递到 processRecords
来为您执行此跟踪。记录处理器将对此接口调用 checkpoint
方法,以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败,KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。
对于拆分或合并操作,在原始分片的处理器调用 checkpoint
以指示原始分片上的所有处理操作都已完成之前,KCL 不会开始处理新分片。
如果您未传递参数,KCL 将假定对 checkpoint
的调用表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 checkpoint
。记录处理器不需要在每次调用 checkpoint
时调用 processRecords
。例如,处理器可以在每第三次调用 checkpoint
时调用 processRecords
。您可以选择性地将某个记录的确切序号指定为 checkpoint
的参数。在本例中,KCL 将假定所有记录都已处理,直至处理到该记录。
在该示例中,私有方法 checkpoint
展示了如何使用适当的异常处理和重试逻辑调用 IRecordProcessorCheckpointer.checkpoint
。
KCL 依靠 processRecords
来处理由处理数据记录引起的任何异常。如果 processRecords
引发了异常,则 KCL 将跳过在异常发生前已传递的数据记录。也就是说,这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。
shutdown
KCL 在处理结束(关闭原因为 TERMINATE
)或工作程序不再响应(关闭原因为 ZOMBIE
)时调用 shutdown
方法。
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。
KCL 还会将 IRecordProcessorCheckpointer
接口传递到 shutdown
。如果关闭原因为 TERMINATE
,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint
方法。
更新后的接口(版本 2)
更新后的 IRecordProcessor
接口 (package
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
) 公开了下列记录处理器方法,您的消费端必须实施这些方法:
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
原始版本的接口中的所有参数可通过容器对象上的 get 方法进行访问。例如,要检索 processRecords()
中的记录的列表,可使用 processRecordsInput.getRecords()
。
自此接口的版本 2(KCL 1.5.0 及更高版本)起,除了原始接口提供的输入之外,以下新输入也可用:
- 起始序列号
-
在传递给
InitializationInput
运算的initialize()
对象中,将提供给记录处理器实例的记录的起始序列号。这是由之前处理同一分片的记录处理器实例进行最近一次检查点操作的序列号。此序列号在您的应用程序需要此信息时提供。 - 待进行检查点操作的序列号
-
在传递给
initialize()
运算的InitializationInput
对象中,在上一个记录处理器实例停止前可能无法提交的待进行检查点操作的序列号(如果有)。
为 IRecord处理器接口实现类工厂
您还需要为实现记录处理器方法的类实现一个工厂。当消费端实例化工作程序时,它将传递对此工厂的引用。
以下示例使用原始记录处理器接口在文件 AmazonKinesisApplicationSampleRecordProcessorFactory.java
中实现工厂类。如果您希望此工厂类创建版本 2 记录处理器,请使用程序包名称 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2
。
public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }
创建工作线程
如 实现 IRecord处理器方法 中所述,有两个版本的 KCL 记录处理器接口可供选择,这将影响您创建工作程序的方式。原始记录处理器接口使用以下代码结构创建工作线程:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);
当使用版本 2 的记录处理器接口时,您可使用 Worker.Builder
创建工作线程而无需担心要使用的构造函数以及参数的顺序。更新后的记录处理器接口使用以下代码结构创建工作线程:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
修改配置属性
该示例提供了配置属性的默认值。工作程序的此配置数据随后将整合到 KinesisClientLibConfiguration
对象中。此对象和对 IRecordProcessor
的类工厂的引用将传入用于实例化工作程序的调用。您可借助 Java 属性文件(请参阅 AmazonKinesisApplicationSample.java
)用您自己的值覆盖任何这些属性。
应用程序名称
KCL 需要一个应用程序名称,该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值:
-
假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
-
KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租约表跟踪 KCL 消费端应用程序处理的分片。
设置凭证
您必须将您的 Amazon 证书提供给默认凭证提供者链中的一个凭证提供商。例如,如果您在 EC2 实例上运行使用器,我们建议您使用 IAM 角色启动该实例。 Amazon 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。对于在 EC2 实例上运行的使用者,这是管理凭证的最安全的方式。
示例应用程序首先尝试从实例元数据中检索 IAM 凭证:
credentialsProvider = new InstanceProfileCredentialsProvider();
如果示例应用程序无法从实例元数据中获取凭证,它会尝试从属性文件中检索凭证:
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
有关实例元数据的更多信息,请参阅 Amazon EC2 用户指南中的实例元数据。
将工作线程 ID 用于多个实例
示例初始化代码通过使用本地计算机的名称并附加一个全局唯一的标识符为工作程序创建 ID (workerId
),如以下代码段所示。此方法支持消费端应用程序的多个实例在单台计算机上运行的方案。
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
迁移到版本 2 的记录处理器接口
如果要迁移使用原始接口的代码,则除了上述步骤之外,还需要执行以下步骤:
-
更改您的记录处理器类以导入版本 2 记录处理器接口:
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
-
更改对输入的引用以在容器对象上使用
get
方法。例如,在shutdown()
运算中,将“checkpointer
”更改为“shutdownInput.getCheckpointer()
”。 -
更改您的记录处理器工厂类以导入版本 2 记录处理器工厂接口:
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
-
更改工作线程的结构以使用
Worker.Builder
。例如:final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();