Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
创建并运行适用于 Apache Flink 的托管服务应用程序
在本练习中,您将创建 Managed Service for Apache Flink 应用程序,并将数据流作为源和接收器。
本节包含以下步骤。
创建相关资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
存储应用程序代码和写入应用程序输出的 Amazon S3 存储桶。
注意
本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称。
注意
请确保在用于本教程的区域中创建存储桶。教程的默认区域为 us-east-1。
其他资源
在您创建应用程序时,Managed Service for Apache Flink 会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>的日志组。 -
名为
kinesis-analytics-log-stream的日志流。
设置本地开发环境
对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。使用 Maven 作为普通的 Java 依赖项处理任何 Apache Flink 依赖项。
注意
在开发计算机上,必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境(如 Eclipse Java Neon
对您的 Amazon 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有经过身份验证的有效 Amazon 会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:
-
如果您没有 Amazon CLI 和已配置有效凭证的命名配置文件,请参阅 设置 Amazon Command Line Interface (Amazon CLI)。
-
如果您的 IDE 有集成 Amazon 的插件,则可以使用该插件将凭证传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 IntelliJ IDEA 的 Amazon Toolkit
和用于编译应用程序或运行 Eclipse 的 Amazon Toolkit。
下载并检查 Apache Flink 流式处理 Java 代码
在 GitHub 中提供了该示例的应用程序代码。
下载 Java 应用程序代码
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
导航到
./java/GettingStartedTable目录。
审核应用程序组件
该应用程序完全在 com.amazonaws.services.msf.BasicTableJob 类中实施。main() 方法定义源、转换和接收器。由此方法末尾的执行语句启动执行。
注意
为获得最佳的开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的 IDE 中进行开发。
-
要读取运行时配置,使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:
-
当应用程序检测到其在 IDE 中以独立模式运行时,会形成包含在项目 resources 文件夹中的
application_properties.json文件。该文件的内容如下所示。 -
当应用程序在 Amazon Managed Service for Apache Flink 中运行时,默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅 创建并配置 Managed Service for Apache Flink 应用程序 。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()方法定义应用程序数据流程并运行它。-
初始化默认的流环境。在此示例中,我们展示如何创建用于 DataStream API 的
StreamExecutionEnvironment,以及用于 SQL 和表 API 的StreamTableEnvironment。为使用不同的 API,这两个环境对象是对同一个运行时环境的两个单独引用。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build()); -
加载应用程序配置参数。这将自动从正确的位置加载这些参数,具体取决于应用程序的运行位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
文件系统接收器连接器
,当 Flink 完成检查点 时,应用程序使用该连接器将结果写入 Amazon S3 输出文件。必须启用检查点,才能将文件写入目标。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,应用程序配置会控制检查点并默认启用该检查点。相反,在本地运行时,默认情况下检查点处于禁用状态。该应用程序检测到它在本地运行,并且每 5,000 毫秒配置一次检查点。 if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); } -
此应用程序不接收来自实际外部源的数据。它生成随机数据以通过 DataGen 连接器
进行处理。此连接器可用于 DataStream API、SQL 和表 API。为演示 API 之间的集成,该应用程序使用 DataStram API 版本,因为它提供了更大的灵活性。在此案例中,每条记录都是由称为 StockPriceGeneratorFunction的生成器函数生成的,您可以在该函数中放置自定义逻辑。DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class)); -
在 DataStream API 中,记录可以有自定义类。这些类必须遵循特定的规则,这样 Flink 才能将其用作记录。有关更多信息,请参阅支持的数据类型
。在此示例中, StockPrice类是一个 POJO。 -
然后将源代码附加到执行环境中,并且生成
StockPrice的DataStream。此应用程序不使用事件时间语义,也不会生成水印。以 1 的并行度运行 DataGenerator 源代码,独立于应用程序其余部分的并行度。 DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1); -
数据处理流程中的后续内容是使用表 API 和 SQL 定义的。为此,我们将 StockPrices 的 DataStream 转换为表。表的架构从
StockPrice类中自动推断。Table stockPricesTable = tableEnv.fromDataStream(stockPrices); -
以下代码片段展示如何使用编程的表 API 定义视图和查询:
Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable); -
定义接收器表以将结果作为 JSON 文件写入 Amazon S3 存储桶。为说明与编程方式定义视图的区别,在表 API 中,使用 SQL 定义接收器表。
tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")"); -
最后一步是
executeInsert(),其中将筛选后的股票价格视图插入接收器表中。此方法启动我们到目前为止定义的数据流程的执行。filteredStockPricesTable.executeInsert("s3_sink");
-
使用 pom.xml 文件
pom.xml 文件定义应用程序所需的所有依赖项,并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
-
有些依赖项具有
provided范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,这些依赖项自动可用。它们是应用程序或 IDE 中本地应用程序所必需的条件。有关更多信息,请参阅(表 API 的更新)在本地运行应用程序。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。要使用表 API 和 SQL,必须包括flink-table-planner-loader和flink-table-runtime-dependencies(两者都具有provided范围)。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
必须使用默认范围向 pom 添加其他 Apache Flink 依赖项。例如,DataGen 连接器
、文件系统 SQL 连接器 和 JSON 格式 。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
为了在本地运行时写入 Amazon S3,也要包含 S3 Hadoop 文件系统并采用
provided范围。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
Maven Java 编译器插件确保根据 Java 11 编译代码,Java 11 是 Apache Flink 目前支持的 JDK 版本。
-
Maven Shade 插件打包 fat-jar,但不包括运行时提供的一些库。它还指定两个转换器:
ServicesResourceTransformer和ManifestResourceTransformer。后者配置包含启动应用程序的main方法的类。如果重命名主类,请不要忘记更新此转换器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
在本地运行应用程序
您可以在 IDE 中本地运行和调试 Flink 应用程序。
注意
在继续之前,请确认输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅 对您的 Amazon 会话进行身份验证。
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅 满足完成练习的先决条件。
将 Java 项目导入您的 IDE
要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./jave/GettingStartedTable 子目录中的内容导入 IDE。
使用 Maven 将代码作为现有 Java 项目插入。
注意
导入新 Java 项目的确切过程因所使用的 IDE 而异。
修改本地应用程序配置
在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。对于本教程应用程序,配置参数是存储桶的名称和写入数据的路径。
编辑配置并修改 Amazon S3 存储桶名称,使其与您在本教程开头创建的存储桶相匹配。
[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
注意
配置属性 name 必须仅包含存储桶名称,例如 my-bucket-name。请勿包含任何前缀(例如 s3://)或尾部斜杠。
如果修改路径,请省略所有前导或尾部斜杠。
设置 IDE 运行配置
您可以像运行任何 Java 应用程序一样,通过运行主类 com.amazonaws.services.msf.BasicTableJob 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前,必须设置“运行”配置。设置取决于您使用的 IDE。例如,请参阅 IntelliJ IDEA 文档中的运行/调试配置
-
将
provided依赖项添加到类路径中。这是确保在本地运行时将具有provided范围的依赖项传递给应用程序所必需的条件。如果不进行此设置,应用程序会立即显示class not found错误。 -
将访问 Kinesis 流的 Amazon 凭证传递给应用程序。最快的方法是使用适用于 IntelliJ IDEA 的 Amazon Toolkit
。在“运行”配置中使用此 IDE 插件,可以选择特定的 Amazon 配置文件。 Amazon 使用此配置文件进行身份验证。您无需直接传递 Amazon 凭证。 -
验证 IDE 是否使用 JDK 11 运行应用程序。
在 IDE 中运行应用程序
设置 BasicTableJob 的“运行”配置后,您可以像常规 Java 应用程序一样运行或调试它。
注意
不能从命令行使用 java -jar
... 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。
当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。
21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...
初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。
要验证应用程序是否正确处理数据,您可以检查输出存储桶的内容,如下一节所述。
注意
Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。
观察将数据写入 S3 存储桶的应用程序
此示例应用程序在内部生成随机数据,并将这些数据写入您配置的目标 S3 存储桶。除非您修改默认配置路径,否则数据将以 ./output/<yyyy-MM-dd>/<HH> 格式写入 output 路径,后跟日期和小时分区。
文件系统接收器连接器
if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
浏览 S3 存储桶并观察应用程序写入的文件
-
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/
。
-
选择您之前创建的存储桶。
-
导航到
output路径,然后导航到与 UTC 时区当前时间相对应的日期和小时文件夹。 -
定期刷新以观察每 5 秒钟出现的新文件。
-
选择并下载一个文件以观察其内容。
注意
默认情况下,这些文件没有扩展名。该内容的格式设置为 JSON。您可以使用任何文本编辑器打开文件来检查内容。
停止在本地运行的应用程序
停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。
编译并打包您的应用程序代码
在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。
要使用 Maven 命令行进行编译和打包,请执行以下操作:
移至包含 Jave GettingStarted 项目的目录,然后运行以下命令:
$ mvn package
使用 IDE 进行编译和打包
从 IDE Maven 集成中运行 mvn package。
在这两种情况下,都会创建 JAR 文件 target/amazon-msf-java-table-app-1.0.jar。
注意
从 IDE 运行构建项目可能不会创建 JAR 文件。
上传应用程序代码 JAR 文件
在本节中,您将在上一节中创建的 JAR 文件上传到在教程开始时创建的 Amazon S3 存储桶。如果已经执行此操作,请完成 创建 Amazon S3 存储桶。
上传应用程序代码
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择上传字段。
-
选择添加文件。
-
导航到上一节中生成的 JAR 文件:
target/amazon-msf-java-table-app-1.0.jar。 -
在不更改任何其他设置的情况下选择上传。
警告
确保在
<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar中选择正确的 JAR 文件。目标目录还包含您无需上传的其他 JAR 文件。
创建并配置 Managed Service for Apache Flink 应用程序
您可以使用控制台或 Amazon CLI 创建和配置 Managed Service for Apache Flink 应用程序。在本教程中,您将使用控制台。
注意
当您使用控制台创建应用程序时,将创建您的 Amazon Identity and Access Management(IAM) 和 Amazon CloudWatch Logs 资源。当您使用 Amazon CLI 创建应用程序时,您必须单独创建这些资源。
创建应用程序
登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。
-
确认选择正确的区域:美国东部(弗吉尼亚北部)us-east-1。
-
在右侧菜单上,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,在初始页面的入门部分中选择创建流应用程序。
-
在创建流应用程序页面上,完成以下操作:
-
在选择设置流处理应用程序的方法中,选择从头开始创建。
-
对于 Apache Flink 配置,应用程序 Flink 版本,请选择 Apache Flink 1.19。
-
在应用程序配置部分中,完成以下步骤:
-
对于 应用程序名称 ,输入
MyApplication。 -
对于描述,输入
My Java Table API test app。 -
对于对应用程序资源的访问权限,选择创建/更新具有所需策略的 IAM 角色 Kinesis-analytics-myApplication-us-east-1。
-
-
在应用程序设置模板中,完成以下操作:
-
对于模板,请选择开发。
-
-
-
选择创建流应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-east-1 -
角色:
kinesisanalytics-MyApplication-us-east-1
编辑 IAM 策略
编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。
编辑 IAM policy 以添加 S3 存储桶权限
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1策略。 -
选择编辑,然后选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID(
012345678901)替换为您的账户 ID,将<bucket-name>替换为您创建的 S3 存储桶的名称。 -
选择下一步,然后选择保存更改。
配置应用程序
编辑应用程序以设置应用程序代码构件。
配置应用程序
-
在我的应用程序 页面上,选择配置。
-
在应用程序代码位置部分,选择配置。
-
对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿单击储存桶名称。
-
在 Amazon S3 对象的路径中,输入
amazon-msf-java-table-app-1.0.jar。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1。 -
在运行时属性部分中,添加以下属性。
-
选择添加新项目并添加以下每个参数:
组 ID 键 值 bucketnameyour-bucket-namebucketpathoutput -
请勿修改任何其他设置。
-
选择保存更改。
注意
在选择启用 Amazon CloudWatch 日志记录时,Managed Service for Apache Flink 将为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已完成配置,准备好运行。
运行应用程序
-
返回 Amazon Managed Service for Apache Flink 中的控制台页面,然后选择我的应用程序。
-
选择运行以启动应用程序。
-
在应用程序还原配置中,选择使用最新快照运行。
-
选择运行。
应用程序详细信息中的状态会从
Ready转换到Starting,然后在应用程序启动后转换到Running。
当应用程序处于 Running 状态时,您可以打开 Flink 控制面板。
打开控制面板并查看作业
-
选择打开 Apache Flink 控制面板。控制面板将在新页面中打开。
-
在正在运行的作业列表中,选择您可以看到的单个作业。
注意
如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会更改为
Running,但是 Flink 控制面板显示任务正在持续重新启动。在应用程序配置错误或缺乏访问外部资源的权限时,通常会出现这种故障场景。发生这种情况时,请检查 Flink 控制面板中的异常选项卡以调查问题的原因。
观察运行中应用程序的指标
在我的应用程序页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序的一些基本指标。
查看指标
-
在刷新按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。
-
完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。查看 Flink 控制面板上的异常选项卡以调查问题。
-
在运行状况良好的应用程序中,失败的检查点数指标应为零。
注意
此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以在 CloudWatch 控制面板中创建包含任何指标的自定义应用程序控制面板。
观察将数据写入目标存储桶的应用程序
现在,您可以观察在 Amazon Managed Service for Apache Flink 中运行的应用程序,该应用程序将文件写入 Amazon S3。
要观察这些文件,请按照应用程序在本地运行时检查写入中文件的相同过程进行操作。请参阅 观察将数据写入 S3 存储桶的应用程序。
请记住,应用程序会在 Flink 检查点上写入新文件。在 Amazon Managed Service for Apache Flink 上运行时,检查点默认处于启用状态,每 60 秒运行一次。该应用程序大约每 1 分钟创建一次新文件。
停止应用程序
要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。
停止应用程序
-
从操作下拉列表中,选择停止。
-
应用程序详细信息中的状态会从
Running转换到Stopping,然后在应用程序完全停止时转换到Ready。注意
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。