在 Node.js 中开发 Kinesis Client Library 消费端 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Node.js 中开发 Kinesis Client Library 消费端

可以使用 Kinesis Client Library(KCL)构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Node.js。

KCL 是一个 Java 库;对 Java 以外其他语言的支持是使用名为的多语言接口提供的。MultiLangDaemon此进程守护程序基于 Java,当您使用 Java 以外的 KCL 语言时,该程序会在后台运行。因此,如果你安装适用于 Node.js 的 KCL 并完全用 Node.js 编写消费者应用程序,那么你仍然需要在系统上安装 Java,因为. MultiLangDaemon 此外 MultiLangDaemon ,您可能需要根据自己的用例自定义一些默认设置,例如它所连接的Amazon区域。有关 MultiLangDaemon on 的更多信息 GitHub,请访问 KCL MultiLangDaemon 项目页面。

要从中下载 Node.js KCL GitHub,请前往 K inesis 客户端库 (Node.js)

示例代码下载

Node.js 中有两个代码示例可用于 KCL:

  • basic-sample

    在下列节中用于阐释在 Node.js 中构建 KCL 消费端应用程序的基础知识。

  • click-stream-sample

    稍微复杂一些,使用了现实世界的情景,适合在您熟悉基本示例代码之后采用。此示例在这里不做讨论,但它有一个包含更多信息的自述文件。

在 Node.js 中实现 KCL 消费端应用程序时,您必须完成下列任务:

实现记录处理器

使用适用于 Node.js 的 KCL 的最简易的潜在消费端必须实现 recordProcessor 函数,该函数反之包含函数 initializeprocessRecordsshutdown。该示例提供了可用作起点的实现(请参阅 sample_kcl_app.js)。

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
初始化

KCL 在记录处理器启动时调用 initialize 函数。此记录处理器只处理作为 initializeInput.shardId 传递的分片 ID,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有至少一次语义,即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅重新分片、扩展和并行处理

initialize: function(initializeInput, completeCallback)
processRecords

KCL 使用包含一个数据记录的列表(这些记录来自在 initialize 函数中指定的分片)的输入来调用此函数。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如,工作程序可能对数据执行转换,然后将结果存储在 Amazon Simple Storage Service(Amazon S3)存储桶中。

processRecords: function(processRecordsInput, completeCallback)

除了数据本身之外,记录还包含工作程序在处理数据时可使用的序号和分区键。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。record 词典公开了以下键-值对来访问记录的数据、序号和分区键:

record.data record.sequenceNumber record.partitionKey

请注意,数据是 Base64 编码的。

在该基本示例中,函数 processRecords 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 利用作为 processRecordsInput.checkpointer 传递的 checkpointer 对象执行此跟踪。您的记录处理器将调用 checkpointer.checkpoint 函数以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败,KCL 将在您重新启动分片的处理时使用此信息,以便在上一个已知的已处理记录处继续处理。

对于拆分或合并操作,在原始分片的处理器调用 checkpoint 以指示原始分片上的所有处理操作都已完成之前,KCL 不会开始处理新分片。

如果您未将序列号传递到 checkpoint 函数,KCL 将假定对 checkpoint 的调用表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器应在已处理传递到它的列表中的所有记录后才调用 checkpoint。记录处理器不需要在每次调用 checkpoint 时调用 processRecords。例如,处理器可以在每第三次调用时调用 checkpoint,或调用记录处理器外部的某个事件(如您已实现的自定义确认/验证服务)。

您可以选择性地将某个记录的确切序号指定为 checkpoint 的参数。在本例中,KCL 将假定所有记录都已处理,直至处理到该记录。

基本示例应用程序显示了对 checkpointer.checkpoint 函数最简单的调用。您此时可以在该函数中为您的消费端添加您需要的其他检查点逻辑。

shutdown

KCL 在处理结束(shutdownInput.reasonTERMINATE)或工作程序不再响应(shutdownInput.reasonZOMBIE)时调用 shutdown 函数。

shutdown: function(shutdownInput, completeCallback)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL 还会将 shutdownInput.checkpointer 对象传递到 shutdown。如果关闭原因为 TERMINATE,则应确保记录处理器已完成处理任何数据记录,然后对此接口调用 checkpoint 函数。

修改配置属性

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅基本示例中的 sample.properties)。

应用程序名称

KCL 需要一个应用程序,该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值:

  • 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。

  • KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租约表跟踪 KCL 消费端应用程序处理的分片

设置 凭证

您必须向默认凭证提供程序链中的凭证提供程序之一提供您的 Amazon 凭证。可以使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 文件必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行消费端,则建议您使用 IAM 角色进行配置。反映与此 IAM 角色关联的权限 Amazon 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

以下示例配置 KCL 以使用 sample_kcl_app.js 中提供的记录处理器,处理名为 kclnodejssample 的 Kinesis 数据流。

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default Amazon credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON