Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为 Apache Flink Python 应用程序编程你的托管服务
你可以使用 Apache Flink Python 表编写适用于 Apache 的 Flink for Python 应用程序的托管服务。APIApache Flink 引擎将 Python API 表语句(在 Python 虚拟机中运行)转换为 Java API 表语句(在 Java 虚拟机中运行)。
您可以API通过执行以下操作来使用 Python 表:
创建对的引用
StreamTableEnvironment
。通过对
StreamTableEnvironment
参考文献执行查询,根据源流数据创建table
对象。对您的
table
对象执行查询以创建输出表。使用将输出表写入目的地
StatementSet
。
要开始使用适用于 Apache Flink API 的托管服务中的 Python 表,请参阅。开始使用适用于 Python 的 Apache Flink 的亚马逊托管服务
读取和写入流数据
要读取和写入流数据,需要对表环境执行SQL查询。
创建表
以下代码示例演示了用于创建SQL查询的用户定义函数。该SQL查询创建了一个与 Kinesis 流交互的表:
def create_table(table_name, stream_name, region, stream_initpos): return """ CREATE TABLE {0} ( `record_id` VARCHAR(64) NOT NULL, `event_time` BIGINT NOT NULL, `record_number` BIGINT NOT NULL, `num_retries` BIGINT NOT NULL, `verified` BOOLEAN NOT NULL ) PARTITIONED BY (record_id) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, stream_initpos)
读取流媒体数据
以下代码示例演示了如何使用前面的CreateTable
SQL查询对表环境引用来读取数据:
table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
写入流数据
以下代码示例演示如何使用CreateTable
示例中的SQL查询来创建输出表引用,以及如何使用与表交互StatementSet
以将数据写入目标 Kinesis 流:
table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
读取运行时属性
您可以使用运行时系统属性配置应用程序,而无需更改应用程序代码。
为应用程序指定应用程序属性的方式与使用 Java 应用程序的 Managed Service for Apache Flink 方法相同。您可以使用以下方法指定运行时系统属性:
使用动CreateApplication作。
使用动UpdateApplication作。
使用控制台配置应用程序。
您可以通过读取 Managed Service for Apache Flink 运行时创建application_properties.json
的名为 json 文件来检索代码中的应用程序属性。
以下代码示例演示了如何从application_properties.json
文件中读取应用程序属性:
file_path = '/etc/flink/application_properties.json' if os.path.isfile(file_path): with open(file_path, 'r') as file: contents = file.read() properties = json.loads(contents)
以下用户定义的函数代码示例演示了如何从应用程序属性对象中读取属性组:检索:
def property_map(properties, property_group_id): for prop in props: if prop["PropertyGroupId"] == property_group_id: return prop["PropertyMap"]
以下代码示例演示了KEY从上一个示例返回的属性组中读取名为 INPUT STREAM _ 的属性:
input_stream = input_property_map[INPUT_STREAM_KEY]
创建应用程序的代码包
创建 Python 应用程序后,即可将代码文件和依赖项捆绑到一个 zip 文件中。
您的 zip 文件必须包含带有main
方法的 python 脚本,并且可以选择包含以下内容:
其他 Python 代码文件
JAR文件中用户定义的 Java 代码
JAR文件中的 Java 库
注意
您的应用程序 zip 文件必须包含应用程序的所有依赖项。您不能为应用程序引用其他来源的库。