Amazon Kinesis Data Streams
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

在 Python 中开发 Kinesis Client Library 使用者

您可使用 Kinesis Client Library (KCL) 来构建处理您的 Kinesis stream 中的数据的应用程序。Kinesis Client Library有多种语言版本。本主题将讨论 Python。

KCL 是一个 Java 库,支持 Java 以外的其他语言,提供使用多语言界面,称为MultiLangDaemon。此后台程序是基于 Java 和在后台运行,当您使用 KCL 语言而非 Java。因此,如果您安装了适用于 Python 的 KCL 并完全在 Python 中编写使用者应用程序,则由于 MultiLangDaemon,您仍需要在您的系统中安装 Java。此外,MultiLangDaemon 有一些默认设置,您可能需要根据自己的使用案例进行自定义,例如所连接到的 AWS 区域。有关 MultiLangDaemon 的更多信息,请转至 https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/multilangGitHub 上的 KCL MultiLangDaemon 项目页。

要从 GitHub 下载 Python KCL,请转至 Kinesis Client Library (Python)。要下载适用于 Python KCL 使用者应用程序的示例代码,请转至 GitHub 上的适用于 Python 的 KCL 示例项目页。

在 Python 中实现 KCL 使用者应用程序时,您必须完成下列任务:

实现 RecordProcessor 类方法

RecordProcess 类必须扩展 RecordProcessorBase 以实现以下方法。该示例提供了可用作起点的实现(请参阅 sample_kclpy_app.py)。

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)

initialize

KCL 在实例化记录处理器时调用 initialize 方法,并将特定分片 ID 作为参数传递。此记录处理器只处理此分片,并且通常情况下反过来说也成立(此分片只能由此记录处理器处理)。但是,您的使用者应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有“至少一次”语义,即分片中的每个数据记录在您的使用者中由工作程序至少处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息,请参阅重新分片、扩展和并行处理

def initialize(self, shard_id)

process_records

KCL 调用此方法,并传递由 initialize 方法指定的分片中的数据记录的列表。您实现的记录处理器根据您的使用者的语义处理这些记录中的数据。例如,工作线程可能对数据执行转换,然后将结果存储在 S3 存储桶中。

def process_records(self, records, checkpointer)

除了数据本身之外,记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如,工作线程可选择 S3 存储桶,并在其中根据分区键的值存储数据。record 词典公开了以下键-值对来访问记录的数据、序号和分区键:

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

请注意,数据是 Base64 编码的。

在该示例中,方法 process_records 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将 Checkpointer 对象传递到 process_records 来为您执行此跟踪。记录处理器将对此对象调用 checkpoint 方法,以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败,KCL 将使用此信息在上一个已知的已处理记录处重新启动分片的处理。

对于拆分或合并操作,在原始分片的处理器调用 checkpoint 以指示原始分片上的所有处理操作都已完成之前,KCL 不会开始处理新分片。

如果您未传递参数,KCL 将假定对 checkpoint 的调用表示所有记录都已处理,一直处理到传递到记录处理器的最后一个记录。因此,记录处理器只应在已处理传递到它的列表中的所有记录后才调用 checkpoint。记录处理器不需要在每次调用 process_records 时调用 checkpoint。例如,处理器可在每第三次调用时调用 checkpoint。您可以选择性地将某个记录的确切序号指定为 checkpoint 的参数。在本例中,KCL 将假定所有记录都已处理,一直到达该记录。

在该示例中,私有方法 checkpoint 展示了如何使用适当的异常处理和重试逻辑调用 Checkpointer.checkpoint 方法。

KCL 依靠 process_records 来处理由处理数据记录引起的任何异常。如果从 process_records 中引发了异常,则 KCL 会跳过在异常发生前传递到 process_records 的数据记录;也就是说,这些记录不会重新发送到引发了异常的记录处理器或使用者中的任何其他记录处理器。

shutdown

KCL 在处理结束(关闭原因为 TERMINATE)或工作程序不再响应(关闭 reasonZOMBIE)时调用 shutdown 方法。

def shutdown(self, checkpointer, reason)

处理操作在记录处理器不再从分片中接收任何记录时结束,因为分片已被拆分或合并,或者流已删除。

KCL 还会将 Checkpointer 对象传递到 shutdown。如果关闭 reasonTERMINATE,则记录处理器应完成处理任何数据记录,然后对此接口调用 checkpoint 方法。

修改配置属性

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性(请参阅 sample.properties)。

Application Name

KCL 需要一个应用程序名称,该名称在您的应用程序中以及同一区域的 DynamoDB 表中是唯一的。KCL 通过以下方法使用应用程序名称配置值:

  • 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例,但使用不同的应用程序名称,则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。

  • KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息(如检查点和工作程序-分片映射)。每个应用程序都有自己的 DynamoDB 表。有关更多信息,请参阅 跟踪 Amazon Kinesis Data Streams Application状态

设置凭证

您必须向默认凭证提供程序链中的凭证提供程序之一提供您的 AWS 凭证。您可使用 AWSCredentialsProvider 属性设置凭证提供程序。sample.properties 需要向默认凭证提供程序链中的凭证提供程序之一提供您的凭证。如果您正在 EC2 实例上运行您的使用者应用程序,我们建议您使用 IAM 角色配置实例。反映与此 IAM 角色关联的权限的 AWS 凭证通过实例的元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的使用者应用程序的凭证最安全。

该示例的属性文件将配置 KCL 以使用 sample_kclpy_app.py 中提供的记录处理器处理名为“words”的 Kinesis stream。