Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
创建并运行面向 Python 应用程序的 Managed Service for Apache Flink
在本节中,您将创建面向 Python 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。
本节包含以下步骤。
创建相关资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
两个 Kinesis 流用于输入和输出。
-
存储应用程序代码的 Amazon S3 存储桶。
注意
本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。
创建两个 Kinesis 流
在为本练习创建 Managed Service for Apache Flink 应用程序之前,请在要用于部署应用程序的同一区域(本示例中的 us-east-1)中创建两个 Kinesis 数据流(ExampleInputStream 和 ExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。
可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。
创建数据流 (Amazon CLI)
-
要创建第一个流 (
ExampleInputStream),请使用以下 Amazon Kinesiscreate-streamAmazon CLI命令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 -
要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为
ExampleOutputStream)。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称。
注意
请确保在用于本教程的区域(us-east-1)中创建 S3 存储桶。
其他资源
在您创建应用程序时,Managed Service for Apache Flink 会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>的日志组。 -
名为
kinesis-analytics-log-stream的日志流。
设置本地开发环境
为进行开发和调试,可以在自己的计算机上运行 Python Flink 应用程序。您可以使用 python
main.py 或所选的 Python IDE 从命令行中启动应用程序。
注意
在您的开发计算机上,必须安装 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我们建议您使用像 PyCharm
安装 PyFlink 库
要开发您的应用程序并在本地运行它,您必须安装 Flink Python 库。
-
使用 VirtualenV、Conda 或任何类似的 Python 工具创建独立的 Python 环境。
-
在该环境中安装 PyFlink 库。使用与 Amazon Managed Service for Apache Flink 中所使用相同的 Apache Flink 运行时版本。目前,建议的运行时版本为 1.19.1。
$ pip install apache-flink==1.19.1 -
运行应用程序时,请确保环境处于激活状态。如果在 IDE 中运行应用程序,请确保 IDE 使用该环境作为运行时。该过程取决于您使用的 IDE。
注意
您只需要安装 PyFlink 库。您无需在计算机上安装 Apache Flink 集群。
对您的 Amazon 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有经过身份验证的有效 Amazon 会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:
-
如果您没有 Amazon CLI 和已配置有效凭证的命名配置文件,请参阅 设置 Amazon Command Line Interface (Amazon CLI)。
-
通过发布以下测试记录,验证您的 Amazon CLI 是否正确配置,并且您的用户有权写入 Kinesis 数据流:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
如果您的 IDE 有集成 Amazon 的插件,则可以使用该插件将凭证传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 PyCharm 的 Amazon Toolkit
、适用于 Visual Studio Code 的 Amazon Toolkit 和适用于 IntelliJ IDEA 的 Amazon Toolkit 。
下载并检查 Apache Flink 流式处理 Python 代码
在 GitHub 中提供了该示例的 Python 应用程序代码。要下载应用程序代码,请执行以下操作:
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
导航到
./python/GettingStarted目录。
审核应用程序组件
应用程序代码位于 main.py 中。我们使用 Python 中嵌入的 SQL 来定义应用程序的流程。
注意
为优化开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的计算机上进行开发。应用程序使用环境变量 IS_LOCAL =
true 来检测其何时在本地运行。必须在 Shell 上或 IDE 的运行配置中设置环境变量 IS_LOCAL = true。
-
应用程序设置执行环境并读取运行时配置。要在 Amazon Managed Service for Apache Flink 上和本地运行,应用程序会检查
IS_LOCAL变量。-
以下是应用程序在 Amazon Managed Service for Apache Flink 中运行时的默认行为:
-
加载随应用程序打包的依赖项。有关更多信息,请参阅(LINK)
-
从在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载配置。有关更多信息,请参阅(LINK)
-
-
如果在本地运行应用程序,则在应用程序检测
IS_LOCAL = true时:-
从项目加载外部依赖项。
-
从项目中包含的
application_properties.json文件加载配置。... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
应用程序使用 Kinesis 连接器
定义带有 CREATE TABLE语句的源表。此表从输入 Kinesis 流中读取数据。应用程序从运行时配置中获取流的名称、区域和初始位置。table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """) -
在此示例中,应用程序还使用 Kinesis 连接器
定义接收表。此表将数据发送到输出 Kinesis 流。 table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""") -
最后,应用程序执行 SQL,该 SQL 从源表中
INSERT INTO...接收器表。在更复杂的应用程序中,在写入接收器之前,您可能还要执行其他步骤来转换数据。table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""") -
您必须在
main()函数末尾再添加一个步骤,才能在本地运行应用程序:if is_local: table_result.wait()如果不使用此语句,则当您在本地运行应用程序时,它会立即终止。在 Amazon Managed Service for Apache Flink 中运行应用程序时,不得执行此语句。
管理 JAR 依赖项
PyFlink 应用程序通常需要一个或多个连接器。本教程中的应用程序使用 Kinesis 连接器
在此示例中,我们展示如何使用 Apache Maven 获取依赖项并打包应用程序以在 Managed Service for Apache Flink 上运行。
注意
还可使用其他方法获取和打包依赖项。此示例演示正确适用于一个或多个连接器的方法。它还允许您在本地运行应用程序以进行开发,也可以在 Managed Service for Apache Flink 上运行应用程序,而无需更改代码。
使用 pom.xml 文件
Apache Maven 使用 pom.xml 文件来控制依赖项和应用程序打包。
所有 JAR 依赖项均在 pom.xml 文件的 <dependencies>...</dependencies> 块中指定。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
要查找要使用的正确构件和连接器版本,请参阅 将 Apache Flink 连接器与 Managed Service for Apache Flink 一起使用。请务必参考正在使用的 Apache Flink 版本。在此示例中,我们使用的是 Kinesis 连接器。对于 Apache Flink 1.19,连接器版本为 4.3.0-1.19。
注意
如果您使用的是 Apache Flink 1.19,则没有专门为此版本发布的连接器版本。使用为 1.18 版本发布的连接器。
下载和打包依赖项
使用 Maven 下载 pom.xml 文件中定义的依赖项,然后将其打包给 Python Flink 应用程序。
-
导航到包含 Python 入门项目(名为
python/GettingStarted)的目录。 -
运行以下命令:
$ mvn package
Maven 创建名为 ./target/pyflink-dependencies.jar 的新文件。当您在计算机上进行本地开发时,Python 应用程序会查找此文件。
注意
如果您忘记运行此命令,则在尝试运行应用程序时,它将失败并显示错误:找不到标识符 "kinesis" 的任何工厂。
将示例记录写入输入流
在本节中,使用 Python 脚本将示例记录写入流,以供应用程序处理。您可以通过两种方式生成示例数据,即使用 Python 脚本或使用 Kinesis Data Generator
使用 Python 脚本生成示例数据
您可以使用 Python 脚本将示例记录发送到数据流。
注意
要运行此 Python 脚本,必须使用 Python 3.x 并安装适用于 Python 的 Amazon SDK(Boto)
要开始向 Kinesis 输入流发送测试数据,请执行以下操作:
-
从数据生成器 GitHub 存储库
下载数据生成器 stock.pyPython 脚本。 -
运行
stock.py脚本:$ python stock.py
在完成本教程的其余部分时,请将脚本保持运行状态。您现在可以运行 Apache Flink 应用程序。
使用 Kinesis Data Generator 生成示例数据
除了使用 Python 脚本之外,还可以使用 Kinesis Data Generator
要设置和运行 Kinesis Data Generator,请执行以下操作:
-
按照 Kinesis Data Generator 文档
中的说明设置该工具的访问权限。您将运行用于设置用户和密码的 Amazon CloudFormation 模板。 -
通过 CloudFormation 模板生成的 URL 访问 Kinesis Data Generator。完成 CloudFormation 模板后,您可以在输出选项卡中找到该 URL。
-
配置数据生成器:
-
区域:选择您在本教程中使用的区域:us-east-1
-
流/传输流:选择应用程序将使用的输入流:
ExampleInputStream -
每秒记录数:100
-
记录模板:复制并粘贴以下模板:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
测试模板:选择测试模板并验证生成的记录是否与以下内容类似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
启动数据生成器:选择选择发送数据。
Kinesis Data Generator 现在向 ExampleInputStream 发送数据。
在本地运行应用程序
可以在本地测试应用程序,并且使用 python main.py 从命令行或从 IDE 运行该应用程序。
要在本地运行应用程序,必须安装正确版本的 PyFLink 库,如上一节所述。有关更多信息,请参阅(LINK)
注意
在继续之前,请确认输入和输出流是否可用。请参阅 创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅 对您的 Amazon 会话进行身份验证。
将 Python 项目导入您的 IDE
要开始在 IDE 中处理应用程序,必须将其作为 Python 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./python/GettingStarted 子目录中的内容导入 IDE。
将代码作为现有 Python 项目导入。
注意
导入新 Python 项目的确切过程因所使用的 IDE 而异。
检查本地应用程序配置
在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。您可以编辑此文件以使用不同的 Kinesis 流名称或区域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
在本地运行 Python 应用程序
可以在本地运行应用程序,可以从命令行中作为常规 Python 脚本运行,也可以从 IDE 中运行。
要运行应用程序,请从命令行运行
-
确保在其中安装 Python Flink 库的 Conda 或 VirtuaLenv 等独立 Python 环境当前处于激活状态。
-
确保至少运行一次
mvn package。 -
设置
IS_LOCAL = true环境变量:$ export IS_LOCAL=true -
将该应用程序作为常规 Python 脚本运行。
$python main.py
从 IDE 中运行应用程序
-
使用以下配置将 IDE 配置为运行
main.py脚本:-
使用独立的 Python 环境,例如在其中安装 PyfLink 库的 Conda 或 VirtualenV。
-
使用 Amazon 凭证访问输入和输出 Kinesis 数据流。
-
设置
IS_LOCAL = true。
-
-
设置运行配置的确切过程取决于您的 IDE,并且会有所不同。
-
设置 IDE 后,运行 Python 脚本,并在应用程序运行时使用 IDE 提供的工具。
在本地检查应用程序日志
在本地运行时,除了在应用程序启动时打印和显示的几行之外,应用程序不会在控制台中显示任何日志。PyfLink 将日志写入安装 Python Flink 库的目录中的一个文件。应用程序启动时会打印日志的位置。还可以运行以下命令来查找日志:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
列出日志目录中的文件。通常会找到单个的
.log文件。 -
在应用程序运行时追踪文件:
tail -f <log-path>/<log-file>.log。
观察 Kinesis 流中的输入和输出数据
您可以使用 Amazon Kinesis 控制台中的数据查看器观察由(生成示例 Python)或 Kinesis Data Generator(链接)发送到输入流的记录。
要观察记录,请执行以下操作:
停止在本地运行的应用程序
停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。
打包应用程序代码
在本节中,您会使用 Apache Maven 将应用程序代码和所有必需的依赖项打包到一个 .zip 文件中。
再次运行 Maven 打包命令:
$ mvn package
此命令会生成文件 target/managed-flink-pyflink-getting-started-1.0.0.zip。
将应用程序包上传到 Amazon S3 存储桶
在本节中,您要将在上一节中创建的 .zip 文件上传到在本教程开始时创建的 Amazon Simple Storage Service(Amazon S3)存储桶中。如果您尚未完成此步骤,请参阅(链接)。
上传应用程序代码 JAR 文件
通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择上传。
-
选择添加文件。
-
导航到上一步中生成的 .zip 文件:
target/managed-flink-pyflink-getting-started-1.0.0.zip。 -
在不更改任何其他设置的情况下选择上传。
创建并配置 Managed Service for Apache Flink 应用程序
您可以使用控制台或 Amazon CLI 创建和配置 Managed Service for Apache Flink 应用程序。在本教程中,我们将使用控制台。
创建应用程序
登录 Amazon Web Services 管理控制台 并打开 Amazon MSF 控制台,网址为 https://console.aws.amazon.com/flink。
-
确认选择正确的区域:美国东部(弗吉尼亚北部)us-east-1。
-
打开右侧的菜单,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,从初始页面的入门部分中选择创建流应用程序。
-
在创建流应用程序页面上:
-
在选择设置流处理应用程序的方法中,选择从头开始创建。
-
对于 Apache Flink 配置,应用程序 Flink 版本,请选择 Apache Flink 1.19。
-
对于应用程序配置:
-
对于 应用程序名称 ,输入
MyApplication。 -
对于描述,输入
My Python test app。 -
在对应用程序资源的访问权限中,选择创建/更新具有所需策略的 IAM 角色 Kinesis-analytics-myApplication-us-east-1。
-
-
对于应用程序模板设置:
-
对于模板,选择开发。
-
-
选择创建流应用程序。
-
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesisanalytics-MyApplication-us-west-2
Amazon Managed Service for Apache Flink 之前称为 Kinesis Data Analytics。为实现向后兼容,自动创建的资源名称前缀为 kinesis-analytics。
编辑 IAM 策略
编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。
编辑 IAM policy 以添加 S3 存储桶权限
通过 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1策略。 -
选择编辑,然后选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID。 -
选择下一步,然后选择保存更改。
配置应用程序
编辑应用程序配置以设置应用程序代码构件。
配置应用程序
-
在我的应用程序 页面上,选择配置。
-
在应用程序代码位置部分中:
-
对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿在存储桶名称上选择。
-
在 Amazon S3 对象的路径中,输入
managed-flink-pyflink-getting-started-1.0.0.zip。
-
-
对于访问权限,请选择创建/更新具有必要策略的 IAM 角色
kinesis-analytics-MyApplication-us-east-1。 -
移至运行时属性,并保留所有其他设置的默认值。
-
选择添加新项目并添加以下每个参数:
组 ID 键 值 InputStream0stream.nameExampleInputStreamInputStream0flink.stream.initposLATESTInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1kinesis.analytics.flink.run.optionspythonmain.pykinesis.analytics.flink.run.optionsjarfilelib/pyflink-dependencies.jar -
请勿修改任何其他部分,然后选择保存更改。
注意
在选择启用 Amazon CloudWatch 日志记录时,Managed Service for Apache Flink 将为您创建日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已完成配置,准备好运行。
运行应用程序
-
在 Amazon Managed Service for Apache Flink 的控制台上,选择我的应用程序,然后选择运行。
-
在下一页的“应用程序还原配置”页面上,选择使用最新快照运行,然后选择运行。
应用程序详细信息中的状态会从
Ready转换到Starting,然后在应用程序启动时转换到Running。
当应用程序处于 Running 状态时,您现在可以打开 Flink 控制面板。
打开控制面板
-
选择打开 Apache Flink 控制面板。控制面板将在新页面上打开。
-
在正在运行的作业列表中,选择您可以看到的单个作业。
注意
如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会变为
Running,但是 Flink 控制面板显示任务正在持续重新启动。如果应用程序配置错误或缺乏访问外部资源的权限,则通常会出现这种故障场景。发生这种情况时,请检查 Flink 控制面板中的异常选项卡以查看问题的原因。
观察运行中应用程序的指标
在我的应用程序页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序的一些基本指标。
查看指标
-
在刷新按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。
-
完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。要调查问题,请审查 Flink 控制面板上的异常选项卡。
-
在运行状况良好的应用程序中,失败的检查点数指标应为零。
注意
此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以在 CloudWatch 控制面板中创建包含任何指标的自定义应用程序控制面板。
观察 Kinesis 流中的输出数据
确保您仍在使用 Python 脚本或 Kinesis Data Generator 将数据发布到输入中。
现在,您可以使用 https://console.aws.amazon.com/kinesis/
查看输出
打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis
。 -
确认该区域与您运行本教程时使用的区域相同。默认情况下,区域为 US-East-1US 东部(弗吉尼亚北部)。如有必要,可以更改区域。
-
选择数据流。
-
选择您要观察的流。在本教程中,请使用
ExampleOutputStream。 -
选择数据查看器选项卡。
-
选择任意分片,保持最新作为起始位置,然后选择获取记录。您可能会看到“未找到该请求的记录”错误。如果看到此错误,请选择重试获取记录。发布到流显示的最新记录。
-
在“数据”列中选择值以检查 JSON 格式的记录内容。
停止应用程序
要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。
停止应用程序
-
从操作下拉列表中,选择停止。
-
应用程序详细信息中的状态会从
Running转换到Stopping,然后在应用程序完全停止时转换到Ready。注意
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。