适用于 Apache Flink 的亚马逊托管服务(亚马逊 MSF)以前被称为适用于 Apache Flink 的亚马逊 Kinesis Data Analytics。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Apache Flink 的托管服务应用程序
在本练习中,您将使用 Kinesis 数据流作为源和接收器创建适用于 Apache Flink 的托管服务。
本节包含以下步骤。
创建依赖资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
一个 Amazon S3 存储桶,用于存储应用程序的代码和写入应用程序输出。
注意
本教程假设您在 us-east-1 区域部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称。
注意
请务必在本教程中使用的区域中创建存储桶。本教程的默认设置为 us-east-1。
其他资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>
的日志组。 -
名为
kinesis-analytics-log-stream
的日志流。
设置本地开发环境
对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。任何 Apache Flink 依赖项都使用 Maven 作为普通的 Java 依赖项进行处理。
注意
在你的开发计算机上,你必须安装 Java JDK 11、Maven 和 Git。我们建议你使用诸如 Eclipse Java Neon 或 IntelliJ IDEA
对您的 Amazon 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 Amazon 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。使用以下步骤对您的会话进行身份验证:
-
如果您没有配置带有有效凭据 Amazon CLI 的命名配置文件,请参阅设置 Amazon Command Line Interface (Amazon CLI)。
-
如果您的 IDE 有要集成的插件 Amazon,则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息,请参阅 IntelliJ IDEA Amazon 工具包和用于
编译应用程序或运行 Eclipse 的Amazon 工具包。
下载并检查 Apache Flink 流式处理 Java 代码
此示例的应用程序代码可从中获得 GitHub。
下载 Java 应用程序代码
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
导航到
./java/GettingStartedTable
目录。
查看应用程序组件
该应用程序完全是在com.amazonaws.services.msf.BasicTableJob
课堂上实现的。该main()
方法定义源、变换和接收器。执行由此方法末尾的执行语句启动。
注意
为了获得最佳的开发者体验,该应用程序设计为无需更改任何代码即可在适用于 Apache Flink 的亚马逊托管服务上运行,也可在本地运行,以便在 IDE 中进行开发。
-
要读取运行时配置,使其在适用于 Apache Flink 的 Amazon 托管服务和 IDE 中运行时能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:
-
当应用程序检测到自己在 IDE 中以独立模式运行时,请
application_properties.json
生成包含在项目资源文件夹中的文件。文件内容如下。 -
当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,默认行为会根据您将在适用于 Apache Flink 的亚马逊托管服务 Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
该
main()
方法定义应用程序数据流并运行它。-
初始化默认的流媒体环境。在此示例中,我们展示了如何创建
StreamExecutionEnvironment
要用于 DataStream API 的,以及StreamTableEnvironment
要用于 SQL 和表 API 的。这两个环境对象是对同一个运行时环境的两个单独引用,用法不同 APIs。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
-
加载应用程序配置参数。这将自动从正确的位置加载它们,具体取决于应用程序的运行位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
当 Flink 完成检查点
时,应用程序用于将结果写入 Amazon S3 输出文件的FileSystem 接收器连接器 。必须启用检查点才能将文件写入目标。当应用程序在适用于 Apache Flink 的 Amazon 托管服务中运行时,应用程序配置会控制检查点并默认启用该检查点。相反,在本地运行时,默认情况下会禁用检查点。该应用程序检测到它在本地运行,并每 5,000 毫秒配置一次检查点。 if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
-
此应用程序不接收来自实际外部来源的数据。它生成随机数据以通过DataGen 连接
器进行处理。此连接器可用于 DataStream API、SQL 和表 API。为了演示两者之间的集成 APIs,应用程序使用 DataStram API 版本,因为它提供了更大的灵活性。在本例 StockPriceGeneratorFunction
中,每条记录都是由调用的生成器函数生成的,您可以在其中放置自定义逻辑。DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class));
-
在 DataStream API 中,记录可以有自定义类。课程必须遵循特定的规则,这样 Flink 才能将其用作记录。有关更多信息,请参阅支持的数据类型
。在此示例中,该 StockPrice
类是一个 POJO。 -
然后将源代码附加到执行环境中,生成一个
DataStream
StockPrice
。此应用程序不使用事件时间语义,也不会生成水印。以 1 的并行度运行 DataGenerator 源代码,与应用程序其余部分的并行度无关。 DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1);
-
数据处理流程中的后续内容是使用表 API 和 SQL 定义的。为此,我们将 o DataStream f StockPrices 转换为表格。表的架构是从
StockPrice
类中自动推断出来的。Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
-
以下代码片段展示了如何使用编程的 Table API 定义视图和查询:
Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
-
定义接收器表是为了将结果作为 JSON 文件写入 Amazon S3 存储桶。为了说明与以编程方式定义视图的区别,在表 API 中,汇表是使用 SQL 定义的。
tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")");
-
的最后一步是
executeInsert()
将筛选后的股票价格视图插入汇表中。此方法启动我们迄今为止定义的数据流的执行。filteredStockPricesTable.executeInsert("s3_sink");
-
使用 pom.xml 文件
pom.xml 文件定义了应用程序所需的所有依赖关系,并设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
-
有些依赖关系有
provided
作用域。当应用程序在适用于 Apache Flink 的亚马逊托管服务中运行时,这些依赖关系将自动可用。它们是应用程序或 IDE 中本地应用程序所必需的。有关更多信息,请参阅(更新到 TableAPI)在本地运行应用程序。确保您使用的 Flink 版本与您将在适用于 Apache Flink 的亚马逊托管服务中使用的运行时版本相同。要使用 TableAPI 和 SQL,必须将flink-table-planner-loader
和(两者都包含在provided
作用域flink-table-runtime-dependencies
中)。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
你必须使用默认作用域向 pom 添加其他 Apache Flink 依赖项。例如,DataGen 连接器
、FileSystem SQL 连接器 和 JSON 格式 。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
为了在本地运行时写入 Amazon S3,S3 Hadoop 文件系统也包含在作用域中。
provided
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Maven Java 编译器插件确保代码是根据 Java 11 编译的,Java 11 是 Apache Flink 目前支持的 JDK 版本。
-
Maven Shade 插件打包了 fat-jar,但不包括运行时提供的一些库。它还指定了两个变压器:
ServicesResourceTransformer
和ManifestResourceTransformer
。后者配置包含启动应用程序的main
方法的类。如果你重命名了主类,别忘了更新这个转换器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
在本地运行应用程序
您可以在 IDE 中本地运行和调试 Flink 应用程序。
注意
在继续操作之前,请验证输入和输出流是否可用。请参阅创建两个 Amazon Kinesis 数据流。此外,请确认您有权从两个流中读取和写入数据。请参阅对您的 Amazon 会话进行身份验证。
设置本地开发环境需要 Java 11 JDK、Apache Maven 和用于 Java 开发的 IDE。确认您满足所需的先决条件。请参阅满足完成练习的先决条件。
将 Java 项目导入你的 IDE
要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将./jave/GettingStartedTable
子目录中的内容导入到 IDE 中。
使用 Maven 将代码作为现有 Java 项目插入。
注意
导入新 Java 项目的确切过程因所使用的 IDE 而异。
修改本地应用程序配置
在本地运行时,应用程序使用下项目资源文件夹中application_properties.json
文件中的配置./src/main/resources
。对于本教程应用程序,配置参数是存储桶的名称和写入数据的路径。
编辑配置并修改 Amazon S3 存储桶的名称,使其与您在本教程开头创建的存储桶相匹配。
[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "
<bucket-name>
", "path": "output" } } ]
注意
例如,配置属性name
必须仅包含存储桶名称my-bucket-name
。请勿包含任何前缀,例如s3://
或尾部斜杠。
如果修改路径,请省略所有前导或尾部的斜杠。
设置 IDE 运行配置
您可以像运行任何 Java 应用程序一样,通过运行主类com.amazonaws.services.msf.BasicTableJob
直接从 IDE 运行和调试 Flink 应用程序。在运行应用程序之前,必须设置运行配置。设置取决于您使用的 IDE。例如,请参阅 IntelliJ ID EA 文档中的运行/调试配置
-
将@@
provided
依赖项添加到类路径中。这是确保在本地运行时将具有provided
作用域的依赖关系传递给应用程序所必需的。如果不进行此设置,应用程序会立即显示class not found
错误。 -
将访问 Kinesis 直播的 Amazon 凭证传递给应用程序。最快的方法是使用 IntelliJ IDEA Amazon 工具包
。在 “运行” 配置中使用此 IDE 插件,可以选择特定的 Amazon 配置文件。 Amazon 使用此配置文件进行身份验证。您无需直接传递 Amazon 证书。 -
验证 IDE 是否使用 JDK 11 运行应用程序。
在 IDE 中运行该应用程序
为设置运行配置后BasicTableJob
,您可以像常规 Java 应用程序一样运行或调试它。
注意
你不能直接java -jar
...
从命令行运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。
当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。
21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...
初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。
要验证应用程序是否正确处理数据,您可以检查输出存储桶的内容,如下一节所述。
注意
不发出有关流动数据的日志是 Flink 应用程序的正常行为。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。
观察应用程序向 S3 存储桶写入数据
此示例应用程序在内部生成随机数据,并将这些数据写入您配置的目标 S3 存储桶。除非您修改了默认配置路径,否则数据将以以下格式./output/<yyyy-MM-dd>/<HH>
写入output
路径,然后是数据和小时分区。
FileSystem 接收器连接器
if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
浏览 S3 存储桶并观察应用程序写入的文件
-
打开 Amazon S3 控制台,网址为 https://console.aws.amazon.com/s3/
。
-
选择您之前创建的存储桶。
-
导航到
output
路径,然后导航到与 UTC 时区当前时间相对应的日期和小时文件夹。 -
定期刷新以观察每 5 秒钟出现的新文件。
-
选择并下载一个文件以观察其内容。
注意
默认情况下,这些文件没有扩展名。内容的格式为 JSON。您可以使用任何文本编辑器打开文件来检查内容。
停止应用程序在本地运行
停止应用程序在 IDE 中运行。IDE 通常会提供 “停止” 选项。确切的位置和方法取决于 IDE。
编译并打包您的应用程序代码
在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。
使用 Maven 命令行进行编译和打包
移至包含 Jave GettingStarted 项目的目录并运行以下命令:
$ mvn package
使用 IDE 进行编译和打包
mvn package
从你的 IDE Maven 集成中运行。
在这两种情况下,都会创建 JAR 文件target/amazon-msf-java-table-app-1.0.jar
。
注意
从 IDE 运行生成项目可能无法创建 JAR 文件。
上传应用程序代码 JAR 文件
在本节中,您将您在上一节中创建的 JAR 文件上传到您在本教程开头创建的 Amazon S3 存储桶。如果你已经完成了,请完成创建 Amazon S3 存储桶。
上传应用程序代码
打开 Amazon S3 控制台,网址为 https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择 “上传” 字段。
-
选择 Add files。
-
导航到上一节中生成的 JAR 文件:
target/amazon-msf-java-table-app-1.0.jar
。 -
在不更改任何其他设置的情况下选择 “上传”。
警告
确保在中选择了正确的 JAR 文件
<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar
。目标目录还包含您无需上传的其他 JAR 文件。
创建和配置适用于 Apache Flink 的托管服务 Flink 应用程序
您可以使用控制台或 Apache Flink 应用程序创建和配置托管服务。 Amazon CLI在本教程中,您将使用控制台。
注意
当您使用控制台创建应用程序时,系统会为您创建您的 Amazon Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。使用创建应用程序时 Amazon CLI,必须单独创建这些资源。
创建应用程序
登录并通过 /f Amazon Web Services Management Console link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com
-
确认选择了正确的区域:美国东部(弗吉尼亚北部)us-east-1。
-
在右侧菜单上,选择 Apache Flink 应用程序,然后选择 “创建流媒体应用程序”。或者,在初始页面的 “入门” 部分中选择 “创建流媒体应用程序”。
-
在创建流媒体应用程序页面上,完成以下操作:
-
在 “选择设置流处理应用程序的方法” 中,选择 “从头开始创建”。
-
对于 Apache Flink 配置,即应用程序 Flink 版本,请选择 Ap ache Flink 1.19。
-
在 “应用程序配置” 部分,完成以下操作:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于描述,输入
My Java Table API test app
。 -
要访问应用程序资源,请选择使用所需策略创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1。
-
-
在应用程序设置模板中,完成以下操作:
-
对于 “模板”,选择 “开发”。
-
-
-
选择 “创建流媒体应用程序”。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-east-1
-
角色:
kinesisanalytics-
MyApplication
-us-east-1
编辑 IAM 策略
编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。
编辑 IAM policy 以添加 S3 存储桶权限
使用 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1
策略。 -
选择 “编辑”,然后选择 “JSON” 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901
) 替换为您的账户 ID 和<bucket-name>
您创建的 S3 存储桶的名称。 -
选择下一步,然后选择保存更改。
配置应用程序
编辑应用程序以设置应用程序代码对象。
配置应用程序
-
在MyApplication页面上,选择配置。
-
在应用程序代码位置部分,选择配置。
-
对于 Amazon S3 存储桶,请选择您之前为应用程序代码创建的存储桶。选择 “浏览” 并选择正确的存储桶,然后选择 “选择”。不要点击存储桶名称。
-
在 Amazon S3 对象的路径中,输入
amazon-msf-java-table-app-1.0.jar
。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1
。 -
在 “运行时属性” 部分中,添加以下属性。
-
选择 “添加新项目” 并添加以下每个参数:
组 ID 键 值 bucket
name
your-bucket-name
bucket
path
output
-
请勿修改任何其他设置。
-
选择保存更改。
注意
当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已配置完毕,可以运行了。
运行应用程序
-
返回适用于 Apache Flink 的亚马逊托管服务中的控制台页面并选择。MyApplication
-
选择 “运行” 以启动应用程序。
-
在应用程序还原配置中,选择使用最新快照运行。
-
选择运行。
应用程序启动
Running
后,“应用程序状态” 详细信息会从Ready
到,Starting
然后转换为。
当应用程序处于Running
状态时,您可以打开 Flink 控制面板。
打开仪表板并查看作业
-
选择 “打开 Apache Flink 仪表板”。仪表板将在新页面中打开。
-
在 “正在运行的作业” 列表中,选择您可以看到的单个作业。
注意
如果您设置了运行时属性或编辑了 IAM 策略不正确,则应用程序状态可能会更改为
Running
,但是 Flink 控制面板会显示任务持续重启。这是应用程序配置错误或缺少访问外部资源的权限的常见故障情况。发生这种情况时,请检查 Flink 控制面板中的 “异常” 选项卡以调查问题的原因。
观察正在运行的应用程序的指标
在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。
查看指标
-
在 “刷新” 按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行正常时,您可以看到正常运行时间指标不断增加。
-
完全重启指标应为零。如果它增加,则配置可能会出现问题。查看 Flink 控制面板上的 “异常” 选项卡以调查问题。
-
在运行良好的应用程序中,失败的检查点数指标应为零。
注意
此仪表板显示一组固定的指标,粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。
观察应用程序向目标存储桶写入数据
现在,您可以观察在适用于 Apache Flink 的亚马逊托管服务中运行的应用程序将文件写入亚马逊 S3。
要观察这些文件,请按照应用程序在本地运行时检查正在写入的文件的相同过程进行操作。请参阅观察应用程序向 S3 存储桶写入数据。
请记住,应用程序会在 Flink 检查点上写入新文件。在适用于 Apache Flink 的亚马逊托管服务上运行时,检查点默认处于启用状态,每 60 秒运行一次。该应用程序大约每 1 分钟创建一次新文件。
停止应用程序
要停止应用程序,请转到名为的 Apache Flink 托管服务应用程序的控制台页面。MyApplication
停止应用程序
-
从 “操作” 下拉列表中,选择 “停止”。
-
应用程序详细信息中的状态从
Running
变为Stopping
,然后转换到应用程序完全停止Ready
时。注意
别忘了停止从 Python 脚本或 Kinesis 数据生成器向输入流发送数据。