使用 Amazon MSK 创建 Studio 笔记本 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon MSK 创建 Studio 笔记本

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

设置

在此教程中,您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群,请按照使用 Amazon MSK 入门 教程创建 Amazon VPC、Amazon MSK 集群、主题和 Amazon EC2 客户端实例。

在学习教程时,执行以下操作:

将 NAT 网关添加到您的 VPC

如果您按照使用 Amazon MSK 入门教程创建了 Amazon MSK 集群,或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关,则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

要为您的 Amazon VPC 创建 NAT 网关,请执行以下操作:

  1. 通过 https://console.aws.amazon.com/vpc/ 打开 Amazon VPC 控制台。

  2. 从左侧导航栏中选择 NAT 网关

  3. NAT 网关页面上,选择创建 NAT 网关

  4. 创建 NAT 网关页面上,提供以下值:

    姓名-可选 ZeppelinGateway
    子网 Amazon KafkaTutorialSubnet1
    弹性 IP 分配 ID 选择可用的弹性 IP。如果没有可用的弹性 IP,请选择分配弹性 IP,然后选择控制台创建的弹性 IP。

    选择 Create NAT Gateway(创建 NAT 网关)。

  5. 在左侧导航栏中,选择 路由表

  6. 选择 Create Route Table

  7. 创建(路由表) 页面上,提供以下信息:

    • 名称标签:ZeppelinRouteTable

    • VPC:选择您的 VPC(例如 Amazon KafkaTutorialVPC)。

    选择创建

  8. 在路由表列表中,选择ZeppelinRouteTable。选择 路由选项卡,然后选择 编辑路由

  9. 编辑路由页面上,选择添加路由

  10. 目标位置字段,输入0.0.0.0/0。对于目标,选择 NAT 网关ZeppelinGateway。选择 保存路由。选择 关闭

  11. 在 “路由表” 页面上,ZeppelinRouteTable选中,选择 “子网关联” 选项卡。选择编辑子网关联

  12. 编辑子网关联页面中,选择 Amazon KafkaTutorialSubnet2Amazon KafkaTutorialSubnet3。选择保存

创建 Amazon Glue 连接和表

您的 Studio 笔记本使用Amazon Glue数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中,您将创建一个描述如何访问您的 Amazon MSK 集群的 Amazon Glue 连接,以及一个描述如何将数据源中的数据呈现给客户端(例如 Studio 笔记本)的 Amazon Glue 表。

创建连接
  1. 登录 Amazon Web Services Management Console 并打开 Amazon Glue 控制台,网址为 https://console.aws.amazon.com/glue/

  2. 如果您还没有 Amazon Glue 数据库,请从左侧导航栏中选择 “数据库”。选择 添加数据库。在“添加数据库” 窗口中,输入 default数据库名称”。选择 创建

  3. 从左侧导航菜单中,选择连接。选择 添加连接

  4. 在 “添加连接” 窗口中,提供以下值:

    • 对于 连接名称,输入 ZeppelinConnection

    • 对于 Connection type (连接类型),选择 Kafka

    • 对于 Kafka 引导服务器 URL,请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消选中 “需要 SSL 连接” 复选框。

    选择 下一步

  5. VPC 页面上,提供以下值:

    • 对于 VPC,请选择您的 VPC 的名称(例如 Amazon KafkaTutorialVPC。)

    • 对于子网,选择 Amazon KafkaTutorialSubnet2

    • 对于安全组,请选择所有可用的组。

    选择 下一步

  6. 在“连接属性/连接访问权限” 页中,选择“完成

创建表
注意

您可以按照以下步骤所述手动创建表,也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码,通过 DDL 语句创建表。然后,您可以签入 Amazon Glue 以确保表格已正确创建。

  1. 在左侧导航栏中,选择 。在 “” 页中,选择 “添加表”,“手动添加表”。

  2. 设置表的属性页面中,输入stock表格名称。请务必选择之前创建的数据库。选择 下一步

  3. 添加数据存储页面中,选择 Kafka。在主题名称中,输入您的主题名称(例如 Amazon KafkaTutorialTopic)。对于 “连接”,选择ZeppelinConnection

  4. 分类页面中,选择 JSON。选择 下一步

  5. 定义架构页面中,选择 Add Column 以添加列。添加具有以下属性的列:

    列名称 数据类型
    ticker string
    price double

    选择 下一步

  6. 在下一页上,验证您的设置,然后选择完成

  7. 从表列表中选择新创建的表。

  8. 选择 “编辑表”,然后添加包含键managed-flink.proctime和值的属性proctime

  9. 选择 应用

使用 Amazon MSK 创建 Studio 笔记本

现在,您已经创建了应用程序使用的资源,接下来就可以创建自己的 Studio 笔记本了。

您可以使用 Amazon Web Services Management Console 或创建应用程序 Amazon CLI。
注意

您也可以从 Amazon MSK 控制台创建 Studio 笔记本,方法是选择现有集群,然后选择 “实时处理数据”。

使用创建 Studio 笔记本 Amazon Web Services Management Console

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard

  2. Managed Service for Apache Flink 应用程序页面中,选择 Studio 选项卡。选择创建 Studio 笔记本

    注意

    要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本,请选择您输入的Amazon MSK 集群或 Kinesis 数据流,然后选择“实时处理数据”。

  3. 创建笔记本实例 页面上,提供以下信息:

    • 输入 MyNotebook Studio 笔记本的名称

    • Glue 数据库Amazon 选择默认值

    选择创建 Studio 笔记本

  4. 在该MyNotebook页面中,选择 “配置” 选项卡。在 联网 部分中,选择 编辑

  5. 编辑网络连接 MyNotebook页面中,选择基于 Amazon MSK 集群的 VPC 配置。为Amazon MSK 集群选择您的Amazon MSK 集群。选择保存更改

  6. MyNotebook页面中,选择 “运行”。等待“状态”显示为“正在运行”。

使用创建 Studio 笔记本 Amazon CLI

要使用创建 Studio 笔记本 Amazon CLI,请执行以下操作:

  1. 验证您具有以下信息:创建应用程序时,您需要用到这些值。

    • 您的 账户 ID

    • 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 ID 和安全组 ID。

  2. 创建以下内容的名为 create.json 的文件。确保将占位符值替换为您自己的信息。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 要创建应用程序,请运行以下命令:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 命令完成后,您应该会看到类似于以下内容的输出,其中显示了新 Studio 笔记本的详细信息:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 要开始应用程序,请运行以下命令。请将占位符值替换为账户 ID。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

向您的Amazon MSK 集群发送数据

在本节中,您将在 Amazon EC2 客户端中运行 Python 脚本,将数据发送到您的Amazon MSK 数据源。

  1. 连接到您的 Amazon EC2 客户端。

  2. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包,然后确认操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 通过输入以下命令 Amazon CLI 在您的客户端计算机上进行配置:

    aws configure

    提供您的账户凭证,us-east-1并提供region

  4. 创建以下内容的名为 stock.py 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串,如果您的主题不是,请更新主题名称:Amazon KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用以下命令运行脚本:

    $ python3 stock.py
  6. 完成以下部分后,请让脚本继续运行。

测试您的 Studio 笔记本

在本节中,您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard

  2. Managed Service for Apache Flink 应用程序页面上,选择 Studio 笔记本选项卡。选择MyNotebook

  3. MyNotebook页面中,选择 “在 Apache 齐柏林飞艇中打开”。

    Apache Zeppelin 接口会在新选项卡中打开。

  4. 欢迎来到 Zeppelin!页面上,选择Zeppelin 新笔记

  5. Zeppelin Note 页面上,在新笔记中输入以下查询:

    %flink.ssql(type=update) select * from stock

    选择运行图标。

    该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面,请选择 FLINK JOB。有关 Flink 控制面板的更多信息,请参阅《Managed Service for Apache Flink 开发者指南》中的 Apache Flink 控制面板。

有关 Flink Streaming SQL 查询的更多示例,请参阅 Apache Fl ink 文档中的查询