Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache Flink 有状态函数
有状态函数是一种
Stateful Functions 应用程序基本上只是一个 Apache Flink 应用程序,因此可以部署到 Managed Service for Apache Flink 中。但是,为 Kubernetes 集群打包状态函数和为 Managed Service for Apache Flink打包状态函数之间有一些区别。Stateful Functions 应用程序最重要的方面是模块配置包含配置
以下是 Apache Flink 托管服务的 StateFun Python 示例的改编版:
Apache Flink 应用程序模板
客户可以编译一个 Flink 应用程序 jar,它只调用 Stateful Functions 运行时并包含所需的依赖项,而不是使用客户容器来运行状态函数。对于 Flink 1.13,所需的依赖项如下所示:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>statefun-flink-distribution</artifactId> <version>3.1.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
Flink 应用程序调用有状态函数运行时的主要方法如下所示:
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env); stateFunConfig.setProvider((StatefulFunctionsUniverseProvider) (classLoader, statefulFunctionsConfig) -> { Modules modules = Modules.loadFromClassPath(); return modules.createStatefulFunctionsUniverse(stateFunConfig); }); StatefulFunctionsJob.main(env, stateFunConfig); }
请注意,这些组件是通用的,与状态函数中实现的逻辑无关。
模块配置的位置
Stateful Functions 模块配置需要包含在类路径中,才能在有状态函数运行时中被发现。最好将其包含在 Flink 应用程序的资源文件夹中,然后将其打包到 jar 文件中。
与常见的 Apache Flink 应用程序类似,您可以使用 maven 创建一个 uber jar 文件并将其部署到 Managed Service for Apache Flink 上。