Develop a Kinesis Client Library consumer in Python - Amazon Kinesis Data Streams

Develop a Kinesis Client Library consumer in Python

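You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.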

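The KCL is a Java library; support for languages other than Java is provided through a multi-language interface called the MultiLangDaemon. This daemon is Java-based and runs in the background whenever you use the KCL from a language other than Java. Therefore, even if you install the KCL for Python and write your consumer application entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. In addition, the MultiLangDaemon has some default settings that you might need to customize for your use case, such as the AWS Region that it connects to. For more information about the MultiLangDaemon, go to the KCL MultiLangDaemon project page on GitHub.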
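Because the MultiLangDaemon needs a Java runtime, it can help to confirm that Java is reachable before starting a consumer. The following is a minimal sketch that only checks whether a `java` launcher is on the `PATH`; the helper name `find_java` is hypothetical and not part of the KCL, and the check does not verify the Java version.

```python
import shutil


def find_java(executable="java"):
    """Return the full path of the Java launcher on PATH, or None if absent.

    Hypothetical helper for illustration; it is not part of amazon_kclpy.
    """
    return shutil.which(executable)


if __name__ == "__main__":
    java_path = find_java()
    if java_path is None:
        print("Java not found on PATH; the MultiLangDaemon cannot start without it.")
    else:
        print("Found Java at {}".format(java_path))
```

A check like this could run at deployment time, so that a missing Java runtime is reported before the daemon is launched.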

To download the Python KCL from GitHub, go to Kinesis Client Library (Python). To download sample code for a Python KCL consumer application, go to the KCL for Python sample project page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in Python:

Implement the RecordProcessor Class Methods

The RecordProcessor class must extend the RecordProcessorBase class and implement the following methods:

  • initialize

  • process_records

  • shutdown_requested

This sample provides implementations that you can use as a starting point.

#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
      to a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
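The sample tracks the largest (sequence number, sub sequence number) pair it has seen so that it can checkpoint at that position. The sketch below isolates that comparison so it can be run without the KCL. It assumes both values are integers, as in the sample, and uses tuple ordering in place of the explicit comparison in `should_update_sequence`. The function names `should_update` and `largest_position` and the record values are hypothetical.

```python
def should_update(largest, sequence_number, sub_sequence_number):
    """Return True when the given position is ahead of the largest one seen so far."""
    if largest == (None, None):
        # Nothing recorded yet, so any position counts as an advance.
        return True
    # Tuples compare element by element: sequence number first, then sub sequence number.
    return (sequence_number, sub_sequence_number) > largest


def largest_position(records):
    """Return the largest (sequence, sub sequence) pair among the records.

    Each record is a (sequence_number_text, sub_sequence_number) pair; sequence
    numbers arrive as decimal strings and are converted with int(), as in the sample.
    """
    largest = (None, None)
    for seq_text, sub_seq in records:
        seq = int(seq_text)
        if should_update(largest, seq, sub_seq):
            largest = (seq, sub_seq)
    return largest


# Hypothetical records, deliberately out of order.
records = [("100", 0), ("100", 2), ("99", 5), ("101", 0), ("100", 9)]
position = largest_position(records)
```

Tracking the maximum rather than the last record seen means the checkpoint position does not move backward if records are handled out of order within a batch.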

Modify the Configuration Properties

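The sample provides default values for the configuration properties, as shown in the following script. You can override any of these properties with your own values.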

# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for a while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
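To illustrate how overriding works, the following sketch reads `key = value` lines like those in the sample file and applies custom values on top of the defaults. It is a simplified reader written for this example, not the parser the MultiLangDaemon uses, and it handles only `=` separators and full-line `#` comments. The function names and the override values `my-stream` and `us-west-2` are hypothetical.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and '#' comments."""
    properties = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # blank line, comment, or commented-out default
        key, separator, value = line.partition("=")
        if not separator:
            continue  # not a key/value line
        properties[key.strip()] = value.strip()
    return properties


def apply_overrides(properties, overrides):
    """Return a new dict with the overrides applied on top of the given properties."""
    merged = dict(properties)
    merged.update(overrides)
    return merged


# A shortened excerpt of the sample properties file.
SAMPLE = """\
executableName = sample_kclpy_app.py
streamName = words
applicationName = PythonKCLSample
initialPositionInStream = TRIM_HORIZON
#regionName = us-east-1
"""

defaults = parse_properties(SAMPLE)
# Hypothetical overrides: a different stream and an explicitly set Region.
custom = apply_overrides(defaults, {"streamName": "my-stream", "regionName": "us-west-2"})
```

Commented-out entries such as `#regionName` are skipped by this reader, so they take effect only when you uncomment them in the file or supply a value yourself.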

Application Name

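The KCL requires an application name that is unique among your applications and among the Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways: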

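  • All workers associated with this application name are assumed to be working together on the same stream. These workers can be distributed across multiple instances. If you run an additional instance of the same application code but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.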

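  • The KCL creates a DynamoDB table named after the application and uses that table to maintain state information for the application (such as checkpoints and the worker-shard mapping). Each application has its own DynamoDB table. For more information, see Use a lease table to track the shards processed by the KCL consumer application.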
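The effect of the application name can be pictured with a small sketch: workers that share a name share one lease table and cooperate, while a worker with a different name gets its own table and is treated as a separate application. The code only groups hypothetical worker IDs by application name; it does not call DynamoDB or the KCL, and `group_by_lease_table` is an illustrative helper.

```python
from collections import defaultdict


def group_by_lease_table(workers):
    """Map each application name (also the lease table name) to its worker IDs."""
    tables = defaultdict(list)
    for worker_id, application_name in workers:
        tables[application_name].append(worker_id)
    return dict(tables)


# Hypothetical workers given as (worker ID, applicationName) pairs.
workers = [
    ("worker-1", "PythonKCLSample"),
    ("worker-2", "PythonKCLSample"),
    ("worker-3", "PythonKCLSampleCopy"),
]
tables = group_by_lease_table(workers)
```

Here `worker-1` and `worker-2` would divide the stream's shards between them, whereas `worker-3` would process the same stream independently with its own checkpoints.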

Credentials

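You must make your AWS credentials available to one of the credential providers in the default credential provider chain. You can use the AWSCredentialsProvider property to set a credentials provider. If you run your consumer application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.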