Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:将 EFO 使用者与 Kinesis 数据流配合使用
在本练习中,您将创建一个Managed Service for Apache Flink的应用程序,该应用程序使用增强型扇出 (EFO) 使用者从 Kinesis 数据流中读取。如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让其与从流中读取数据的其他使用者共享流的固定带宽。
有关在 Kinesis 使用者上使用 EFO 的更多信息,请参阅 FLIP-128:Kinesis 使用者的增强型扇出功能
您在本示例中创建的应用程序使用 Amazon Kinesis Connector (flink-connector-kinesis) 1.15.3。
注意
要为本练习设置所需的先决条件,请先完成入门指南 (DataStream API)练习。
本主题包含下列部分:
创建相关资源
在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:
两个 Kinesis 数据流(
ExampleInputStream
和ExampleOutputStream
)。存储应用程序代码 (
ka-app-code-
) 的 Amazon S3 存储桶<username>
您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:
Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为
ExampleInputStream
和ExampleOutputStream
。Amazon Simple Storage Service 用户指南中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-code-
。<username>
将示例记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
-
使用以下内容创建名为
stock.py
的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。
下载并检查应用程序代码
此示例的 Java 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
导航到
amazon-kinesis-data-analytics-java-examples/EfoConsumer
目录。
应用程序代码位于 EfoApplication.java
文件中。请注意有关应用程序代码的以下信息:
您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:
RECORD_PUBLISHER_TYPE:将此参数设置为 EFO,让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。
EFO_CONSUMER_NAME:将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。
以下代码示例演示如何为使用者配置属性赋值,以便使用 EFO 使用者从源流中读取:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
编译应用程序代码
要编译应用程序,请执行以下操作:
如果还没有 Java 和 Maven,请安装它们。有关更多信息,请参阅入门指南 (DataStream API)教程中的先决条件。
使用以下命令编译应用程序:
mvn package -Dflink.version=1.15.3
注意
提供的源代码依赖于 Java 11 中的库。
编译应用程序将创建应用程序 JAR 文件 (target/aws-kinesis-analytics-java-apps-1.0.jar
)。
上传 Apache Flink 流式处理 Java 代码
在本节中,您将应用程序代码上传到在创建相关资源一节中创建的 Amazon S3 存储桶。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。
-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
aws-kinesis-analytics-java-apps-1.0.jar
文件。 您无需更改该对象的任何设置,因此,请选择上传。
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建并运行 Managed Service for Apache Flink
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
-
在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序。
-
在Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于 运行时,请选择 Apache Flink。
注意
Managed Service for Apache Flink 使用 Apache Flink 版本 1.15.2。
将版本下拉列表保留为 Apache Flink 版本 1.15.2(建议的版本)。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
编辑 IAM policy
编辑 IAM policy 以添加访问 Kinesis 数据流的权限。
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2
策略。 -
在 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901
) 替换为您的账户 ID。注意
这些权限使应用程序能够访问 EFO 使用者。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/aws-kinesis-analytics-java-apps-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "AllStreams", "Effect": "Allow", "Action": [ "kinesis:ListShards", "kinesis:ListStreamConsumers", "kinesis:DescribeStreamSummary" ], "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/*" }, { "Sid": "Stream", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:RegisterStreamConsumer", "kinesis:DeregisterStreamConsumer" ], "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }, { "Sid": "Consumer", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamConsumer", "kinesis:SubscribeToShard" ], "Resource": [ "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream/consumer/my-efo-flink-app", "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream/consumer/my-efo-flink-app:*" ] }
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 配置应用程序 页面上,提供 代码位置:
-
对于Amazon S3 存储桶,请输入
ka-app-code-
。<username>
-
在 Amazon S3 对象的路径中,输入
aws-kinesis-analytics-java-apps-1.0.jar
。
-
-
在 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
在属性下,选择创建组。
-
输入以下应用程序属性和值:
组 ID 键 值 ConsumerConfigProperties
flink.stream.recordpublisher
EFO
ConsumerConfigProperties
flink.stream.efo.consumername
basic-efo-flink-app
ConsumerConfigProperties
INPUT_STREAM
ExampleInputStream
ConsumerConfigProperties
flink.inputstream.initpos
LATEST
ConsumerConfigProperties
AWS_REGION
us-west-2
-
在属性下,选择创建组。
-
输入以下应用程序属性和值:
组 ID 键 值 ProducerConfigProperties
OUTPUT_STREAM
ExampleOutputStream
ProducerConfigProperties
AWS_REGION
us-west-2
ProducerConfigProperties
AggregationEnabled
false
-
在 监控 下,确保 监控指标级别 设置为 应用程序。
-
要进行CloudWatch 日志记录,请选中 “启用” 复选框。
-
选择更新。
注意
当您选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务会为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。
运行应用程序
可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。
您可以在 CloudWatch 控制台上查看托管服务的 Apache Flink 指标,以验证应用程序是否正常运行。
您也可以在 Kinesis Data Streams 控制台的数据流的 “增强扇出” 选项卡中查看使用者的姓名()。basic-efo-flink-app
清理 Amazon 资源
本节包含清理在 efo 窗口教程中创建的 Amazon 资源的相关步骤。
本主题包含下列部分:
删除 Managed Service for Apache Flink 应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
在 Apache Flink 的托管服务面板中,选择。MyApplication
在应用程序的页面中,选择 删除,然后确认删除。
删除您的 Kinesis 数据流
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis
。 在 Kinesis Data Streams 面板中,ExampleInputStream选择。
在该ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。
在 Kinesis 直播页面中 ExampleOutputStream,选择,选择操作,选择删除,然后确认删除。
删除您的 Amazon S3 对象和存储桶
打开 Amazon S3 控制台,网址为:https://console.aws.amazon.com/s3/
。 选择 ka-app-code-
存储桶。 <username>
选择 删除,然后输入存储桶名称以确认删除。
删除您的 IAM 资源
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 在导航栏中,选择策略。
在筛选条件控件中,输入 kinesis。
选择 kinesis-analytics-service--us-MyApplication west-2 策略。
选择 策略操作,然后选择 删除。
在导航栏中,选择 角色。
选择 k inesis-analytics-us-west-2 角色MyApplication。
选择 删除角色,然后确认删除。
删除您的 CloudWatch 资源
打开 CloudWatch 控制台,网址为 https://console.aws.amazon.com/cloudwatch/
。 在导航栏中,选择 日志。
选择 /aws/kinesis-analytics/ 日志组MyApplication。
选择 删除日志组,然后确认删除。