Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Python 的 Apache Flink 应用程序的托管服务
在本练习中,您将创建面向 Python 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。
本节包含以下步骤。
创建依赖资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
两个 Kinesis 流用于输入和输出。
-
存储应用程序代码和输出的 Amazon S3 存储桶 (
ka-app-code-
)<username>
创建两个 Kinesis 直播
在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStream
和ExampleOutputStream
)。您的应用程序将这些数据流用于应用程序源和目标流。
可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。
创建数据流 (Amazon CLI)
-
要创建第一个直播 (
ExampleInputStream
),请使用以下 Amazon Kinesis 命令create-stream
Amazon CLI 。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream
)。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如
ka-app-code-
。<username>
其他资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/MyApplication
的日志组。 -
名为
kinesis-analytics-log-stream
的日志流。
将样本记录写入输入流
在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。
注意
此部分需要 Amazon SDK for Python (Boto)
注意
本节中的 Python 脚本使用 Amazon CLI。您必须将您的配置 Amazon CLI 为使用您的账户凭证和默认区域。要配置您的 Amazon CLI,请输入以下内容:
aws configure
-
使用以下内容创建名为
stock.py
的文件:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
运行
stock.py
脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。
创建并检查 Apache Flink 流式传输 Python 代码
此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:
如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git
。 使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
导航到
amazon-kinesis-data-analytics-java-examples/python/GettingStarted
目录。
应用程序代码位于 getting_started.py
文件中。请注意有关应用程序代码的以下信息:
应用程序使用 Kinesis 表源从源流中进行读取。以下代码段调用该
create_table
函数来创建 Kinesis 表源:table_env.execute_sql( create_table(output_table_name, output_stream, output_region)
该
create_table
函数使用 SQL 命令创建由流式传输源支持的表:def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
应用程序创建两个表,然后将一个表的内容写入另一个表。
# 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
该应用程序使用来自 flink - sql-connector-kinesis _2.12/1.15.2 文件的 Flink
连接器。
向 Python 应用程序添加第三方依赖项
使用第三方 python 包(例如 boto3site-packages
文件夹,以创建如下所示的目录结构:
PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-4.2.0-1.18.jar │ │ ... ...
要将 boto3 添加为第三方,请执行以下操作:
在本地计算机上创建具有所需依赖项的独立 Python 环境(conda 或类似环境)。
记下该环境
site_packages
文件夹中的初始软件包列表。您的应用程序所需的
pip-install
所有依赖项。记下在上述步骤 3 之后添加到该
site_packages
文件夹的软件包。这些是您需要包含在软件包中的文件夹(在my_deps
文件夹下),其组织方式如上所示。这将允许您捕获步骤 2 和步骤 3 之间的软件包差异,从而为您的应用程序确定正确的软件包依赖关系。在
kinesis.analytics.flink.run.options
属性组中my_deps/
作为pyFiles
属性的参数提供,如下jarfiles
属性所述。Flink 还允许您使用 add_python_file函数指定 Python 依赖关系,但请务必记住,只需要指定其中一个,而不是两者兼而有之。
注意
您不必为该文件夹my_deps
命名。重点是使用pyFiles
或add_python_file
注册依赖关系。可以在 PyfLink 中的如何使用 boto3
上传 Apache Flink 直播 Python 代码
在本节中,创建 Amazon S3 存储桶并上传应用程序代码。
要使用控制台上传应用程序代码,请执行以下操作:
使用您首选的压缩应用程序来压缩
getting-started.py
和 Flink SQL 连接器文件。为存档 myapp.zip
命名。如果您在存档中包含外部文件夹,则必须将其包含在路径中,并在配置文件中添加代码:GettingStarted/getting-started.py
。通过 https://console.aws.amazon.com/s3/
打开 Amazon S3 控制台。 -
选择 创建存储桶 。
-
在 存储桶名称 字段中输入
ka-app-code-
。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 下一步。<username>
-
在配置选项步骤中,让设置保持原样,然后选择下一步。
-
在设置权限步骤中,让设置保持原样,然后选择下一步。
-
选择 创建存储桶 。
-
在 Amazon S3 控制台中,选择 ka-app-code-
<username>存储桶,然后选择上传。
-
在选择文件步骤中,选择添加文件。导航到您在上一步中创建的
myapp.zip
文件。选择 下一步。 -
您无需更改该对象的任何设置,因此,请选择 上传。
要使用 Amazon CLI上传应用程序代码:
注意
请勿使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建myapp.zip
存档。这可能导致应用程序代码无效。
使用您首选的压缩应用程序来压缩
streaming-file-sink.py
和 Flink SQL 连接器文件。 注意
请勿使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建 myapp.zip 存档。这可能导致应用程序代码无效。
使用您首选的压缩应用程序来压缩
getting-started.py
和 Flink SQL 连接器文件。为存档 myapp.zip
命名。如果您在存档中包含外部文件夹,则必须将其包含在路径中,并在配置文件中添加代码:GettingStarted/getting-started.py
。运行以下命令:
$ aws s3 --region
aws region
cp myapp.zip s3://ka-app-code-<username>
您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。
创建并运行适用于 Apache 的托管服务 Flink 应用程序
按照以下步骤,使用控制台创建、配置、更新和运行应用程序。
创建应用程序
打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink
-
在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序。
-
在Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:
-
对于 应用程序名称 ,输入
MyApplication
。 -
对于描述,输入
My java test app
。 -
对于运行时系统,请选择 Apache Flink。
-
将该版本保留为 Apache Flink 版本 1.18。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
选择创建应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
配置应用程序
请使用以下过程来配置应用程序。
配置应用程序
-
在MyApplication页面上,选择配置。
-
在 配置应用程序 页面上,提供 代码位置:
-
对于Amazon S3 存储桶,请输入
ka-app-code-
。<username>
-
在 Amazon S3 对象的路径中,输入
myapp.zip
。
-
-
在 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-west-2
。 -
在属性下面,选择添加组。
-
输入以下信息:
组 ID 键 值 consumer.config.0
input.stream.name
ExampleInputStream
consumer.config.0
aws.region
us-west-2
consumer.config.0
scan.stream.initpos
LATEST
选择保存。
在属性下面,再次选择添加组。
输入以下信息:
组 ID 键 值 producer.config.0
output.stream.name
ExampleOutputStream
producer.config.0
aws.region
us-west-2
producer.config.0
shard.count
1
在属性下面,再次选择添加组。对于 组 ID,输入
flink.sql.connector.kinesis.options
。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅 指定您的代码文件。输入以下信息:
组 ID 键 值 kinesis.analytics.flink.run.options
python
getting-started.py
kinesis.analytics.flink.run.options
jarfile
flink-sql-connector-kinesis-4.2.0-1.18.jar
-
在 监控 下,确保 监控指标级别 设置为 应用程序。
-
要进行CloudWatch 日志记录,请选中 “启用” 复选框。
-
选择更新。
注意
当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication
-
日志流:
kinesis-analytics-log-stream
编辑 IAM 政策
编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。
编辑 IAM policy 以添加 S3 存储桶权限
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-west-2
策略。 -
在 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901
) 替换为您的账户 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
运行应用程序
可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。
停止应用程序
要停止应用程序,请在MyApplication页面上选择停止。确认该操作。