Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
使用 Apache Beam 创建应用程序
在本练习中,您将创建一个 Managed Service for Apache Flink,该应用程序使用 Apache Beam 转换数据。
注意
要为本练习设置所需的先决条件,请先完成教程:开始在 Managed Service for Apache Flink 中使用 DataStream API练习。
本主题包含下列部分:
创建相关资源
在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:
两个 Kinesis 数据流(
ExampleInputStream和ExampleOutputStream)。存储应用程序代码 (
ka-app-code-) 的 Amazon S3 存储桶<username>
您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:
Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为
ExampleInputStream和ExampleOutputStream。Amazon Simple Storage Service 用户指南中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-code-。<username>
将示例记录写入输入流
在本节中,您使用 Python 脚本将随机字符串写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
-
使用以下内容创建名为
ping.py的文件:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey") -
运行
ping.py脚本:$ python ping.py在完成本教程的其余部分时,请将脚本保持运行状态。
下载并检查应用程序代码
在 GitHub 中提供了该示例的 Java 应用程序代码。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git导航到
amazon-kinesis-data-analytics-java-examples/Beam目录。
应用程序代码位于 BasicBeamStreamingJob.java 文件中。请注意有关应用程序代码的以下信息:
该应用程序使用 Apache Beam ParDo
通过调用名为 PingPongFn的自定义转换函数来处理传入的记录。调用该
PingPongFn函数的代码如下:.apply("Pong transform", ParDo.of(new PingPongFn())使用 Apache Beam 的 Managed Service for Apache Flink 应用程序需要以下组件。如果您未在中包含这些组件和版本
pom.xml,则您的应用程序会从环境依赖项中加载错误的版本,并且由于版本不匹配,您的应用程序将在运行时崩溃。<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>除非输入数据是 ping,
PingPongFn否则转换函数会将输入数据传递到输出流,在这种情况下,它会向输出流发出字符串 pong\ n。变换函数的代码如下:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
编译应用程序代码
要编译应用程序,请执行以下操作:
如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅完成所需的先决条件教程中的教程:开始在 Managed Service for Apache Flink 中使用 DataStream API。
使用以下命令编译应用程序:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8注意
提供的源代码依赖于 Java 11 中的库。
编译应用程序将创建应用程序 JAR 文件 (target/basic-beam-app-1.0.jar)。
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到在创建相关资源一节中创建的 Amazon S3 存储桶。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<用户名>存储桶,然后选择上传。 -
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
basic-beam-app-1.0.jar文件。 您无需更改该对象的任何设置,因此,请选择上传。
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建并运行适用于 Apache Flink 的托管服务
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建应用程序
登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。
-
在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序。
-
在Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 应用程序名称 ,输入
MyApplication。 -
对于运行时系统,请选择 Apache Flink。
注意
Apache Beam 目前与 Apache Flink 版本 1.19 或更高版本不兼容。
从版本下拉列表中选择 Apache Flink 版本 1.15。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2。 -
选择创建应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesis-analytics-MyApplication-us-west-2
编辑 IAM 策略
编辑 IAM policy 以添加访问 Kinesis 数据流的权限。
通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。
-
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2策略。 -
在 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID。
配置应用程序
-
在我的应用程序 页面上,选择配置。
-
在 配置应用程序 页面上,提供 代码位置:
-
对于Amazon S3 存储桶,请输入
ka-app-code-。<username> -
在 Amazon S3 对象的路径中,输入
basic-beam-app-1.0.jar。
-
-
在 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2。 -
输入以下信息:
组 ID 键 值 BeamApplicationPropertiesInputStreamNameExampleInputStreamBeamApplicationPropertiesOutputStreamNameExampleOutputStreamBeamApplicationPropertiesAwsRegionus-west-2 -
在 监控 下,确保 监控指标级别 设置为 应用程序。
-
对于 CloudWatch 日志记录,选中启用复选框。
-
选择更新。
注意
在选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务将为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。
运行应用程序
可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。
您可以在 CloudWatch 控制台上查看Managed Service for Apache Flink的指标,以验证应用程序是否正常运行。
清理 Amazon资源
本节包含清理在滚动窗口教程中创建的 Amazon 资源的过程。
本主题包含下列部分:
删除 Managed Service for Apache Flink 应用程序
登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。
在 Managed Service for Apache Flink 面板中,选择我的应用程序。
在应用程序的页面中,选择 删除,然后确认删除。
删除您的 Kinesis 数据流
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis。
在 Kinesis Data Streams 面板中,选择ExampleInputStream 。
在 ExampleInputStream 页面中,选择 删除 Kinesis 流,然后确认删除。
在 Kinesis 流页面中,选择 ExampleOutputStream,选择操作,选择删除,然后确认删除。
删除您的 Amazon S3 对象和存储桶
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/。
选择 ka-app-code-
<用户名>存储桶。选择 删除,然后输入存储桶名称以确认删除。
删除您的 IAM 资源
通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。
在导航栏中,选择策略。
在筛选条件控件中,输入 kinesis。
选择 kinesis-analytics-service-MyApplication-us-west-2 策略。
选择 策略操作,然后选择 删除。
在导航栏中,选择 角色。
选择 kinesis-analytics-MyApplication-us-west-2 角色。
选择 删除角色,然后确认删除。
删除您的 CloudWatch 资源
通过 https://console.aws.amazon.com/cloudwatch/ 打开 CloudWatch 控制台。
在导航栏中,选择 日志。
选择 /aws/kinesis-analytics/MyApplication 日志组。
选择 删除日志组,然后确认删除。
后续步骤
现在,您已经创建并运行了基本的 Managed Service for Apache Flink应用程序,该应用程序使用 Apache Beam 转换数据,有关更高级的 Managed Service for Apache Flink解决方案的示例,请参阅以下应用程序。
Managed Service for Apache Flink 流式研讨会上的 Beam
:在本研讨会中,我们将探讨一个端到端的示例,该示例将批处理和流媒体方面结合在一个统一的 Apache Beam 管道中。