为 Python 应用程序编程 Kinesis Data Analytics - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Python 应用程序编程 Kinesis Data Analytics

您可以使用 Apache Flink Python 表 API 为 Python 应用程序编码 Kinesis Data Analytics 应用程序。Apache Flink 引擎将 Python 表 API 语句(在 Python 虚拟机中运行)转换为 Java 表 API 语句(在 Java VM 中运行)。

您可以通过执行以下操作来使用 Python 表 API:

  • 创建对StreamTableEnvironment.

  • Createtable对源流数据中的对象执行查询StreamTableEnvironment引用。

  • 对你的table用于创建输出表的对象。

  • 使用StatementSet.

要开始在 Kinesis Data Analytics 中使用 Python 表 API,请参阅适用于 Python 的 Apache Flink 的 Amazon Kinesis Data Analytics 入门.

读取和写入流数据

要读取和写入流数据,请在表环境中执行 SQL 查询。

创建表

以下代码示例演示了创建 SQL 查询的用户定义函数。SQL 查询创建了一个与 Kinesis 流交互的表:

def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)

阅读流媒体数据

以下代码示例演示如何使用上述CreateTable对表环境引用进行 SQL 查询以读取数据:

table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))

编写流媒体数据

以下代码示例演示如何使用中的 SQL 查询CreateTable创建输出表引用的示例,以及如何使用StatementSet与表交互以将数据写入目标 Kinesis 流:

table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))

读取运行时属性

您可以使用运行时属性配置应用程序,而无需更改应用程序代码。

为应用程序指定应用程序属性的方法与使用适用于 Java 的 Kinesis Data Analytics 应用程序的方式相同。您可以通过以下方式指定运行时属性:

通过读取名为的 json 文件来检索代码中的应用程序属性application_properties.jsonKinesis Data Analytics 运行时创建的。

以下代码示例演示如何从中读取应用程序属性application_properties.jsonfile:

file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)

以下用户定义的函数代码示例演示了从应用程序属性对象读取属性组:检索:

def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]

以下代码示例演示从上一个示例返回的属性组中读取名为 INPUT_STREAM_KEY 的属性:

input_stream = input_property_map[INPUT_STREAM_KEY]

创建应用程序的代码包

创建 Python 应用程序后,将代码文件和依赖关系捆绑到一个 zip 文件中。

你的 zip 文件必须包含一个 python 脚本main方法,并可以选择包含以下内容:

  • 其他 Python 代码文件

  • JAR 文件中用户定义的 Java 代码

  • JAR 文件中的 Java 库

注意

您的应用程序 zip 文件必须包含应用程序的所有依赖关系。你不能为你的应用程序引用来自其他来源的库。