示例:使用 Python 向Amazon S3 发送流媒体数据 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:使用 Python 向Amazon S3 发送流媒体数据

在本练习中,您将创建一个 Python Kinesis Data Analytics 应用程序,该应用程序将数据传输到亚马逊简单存储服务接收器。

注意

要为本练习设置所需的先决条件,请先完成入门指南 (Python)练习。

创建相关资源

在为本练习创建 Kinesis Data Analytics 应用程序之前,您需要创建以下依赖资源:

  • 一个 Kinesis 数据流 (ExampleInputStream)

  • 用于存储应用程序代码和输出的 Amazon S3 存储桶 (ka-app-code-<username>)

注意

在 Kinesis Data Analytics 上启用服务器端加密后,适用于 Apache Flink 的 Kinesis Data Analytics 无法将数据写入Amazon S3。

您可以使用控制台创建 Kinesis 直播桶和Amazon S3 存储桶和Amazon S3 存储桶 有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南中@@ 创建和更新数据流。将数据流命名为 ExampleInputStream

  • 如何创建 S3 存储桶?Amazon Service 用户指南中。通过附加您的登录名,为 Amazon S3 存储桶指定一个全球唯一的名称,例如ka-app-code-<username>

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 Amazon SDK for Python (Boto)

注意

本节中的 Python 脚本使用Amazon CLI. 您必须将您的配置Amazon CLI为使用您的账户凭证和默认区域。若要配置您的Amazon CLI,请输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

此示例的 Python 应用程序代码可从以下网址获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples
  3. 导航到 amazon-kinesis-data-analytics-java-examples/python/S3Sink目录。

应用程序代码位于 streaming-file-sink.py 文件中。请注意有关应用程序代码的以下信息:

  • 该应用程序使用 Kinesis 表源从源数据流中读取数据。以下代码段调用该create_source_table函数来创建 Kinesis 表源:

    table_env.execute_sql( create_source_table(input_table_name, input_stream, input_region, stream_initpos) )

    create_source_table函数使用 SQL 命令创建由流式传输源支持的表

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  • 此应用程序使用filesystem连接器将记录发送到 Amazon S3 存储桶:

    def create_sink_table(table_name, bucket_name): return """ CREATE TABLE {0} ( ticker VARCHAR(6), price DOUBLE, event_time VARCHAR(64) ) PARTITIONED BY (ticker) WITH ( 'connector'='filesystem', 'path'='s3a://{1}/', 'format'='csv', 'sink.partition-commit.policy.kind'='success-file', 'sink.partition-commit.delay' = '1 min' ) """.format(table_name, bucket_name)
  • 该应用程序使用 flink-sql-connector-kinesis-1.15.2.jar 文件中的 Kinesis Flink 连接器。

压缩并上传 Apache Flink 直播 Python 代码

在本部分中,您将应用程序代码上传到您在该创建相关资源部分创建的 Amazon S3 存储桶。

  1. 使用您首选的压缩应用程序压缩streaming-file-sink.pyflink-sql-connector-kinesis-1.15.2.jar 文件。命名档案myapp.zip

  2. 在 Amazon S3 控制台中,选择 ka-app-code- <username>存储桶,然后选择上传

  3. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 myapp.zip 文件。

  4. 您无需更改该对象的任何设置,因此,请选择 Upload (上传)

您的应用程序代码现在存储在 Amazon S3 桶中,您的应用程序可以在其中访问。

创建并运行 Kinesis Data Analytics 应用程序

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建 应用程序

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 仪表板上,选择创建分析应用程序

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 Application name (应用程序名称),输入 MyApplication

    • 对于 Runtime (运行时),请选择 Apache Flink

      注意

      Kinesis Data Analytics 使用 Apache Flink 版本 1.15.2。

    • 将版本下拉列表保留为 Apache Flink 版本 1.15.2(推荐版本)

  4. 对于访问权限,请选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

当您使用控制台创建 Kinesis Data Analytics 应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源使用您的应用程序名称和区域命名,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

配置应用程序

  1. MyApplication页面上,选择配置

  2. Configure application (配置应用程序) 页面上,提供 Code location (代码位置)

    • 对于 Amazon S3 存储桶,输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入myapp.zip

  3. Access to application resources (对应用程序的访问权限) 下,对于 Access permissions (访问权限),选择 Create / update IAM role (创建/更新 IAM 角色) kinesis-analytics-MyApplication-us-west-2

  4. 在 “属性” 下,选择 “添加群组”。

  5. 输入以下应用程序属性和值:

    组 ID
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    选择保存

  6. 在 “属性” 下,再次选择 “添加群组”。对于群组 ID,输入kinesis.analytics.flink.run.options。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅指定您的代码文件

  7. 输入以下应用程序属性和值:

    组 ID
    kinesis.analytics.flink.run.options python streaming-file-sink.py
    kinesis.analytics.flink.run.options jarfile S3Sink/lib/flink-sql-connector-kinesis-1.15.2.jar
  8. 在 “属性” 下,再次选择 “添加群组”。对于群组 ID,输入sink.config.0。这个特殊的属性组告诉你的应用程序在哪里可以找到它的代码资源。有关更多信息,请参阅指定您的代码文件

  9. 输入以下应用程序属性和值:(将存储桶名称替换为您的 Amazon S3 存储桶的实际名称。)

    组 ID
    sink.config.0 output.bucket.name bucket-name
  10. Monitoring (监控) 下,确保 Monitoring metrics level (监控指标级别) 设置为 Application (应用程序)

  11. 要进行CloudWatch 记录,请选中 “启用” 复选框。

  12. 选择更新

注意

当您选择启用 CloudWatch 日志记录时,Kinesis Data Analytics 会为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

该日志流用于监控应用程序。这与应用程序用于发送结果的日志流不同。

编辑 IAM 策略

编辑 IAM 策略以添加访问 Kinesis 数据流的权限。

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. Summary (摘要) 页面上,选择 Edit policy (编辑策略)。请选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/myapp.zip" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteObjects", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-code-<username>", "arn:aws:s3:::ka-app-code-<username>/*" ] } ] }

运行应用程序

可以通过运行应用程序、打开 Apache Flink 仪表板并选择所需的 Flink 作业来查看 Flink 任务图。

您可以在 CloudWatch 控制台上查看 Kinesis Data Analytics 指标,以验证应用程序是否正常运行。

清理 Amazon 资源

本节包含清理在滑动窗口教程中创建的 Amazon 资源的过程。

删除 Kinesis Data Analytics

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 在 Kinesis Data Analytics 面板中,选择MyApplication

  3. 在应用程序的页面中,选择 Delete (删除),然后确认删除。

删除 Kinesis Data Streams

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Kinesis Data Streams 面板中,选择ExampleInputStream

  3. ExampleInputStream页面中,选择 “删除 Kinesis Stream”,然后确认删除。

删除您的Amazon S3 对象和存储桶

  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择 ka-app-code- 存储桶。 <username>

  3. 选择 Delete (删除),然后输入存储桶名称以确认删除。

删除您的 IAM 资源

  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航栏中,选择策略

  3. 在筛选条件控件中,输入 kinesis

  4. 选择 kinesis-analytics-service-MyApplication- <your-region>策略。

  5. 选择 Policy Actions (策略操作),然后选择 Delete (删除)

  6. 在导航栏中,选择 Roles(角色)

  7. 选择 k inesis-analyticsMyApplication- <your-region>角色。

  8. 选择 Delete role (删除角色),然后确认删除。

删除您的 CloudWatch 资源

  1. 通过 https://console.aws.amazon.com/cloudwatch/ 打开 CloudWatch 主机。

  2. 在导航栏中,选择 Logs (日志)

  3. 选择 /aws/kinesis-analytics/MyApplication 日志组。

  4. 选择 Delete Log Group (删除日志组),然后确认删除。