本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 .NET 中开发 Kinesis Client Library 使用者
可以使用 Kinesis Client Library (KCL) 构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 有多种语言版本。本主题将讨论 .NET。
KCL 是一个 Java 库;使用名为MultiLang守护程序. 此守护程序是基于 Java 的,当您使用除 Java 之外的 KCL 语言时,会在后台运行。因此,如果您安装了适用于 .NET 的 KCL 并完全在 .NET 中编写使用者应用程序,则由于MultiLang守护程序。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例自定义这些设置,例如,Amazon它连接到的区域。有关的更多信息MultiLang开守护程序GitHub,转到KCLMultiLang守护程序
从中下载 .NET KCLGitHub,转到Kinesis 客户端库 (.NET)
在 .NET 中实现 KCL 使用者应用程序时,您必须完成下列任务:
实施 IRecordProcessor类方法
使用者必须实现适用于 IRecordProcessor
的以下方法。示例使用者提供了可用作起点的实现(请参阅 SampleRecordProcessor
中的 SampleConsumer/AmazonKinesisSampleConsumer.cs
类)。
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
Initialize
在实例化记录处理器时调用此方法,并将特定分片 ID 传入input
参数 (input.ShardId
)。此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的使用者应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 有至少一次语义,即分片中的每个数据记录至少会由使用者中的工作程序处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅重新分片、扩展和并行处理。
public void Initialize(InitializationInput input)
ProcessRecords
KCL 调用此方法,并将数据记录的列表传入input
参数 (input.Records
) 来自指定的分片Initialize
方法。您实现的记录处理器根据您的使用者的语义处理这些记录中的数据。例如,工作程序可能执行数据转换,然后将结果存储在 Amazon Simple Storage Service (Amazon S3) 存储桶中。
public void ProcessRecords(ProcessRecordsInput input)
除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。Record
类公开了以下代理来访问记录的数据、序号和分区键:
byte[] Record.Data
string Record.SequenceNumber
string Record.PartitionKey
在该示例中,方法 ProcessRecordsWithRetries
具有显示工作程序如何访问记录的数据、序号和分区键的代码。
Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过传递一个来为您执行此跟踪。Checkpointer
反对ProcessRecords
(input.Checkpointer
)。记录处理器调用Checkpointer.Checkpoint
方法以向告知 KCL 处理分片中的记录的进度。如果工作线程失败,KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。
对于拆分或合并操作,在原始分片的处理器调用之前,KCL 不会开始处理新分片。Checkpointer.Checkpoint
表示原始分片的所有处理都已完成。
如果你没有传递参数,KCL 假定调用Checkpointer.Checkpoint
表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 Checkpointer.Checkpoint
。记录处理器不需要在每次调用 Checkpointer.Checkpoint
时调用 ProcessRecords
。例如,处理器在每第三次或第四次调用时调用 Checkpointer.Checkpoint
。您可以选择性地将某个记录的确切序号指定为 Checkpointer.Checkpoint
的参数。在本例中,KCL 将假定所有记录都已处理,一直到达该记录。
在该示例中,私有方法 Checkpoint(Checkpointer checkpointer)
展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.Checkpoint
方法。
KCL 适用于 .NET 的处理异常的方式不同于其他 KCL 语言库,后者不处理因处理数据记录而引起的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。
关闭
KCL 称Shutdown
方法无论是在处理结束时(关闭原因是TERMINATE
) 或工作程序不再响应(关闭)input.Reason
值是ZOMBIE
)。
public void Shutdown(ShutdownInput input)
处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。
KCL 还通过了Checkpointer
反对shutdown
. 如果关闭原因为 TERMINATE
,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint
方法。
修改配置属性
示例使用者提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 SampleConsumer/kcl.properties
)。
Application Name
KCL 需要一个应用程序名称,该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中是唯一的。KCL 通过以下方法使用应用程序名称配置值:
-
假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
-
KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪 KCL 消费者应用程序处理的碎片。
设置凭证
你必须让你Amazon凭证提供程序链中的凭证提供程序之一提供给凭证。可以使用 AWSCredentialsProvider
属性设置凭证提供程序。sample.properties
该示例的属性文件将配置 KCL 以使用中提供的记录处理器处理名为 “words” 的 Kinesis 数据流。AmazonKinesisSampleConsumer.cs
.