创建并运行面向 Python 应用程序的 Managed Service for Apache Flink - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建并运行面向 Python 应用程序的 Managed Service for Apache Flink

在本练习中,您将创建面向 Python 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。

创建相关资源

在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出。

  • 存储应用程序代码和输出的 Amazon S3 存储桶 (ka-app-code-<username>)

创建两个 Kinesis 流

在为本练习创建 Managed Service for Apache Flink 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 Amazon CLI 命令创建这些流。有关控制台说明,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流

创建数据流 (Amazon CLI)
  1. 要创建第一个流 (ExampleInputStream),请使用以下 Amazon Kinesis create-streamAmazon CLI命令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

创建 Amazon S3 存储桶

您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:

  • 《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-code-<username>

其他资源

在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/MyApplication 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

注意

本节中的 Python 脚本使用Amazon CLI。您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。要配置您的 Amazon CLI,请输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

创建并检查 Apache Flink 流式处理 Python 代码

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 导航到 amazon-kinesis-data-analytics-java-examples/python/GettingStarted 目录。

应用程序代码位于 getting_started.py 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 表源从源流中进行读取。以下代码段调用该 create_table 函数来创建 Kinesis 表源:

    table_env.execute_sql( create_table(output_table_name, output_stream, output_region)

    create_table函数使用 SQL 命令创建由流式传输源支持的表:

    def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
  • 应用程序创建两个表,然后将一个表的内容写入另一个表。

    # 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
  • 该应用程序使用来自 flink - sql-connector-kinesis _2.12/1.15.2 文件的 Flink 连接器。

向 Python 应用程序添加第三方依赖项

使用第三方 python 包(例如 boto3)时,需要添加它们的传递依赖关系和定位这些依赖关系所需的属性。简而言之,对于 PyPi 依赖关系,您可以复制位于 python 环境文件夹中的文件和site-packages文件夹,以创建如下所示的目录结构:

PythonPackages │ README.md │ python-packages.py │ └───my_deps └───boto3 │ │ session.py │ │ utils.py │ │ ... │ └───botocore │ │ args.py │ │ auth.py │ ... └───mynonpypimodule │ │ mymodulefile1.py │ │ mymodulefile2.py ... └───lib │ │ flink-sql-connector-kinesis-1.15.2.jar │ │ ... ...

要将 boto3 添加为第三方,请执行以下操作:

  1. 在本地计算机上创建具有所需依赖项的独立 Python 环境(conda 或类似环境)。

  2. 记下该环境site_packages文件夹中的初始软件包列表。

  3. 您的应用程序所需的pip-install所有依赖项。

  4. 记下在上述步骤 3 之后添加到该site_packages文件夹的软件包。这些是您需要包含在软件包中的文件夹(在my_deps文件夹下),其组织方式如上所示。这将允许您捕获步骤 2 和步骤 3 之间的软件包差异,从而为您的应用程序确定正确的软件包依赖关系。

  5. kinesis.analytics.flink.run.options属性组中my_deps/作为pyFiles属性的参数提供,如下jarfiles属性所述。Flink 还允许您使用 add_python_file 函数指定 Python 依赖关系,但请务必记住,只需要指定其中一个,而不是两者兼而有之。

注意

您不必为该文件夹my_deps命名。重点是使用pyFilesadd_python_file注册依赖关系。可以在 PyfLink 中的如何使用 boto3 找到一个示例。

上传 Apache Flink 流式处理 Python 代码

在本节中,创建 Amazon S3 存储桶并上传应用程序代码。

要使用控制台上传应用程序代码,请执行以下操作:
  1. 使用你首选的压缩应用程序压缩getting-started.pyhttps://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis _2.12/1. 15.2 文件。为存档myapp.zip命名。如果您在存档中包含外部文件夹,则必须将其包含在路径中,并在配置文件中添加代码:GettingStarted/getting-started.py

  2. 通过 https://console.aws.amazon.com/s3/ 打开 Amazon S3 控制台。

  3. 选择 创建存储桶

  4. 存储桶名称 字段中输入 ka-app-code-<username>。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 下一步

  5. 配置选项步骤中,让设置保持原样,然后选择下一步

  6. 设置权限步骤中,让设置保持原样,然后选择下一步

  7. 选择 创建存储桶

  8. 在 Amazon S3 控制台中,选择 ka-app-code- <username>存储桶,然后选择上传

  9. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 myapp.zip 文件。选择 下一步

  10. 您无需更改该对象的任何设置,因此,请选择 上传

要使用Amazon CLI上传应用程序代码:
注意

请勿使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建myapp.zip存档。这可能导致应用程序代码无效。

  1. 使用你首选的压缩应用程序压缩streaming-file-sink.pyhttps://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis _2.12/1. 15.2 文件。

    注意

    请勿使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建 myapp.zip 存档。这可能导致应用程序代码无效。

  2. 使用你首选的压缩应用程序来压缩getting-started.pyhttps://mvnrepository.com/artifact/org.apache.flink/ flink-sql-connector-kinesis /1.15.2 文件。为存档myapp.zip命名。如果您在存档中包含外部文件夹,则必须将其包含在路径中,并在配置文件中添加代码:GettingStarted/getting-started.py

  3. 运行以下命令:

    $ aws s3 --region aws region cp myapp.zip s3://ka-app-code-<username>

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行 Managed Service for Apache Flink

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/flink

  2. 在 Managed Service for Apache Flink 控制面板上,选择创建分析应用程序

  3. Managed Service for Apache Flink - 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 应用程序名称 ,输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 运行时,请选择 Apache Flink

    • 将版本保留为 Apache Flink 版本 1.15.2(建议的版本)

  4. 对于访问权限,请选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

配置应用程序

请使用以下过程来配置应用程序。

配置应用程序
  1. MyApplication页面上,选择配置

  2. 配置应用程序 页面上,提供 代码位置

    • 对于Amazon S3 存储桶,请输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入myapp.zip

  3. 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  4. 属性下面,选择添加组

  5. 输入以下信息:

    组 ID
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    选择保存

  6. 属性下面,再次选择添加组

  7. 输入以下信息:

    组 ID
    producer.config.0 output.stream.name ExampleOutputStream
    producer.config.0 aws.region us-west-2
    producer.config.0 shard.count 1
  8. 属性下面,再次选择添加组。对于 组 ID,输入 kinesis.analytics.flink.run.options。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅 指定您的代码文件

  9. 输入以下信息:

    组 ID
    kinesis.analytics.flink.run.options python getting-started.py
    kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis-1.15.2.jar
  10. 监控 下,确保 监控指标级别 设置为 应用程序

  11. 要进行CloudWatch 日志记录,请选中 “启用” 复选框。

  12. 选择更新

注意

当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

编辑 IAM policy

编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。

编辑 IAM policy 以添加 S3 存储桶权限
  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

运行应用程序

可以通过运行应用程序、打开 Apache Flink 控制面板并选择所需的 Flink 任务来查看 Flink 任务图。

停止应用程序

要停止应用程序,请在MyApplication页面上选择停止。确认该操作。

下一个步骤

清理 Amazon 资源