在 .NET 中开发 Kinesis Client Library 使用者 - Amazon Kinesis Data Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 .NET 中开发 Kinesis Client Library 使用者

可以使用 Kinesis Client Library (KCL) 构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 有多种语言版本。本主题将讨论 .NET。

KCL 是一个 Java 库;使用名为MultiLang守护程序. 此守护程序是基于 Java 的,当您使用除 Java 之外的 KCL 语言时,会在后台运行。因此,如果您安装了适用于 .NET 的 KCL 并完全在 .NET 中编写使用者应用程序,则由于MultiLang守护程序。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例自定义这些设置,例如,Amazon它连接到的区域。有关的更多信息MultiLang开守护程序GitHub,转到KCLMultiLang守护程序页.

从中下载 .NET KCLGitHub,转到Kinesis 客户端库 (.NET). 要下载适用于 .NET KCL 使用者应用程序的示例代码,请转至.NET 示例消费者项目的 KCL上的页面GitHub.

在 .NET 中实现 KCL 使用者应用程序时,您必须完成下列任务:

实施 IRecordProcessor类方法

使用者必须实现适用于 IRecordProcessor 的以下方法。示例使用者提供了可用作起点的实现(请参阅 SampleRecordProcessor 中的 SampleConsumer/AmazonKinesisSampleConsumer.cs 类)。

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)

Initialize

在实例化记录处理器时调用此方法,并将特定分片 ID 传入input参数 (input.ShardId)。此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的使用者应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 有至少一次语义,即分片中的每个数据记录至少会由使用者中的工作程序处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息,请参阅重新分片、扩展和并行处理

public void Initialize(InitializationInput input)

ProcessRecords

KCL 调用此方法,并将数据记录的列表传入input参数 (input.Records) 来自指定的分片Initialize方法。您实现的记录处理器根据您的使用者的语义处理这些记录中的数据。例如,工作程序可能执行数据转换,然后将结果存储在 Amazon Simple Storage Service (Amazon S3) 存储桶中。

public void ProcessRecords(ProcessRecordsInput input)

除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。Record 类公开了以下代理来访问记录的数据、序号和分区键:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

在该示例中,方法 ProcessRecordsWithRetries 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过传递一个来为您执行此跟踪。Checkpointer反对ProcessRecords(input.Checkpointer)。记录处理器调用Checkpointer.Checkpoint方法以向告知 KCL 处理分片中的记录的进度。如果工作线程失败,KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作,在原始分片的处理器调用之前,KCL 不会开始处理新分片。Checkpointer.Checkpoint表示原始分片的所有处理都已完成。

如果你没有传递参数,KCL 假定调用Checkpointer.Checkpoint表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 Checkpointer.Checkpoint。记录处理器不需要在每次调用 Checkpointer.Checkpoint 时调用 ProcessRecords。例如,处理器在每第三次或第四次调用时调用 Checkpointer.Checkpoint。您可以选择性地将某个记录的确切序号指定为 Checkpointer.Checkpoint 的参数。在本例中,KCL 将假定所有记录都已处理,一直到达该记录。

在该示例中,私有方法 Checkpoint(Checkpointer checkpointer) 展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.Checkpoint 方法。

KCL 适用于 .NET 的处理异常的方式不同于其他 KCL 语言库,后者不处理因处理数据记录而引起的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。

关闭

KCL 称Shutdown方法无论是在处理结束时(关闭原因是TERMINATE) 或工作程序不再响应(关闭)input.Reason值是ZOMBIE)。

public void Shutdown(ShutdownInput input)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL 还通过了Checkpointer反对shutdown. 如果关闭原因为 TERMINATE,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint 方法。

修改配置属性

示例使用者提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 SampleConsumer/kcl.properties)。

Application Name

KCL 需要一个应用程序名称,该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中是唯一的。KCL 通过以下方法使用应用程序名称配置值:

  • 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。

  • KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 使用租赁表跟踪 KCL 消费者应用程序处理的碎片

设置凭证

你必须让你Amazon凭证提供程序链中的凭证提供程序之一提供给凭证。可以使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 必须向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您正在 EC2 实例上运行您的使用者应用程序,我们建议您使用 IAM 角色配置实例。Amazon反映与此 IAM 角色关联的权限的凭证通过实例的元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的使用者的凭证最安全。

该示例的属性文件将配置 KCL 以使用中提供的记录处理器处理名为 “words” 的 Kinesis 数据流。AmazonKinesisSampleConsumer.cs.