在 Node.js 中开发 Kinesis Client Library 使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Node.js 中开发 Kinesis Client Library 使用者

可以使用 Kinesis Client Library (KCL) 构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 有多种语言版本。本主题将讨论 Node.js。

KCL 是一个 Java 库;使用名为MultiLang守护程序. 此守护程序是基于 Java 的,当您使用除 Java 之外的 KCL 语言时,会在后台运行。因此,如果您安装了适用于 Node.js 的 KCL 并完全在 Node.js 中编写使用者应用程序,则由于MultiLang守护程序。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例自定义这些设置,例如,Amazon它连接到的区域。有关的更多信息MultiLang开守护程序GitHub,转到KCLMultiLang守护程序页.

要从下载 Node.js KCLGitHub,转到Kinesis Client Library (Node.js).

示例代码下载

Node.js 中有两个代码示例可用于 KCL:

  • basic-sample

    在下列节中用于阐释使用 Node.js 构建 KCL 使用者应用程序的基础知识。

  • click-stream-sample

    稍微复杂一些,使用了现实世界的情景,适合在您熟悉基本示例代码之后采用。此示例在这里不做讨论,但它有一个包含更多信息的自述文件。

在 Node.js 中实现 KCL 使用者应用程序时,您必须完成下列任务:

实现记录处理器

使用适用于 Node.js 的 KCL 的可能最简单的使用者必须实现recordProcessor函数,该函数反过来包含函数initializeprocessRecords, 和shutdown. 该示例提供了可用作起点的实现(请参阅 sample_kcl_app.js)。

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}

initialize

KCL 称initialize在记录处理器启动时函数。此记录处理器只处理作为 initializeInput.shardId 传递的分片 ID,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的使用者应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 有至少一次语义,即分片中的每个数据记录至少会由使用者中的工作程序处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅重新分片、扩展和并行处理

initialize: function(initializeInput, completeCallback)

processRecords

KCL 将使用包含一个数据记录的列表(该记录从指定到initializefunction. 您实现的记录处理器根据您的使用者的语义处理这些记录中的数据。例如,工作程序可能执行数据转换,然后将结果存储在 Amazon Simple Storage Service (Amazon S3) 存储桶中。

processRecords: function(processRecordsInput, completeCallback)

除了数据本身之外,记录还包含工作程序在处理数据时可使用的序号和分区键。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。record 词典公开了以下键-值对来访问记录的数据、序号和分区键:

record.data record.sequenceNumber record.partitionKey

请注意,数据是 Base64 编码的。

在该基本示例中,函数 processRecords 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 用checkpointer对象传递为processRecordsInput.checkpointer. 你的记录处理器调用checkpointer.checkpoint函数以向告知 KCL 处理分片中的记录的进度。如果工作程序失败,将在您重新启动分片的处理时使用此信息,以便在上一个已知的已处理记录处继续处理。

对于拆分或合并操作,在原始分片的处理器调用之前,KCL 不会开始处理新分片。checkpoint表示原始分片的所有处理都已完成。

如果你没有将序列号传递给checkpoint函数,KCL 假定调用checkpoint表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器应该调用checkpoint 仅限在处理传递到它的列表中的所有记录后。记录处理器不需要在每次调用 checkpoint 时调用 processRecords。例如,处理器可以在每第三次调用时调用 checkpoint,或调用记录处理器外部的某个事件(如您已实现的自定义确认/验证服务)。

您可以选择性地将某个记录的确切序号指定为 checkpoint 的参数。在本例中,KCL 将假定所有记录都已处理,一直到达该记录。

基本示例应用程序显示了对 checkpointer.checkpoint 函数最简单的调用。您此时可以在该函数中为您的使用者添加您需要的其他检查点逻辑。

shutdown

KCL 称shutdown处理结束时可以是函数 (shutdownInput.reasonTERMINATE)或者工作人员不再响应(shutdownInput.reasonZOMBIE)。

shutdown: function(shutdownInput, completeCallback)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL 还通过了shutdownInput.checkpointer反对shutdown. 如果关闭原因为 TERMINATE,则应确保记录处理器已完成处理任何数据记录,然后对此接口调用 checkpoint 函数。

修改配置属性

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅基本示例中的 sample.properties)。

Application Name

KCL 需要一个应用程序名称,该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中是唯一的。KCL 通过以下方法使用应用程序名称配置值:

  • 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。

  • KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪 KCL 消费者应用程序处理的碎片

设置凭证

你必须让你Amazon凭证提供程序链中的凭证提供程序之一提供给凭证。可以使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 文件必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您正在 Amazon EC2 实例上运行您的使用者,我们建议您使用 IAM 角色配置实例。Amazon反映与此 IAM 角色关联的权限的凭证通过实例的元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的使用者应用程序的凭证最安全。

下面的示例将 KCL 配置为处理名为的 Kinesis 数据流:kclnodejssample使用中提供的记录处理器sample_kcl_app.js

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default Amazon credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON