本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Node.js 中开发 Kinesis Client Library 消费端
注意
Kinesis 客户端库 (KCL) 版本 1.x 和 2.x 已过时。我们建议迁移到 3.x KCL 版,该版本提供了改进的性能和新功能。有关最新的KCL文档和迁移指南,请参阅使用 Kinesis 客户端库。
您可以使用 Kinesis 客户端库 (KCL) 来构建用于处理来自 Kinesis 数据流的数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Node.js。
KCL是一个 Java 库;对 Java 以外其他语言的支持是使用名为的多语言接口提供的MultiLangDaemon。此守护程序基于 Java,当您使用 Java 以外的KCL语言时,它会在后台运行。因此,如果你安装适用KCL于 Node.js 的,然后完全用 Node.js 编写消费者应用程序,那么你仍然需要在系统上安装 Java,因为 MultiLangDaemon. 此外 MultiLangDaemon ,您可能需要根据自己的用例自定义一些默认设置,例如它所连接的 Amazon 区域。有关 MultiLangDaemon on 的更多信息 GitHub,请转到KCL MultiLangDaemon 项目
要KCL从中下载 Node.js GitHub,请前往 Kinesis 客户端库 (Node.js)
示例代码下载
Node.js 中有两个可用的KCL代码示例:
-
在下列节中用于阐释使用 Node.js 构建 KCL 使用者应用程序的基础知识。
-
稍微复杂一些,使用了现实世界的情景,适合在您熟悉基本示例代码之后采用。此处不讨论此示例,但有一个包含更多信息的README文件。
在 Node.js 中实现 KCL 使用者应用程序时,您必须完成下列任务:
实现记录处理器
使用适用于 Node.js 的 KCL 的可能最简单的使用者必须实现 recordProcessor
函数,该函数反过来包含函数 initialize
、processRecords
和 shutdown
。该示例提供了可用作起点的实现(请参阅 sample_kcl_app.js
)。
function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
初始化
KCL 在记录处理器启动时调用 initialize
函数。此记录处理器只处理作为 initializeInput.shardId
传递的分片 ID,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅使用重新分片、扩展和并行处理更改分片数量。
initialize: function(initializeInput, completeCallback)
processRecords
KCL 使用包含一个数据记录的列表(这些记录来自在 initialize
函数中指定的分片)的输入来调用此函数。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。
processRecords: function(processRecordsInput, completeCallback)
除了数据本身之外,记录还包含工作程序在处理数据时可使用的序号和分区键。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。record
词典公开了以下键-值对来访问记录的数据、序号和分区键:
record.data
record.sequenceNumber
record.partitionKey
请注意,数据是 Base64 编码的。
在该基本示例中,函数 processRecords
具有显示工作程序如何访问记录的数据、序号和分区键的代码。
Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 利用作为 processRecordsInput.checkpointer
传递的 checkpointer
执行此跟踪。您的记录处理器将调用 checkpointer.checkpoint
函数以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败,KCL 将在您重新启动分片的处理时使用此信息,以便在上一个已知的已处理记录处继续处理。
对于拆分或合并操作,在原始分片的处理器调用 checkpoint
以指示原始分片上的所有处理操作都已完成之前,KCL 不会开始处理新分片。
如果您未将序号传递到 checkpoint
函数,KCL 将假定对 checkpoint
的调用表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 checkpoint
。记录处理器不需要在每次调用 checkpoint
时调用 processRecords
。例如,处理器可以在每第三次调用时调用 checkpoint
,或调用记录处理器外部的某个事件(如您已实现的自定义确认/验证服务)。
您可以选择性地将某个记录的确切序号指定为 checkpoint
的参数。在本例中,KCL 将假定所有记录都已处理,一直到达该记录。
基本示例应用程序显示了对 checkpointer.checkpoint
函数最简单的调用。您此时可以在该函数中为您的消费端添加您需要的其他检查点逻辑。
shutdown
KCL 在处理结束(shutdownInput.reason
为 TERMINATE
)或工作程序不再响应(shutdownInput.reason
为 ZOMBIE
)时调用 shutdown
函数。
shutdown: function(shutdownInput, completeCallback)
处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。
KCL 还会将 shutdownInput.checkpointer
对象传递到 shutdown
。如果关闭原因为 TERMINATE
,则应确保记录处理器已完成处理任何数据记录,然后对此接口调用 checkpoint
函数。
修改配置属性
该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅基本示例中的 sample.properties
)。
应用程序名称
KCL需要一个在您的应用程序中以及同一区域的 Amazon DynamoDB 表中具有唯一性的应用程序。KCL 通过以下方法使用应用程序名称配置值:
-
假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
-
使用应用程序名称KCL创建一个 DynamoDB 表,并使用该表维护应用程序的状态信息(例如检查点和工作分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表来跟踪使用者应用程序处理的KCL分片。
设置凭证
您必须将您的 Amazon 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 AWSCredentialsProvider
属性设置凭证提供程序。sample.properties
文件必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行使用器,我们建议您为该实例配置一个IAM角色。 Amazon 反映与此IAM角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。这是管理EC2实例上运行的使用者应用程序凭证的最安全的方式。
以下示例配置KCL为使用中提供的记录处理器处理kclnodejssample
名为 Kinesis 数据流:sample_kcl_app.js
# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default Amazon credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON