为 Python 应用程序创建并运行 Kinesis Data Analytics - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Python 应用程序创建并运行 Kinesis Data Analytics

在本练习中,您将为 Python 应用程序创建 Kinesis Data Analytics 应用程序,将 Kinesis 流作为源和接收器。

创建相关资源

在为本练习创建适用于 Apache Flink 的 Amazon Kinesis 数据分析之前,您需要创建以下依赖资源:

  • 两个 Kinesis 数据流用于输入和输出。

  • 用于存储应用程序代码和输出的 Amazon S3 存储桶 (ka-app-code-<username>)

创建两个 Kinesis 直播

在为本练习创建 Kinesis Data Analytics 应用程序之前,请创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

您可以使用 Amazon Kinesis 控制台或以下Amazon CLI命令创建这些直播。有关控制台的说明,请参阅 Amazon Kinesis 数据流开发者指南中的创建和更新数据流

创建数据流 (Amazon CLI)
  1. 要创建第一个直播 (ExampleInputStream),请使用以下 Amazon Kinesiscreate-streamAmazon CLI 命令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

创建 Amazon S3 存储桶

您可以使用控制台创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:

  • 如何创建 S3 存储桶?Amazon Simple Storage Serv ice 通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称,例如ka-app-code-<username>

其他资源

在创建应用程序时,Kinesis Data Analytics 将创建以下Amazon CloudWatch 资源。

  • 名为 /aws/kinesis-analytics-java/MyApplication 的日志组。

  • 名为 kinesis-analytics-log-stream 的日志流。

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

注意

本节中的 Python 脚本使用Amazon CLI. 您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。要配置您的Amazon CLI,请输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

创建并检查 Apache Flink 直播 Python 代码

此示例的 Python 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/python/GettingStarted目录。

应用程序代码位于 streaming-file-sink.py 文件中。请注意有关应用程序代码的以下信息:

  • 该应用程序使用 Kinesis 表源从源数据流中读取数据。以下代码段调用该create_table函数来创建 Kinesis 表源:

    table_env.execute_sql( create_table(output_table_name, output_stream, output_region)

    create_table函数使用 SQL 命令创建由流式传输源支持的表:

    def create_table(table_name, stream_name, region, stream_initpos = None): init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else '' return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{1}', 'aws.region' = '{2}',{3} 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """.format(table_name, stream_name, region, init_pos) }
  • 应用程序创建两个表,然后将一个表的内容写入另一个表。

    # 2. Creates a source table from a Kinesis Data Stream table_env.execute_sql( create_table(input_table_name, input_stream, input_region) ) # 3. Creates a sink table writing to a Kinesis Data Stream table_env.execute_sql( create_table(output_table_name, output_stream, output_region, stream_initpos) ) # 4. Inserts the source table data into the sink table table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}" .format(output_table_name, input_table_name))
  • 该应用程序使用来自 flink - sql-connector-kinesis _2.12/1.15.2 文件的 Flink 连接器。

  • 使用第三方 python 软件包(例如 boto3)时,需要将其添加到getting-started.py所在的GettingStarted文件夹中。无需在 Apache Flink 或 Kinesis Data Analytics 中添加任何其他配置。可以在如何在 PyfLink 中使用 boto3 中找到一个示例。

上传 Apache Flink 直播 Python 代码

在本部分,您将创建一个 Simple Storage Service(Amazon S3)存储桶并上传应用程序

要使用控制台上传应用程序代码,请执行以下操作:
  1. 使用你首选的压缩应用程序压缩getting-started.pyhttps://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis _2.12/1.15.2 文件。命名档案myapp.zip。如果您在存档中包含外部文件夹,则必须将其包含在配置文件中的代码路径中:GettingStarted/getting-started.py

  2. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  3. 选择 Create bucket (创建存储桶)

  4. ka-app-code-<username>存储段名称字段中输入。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择下一步

  5. 配置选项步骤中,让设置保持原样,然后选择下一步

  6. 设置权限步骤中,让设置保持原样,然后选择下一步

  7. 选择创建桶

  8. 在 Amazon S3 控制台中,选择 ka-app-code- <username>存储桶,然后选择上传

  9. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 myapp.zip 文件。选择下一步

  10. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

要使用以下命令上传应用程序代码,请执行以下Amazon CLI操作:
注意

不要使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建myapp.zip存档。这可能导致应用程序代码无效。

  1. 使用你首选的压缩应用程序压缩streaming-file-sink.pyhttps://mvnrepository.com/artifact/org.apache.flink/flink- sql-connector-kinesis _2.12/1.15.2 文件。

    注意

    不要使用 Finder (macOS) 或 Windows 资源管理器 (Windows) 中的压缩功能来创建 myapp.zip 存档。这可能导致应用程序代码无效。

  2. 使用您首选的压缩应用程序压缩getting-started.pyhttps://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis /1.15.2 文件。命名档案myapp.zip。如果您在存档中包含外部文件夹,则必须将其包含在配置文件中的代码路径中:GettingStarted/getting-started.py

  3. 运行以下命令:

    $ aws s3 --region aws region cp myapp.zip s3://ka-app-code-<username>

您的应用程序代码现在存储在 Amazon S3 存储桶中,您的应用程序可以在该存储桶中进行访问。

创建并运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 Runtime (运行时),请选择 Apache Flink

    • 将该版本保留为 Apache Flink 版本 1.15.2(推荐版本)

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

配置应用程序

可以按照以下过程来配置应用程序。

配置应用程序
  1. MyApplication页面上,选择配置

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 存储桶,输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入myapp.zip

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. 在 “属性” 下,选择 “添加群组”。

  5. 输入以下信息:

    组 ID
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    选择保存

  6. 在 “属性” 下,再次选择 “添加群组”。

  7. 输入以下信息:

    组 ID
    producer.config.0 output.stream.name ExampleOutputStream
    producer.config.0 aws.region us-west-2
    producer.config.0 shard.count 1
  8. 在 “属性” 下,再次选择 “添加群组”。对于群组 ID,输入kinesis.analytics.flink.run.options。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅指定您的代码文件

  9. 输入以下信息:

    组 ID
    kinesis.analytics.flink.run.options python getting-started.py
    kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis-1.15.2.jar
  10. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  11. 要进行CloudWatch 记录,请选中 “启用” 复选框。

  12. 选择更新

注意

当您选择启用亚马逊 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

编辑 IAM 策略

编辑 IAM 策略以添加访问 Amazon S3 存储桶的权限。

编辑 IAM 策略以添加 S3 存储桶权限
  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

运行应用程序

可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。

停止应用程序

要停止应用程序,请在MyApplication页面上选择停止。确认该操作。

下一个步骤

清理 Amazon 资源