使用 Amazon MSK 创建 Studio 笔记本 - Managed Service for Apache Flink
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon MSK 创建 Studio 笔记本

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

设置

在此教程中,您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群,请按照使用 Amazon MSK 入门 教程创建 Amazon VPC、Amazon MSK 集群、主题和 Amazon EC2 客户端实例。

在学习教程时,执行以下操作:

将 NAT 网关添加到您的 VPC

如果您按照使用 Amazon MSK 入门教程创建了 Amazon MSK 集群,或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关,则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

要为您的 Amazon VPC 创建 NAT 网关,请执行以下操作:

  1. 通过以下网址打开 Amazon VPC 控制台:https://console.aws.amazon.com/vpc/

  2. 从左侧导航栏中选择 NAT 网关

  3. NAT 网关页面上,选择创建 NAT 网关

  4. 创建 NAT 网关页面上,提供以下值:

    name - 可选 ZeppelinGateway
    子网 AmazonKafkaTutorialSubnet1
    弹性 IP 分配 ID Choose an available Elastic IP. If there are no Elastic IPs available, choose 分配弹性 IP, and then choose the Elasic IP that the console creates.

    选择 Create NAT Gateway(创建 NAT 网关)。

  5. 在左侧导航栏中,选择 路由表

  6. 选择 Create Route Table

  7. 创建(路由表) 页面上,提供以下信息:

    • 名称标签:ZeppelinRouteTable

    • VPC:选择您的 VPC(例如 AmazonKafkaTutorialVPC)。

    选择 创建

  8. 在路由表列表中,选择 ZeppelinRouteTable。选择 路由选项卡,然后选择 编辑路由

  9. 编辑路由页面上,选择添加路由

  10. 目标位置字段,输入0.0.0.0/0。对于 目标,选择 NAT 网关ZeppelinGateway。选择 保存路由。选择 关闭

  11. 在路由表页面上,选择 ZeppelinRouteTable,选择子网关联选项卡。选择编辑子网关联

  12. “编辑子网关联” 页面中,选择 AmazonkafkaTutorialSubnet2 和 kafkaTutorial Subnet3。Amazon选择 Save(保存)。

创建一个Amazon Glue 连接和表

您的 Studio 笔记本使用Amazon Glue数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中,您将创建一个描述如何访问您的 Amazon MSK 集群的Amazon Glue连接,以及一个描述如何将数据源中的数据呈现给客户端(例如 Studio 笔记本)的Amazon Glue表。

创建连接
  1. 登录 Amazon Web Services Management Console,然后打开 Amazon Glue 控制台,网址为:https://console.aws.amazon.com/glue/

  2. 如果您还没有Amazon Glue数据库,请从左侧导航栏中选择 “数据库”。选择 添加数据库。在“添加数据库” 窗口中,输入 default数据库名称”。选择 创建

  3. 从左侧导航菜单中,选择连接。选择 添加连接

  4. 在 “添加连接” 窗口中,提供以下值:

    • 对于 连接名称,输入 ZeppelinConnection

    • 对于 Connection type (连接类型),选择 Kafka

    • 对于 Kafka 引导服务器 URL,请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消选中 “需要 SSL 连接” 复选框。

    选择 下一步

  5. VPC 页面上,提供以下值:

    • 对于 VPC,请选择您的 VPC 的名称(例如 AmazonKafkaTutorialVPC。)

    • 对于子网,请选择 AmazonkafkaTutorialSubnet2

    • 对于安全组,请选择所有可用的组。

    选择 下一步

  6. 在“连接属性/连接访问权限” 页中,选择“完成

创建表
注意

您可以按照以下步骤所述手动创建表,也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码,通过 DDL 语句创建表。然后,您可以签入Amazon Glue以确保正确创建了此表。

  1. 在左侧导航栏中,选择 。在 “” 页中,选择 “添加表”,“手动添加表”。

  2. 设置表的属性页面中,输入stock表格名称。请务必选择之前创建的数据库。选择 下一步

  3. 添加数据存储页面中,选择 Kafka。对于主题名称,输入您的主题名称(例如 AmazonKafkaTutorialTopic)。在“连接”中,选择 ZeppelinConnection

  4. 分类页面中,选择 JSON。选择 下一步

  5. 定义架构页面中,选择 Add Column 以添加列。添加具有以下属性的列:

    列名称 数据类型
    自动收报机 字符串
    价格 double

    选择 下一步

  6. 在下一页上,验证您的设置,然后选择完成

  7. 从表列表中选择新创建的表。

  8. 选择 “编辑表”,然后添加包含键managed-flink.proctime和值的属性proctime

  9. 选择 Apply(应用)。

使用 Amazon MSK 创建 Studio 笔记本

现在,您已经创建了应用程序使用的资源,接下来就可以创建自己的 Studio 笔记本了。

您可以使用Amazon Web Services Management Console或Amazon CLI,创建应用程序。
注意

您也可以从 Amazon MSK 控制台创建 Studio 笔记本,方法是选择现有集群,然后选择 “实时处理数据”。

使用Amazon Web Services Management Console创建 Studio 笔记本

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard

  2. Managed Service for Apache Flink 应用程序页面中,选择 Studio 选项卡。选择创建 Studio 笔记本

    注意

    要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本,请选择您输入的Amazon MSK 集群或 Kinesis 数据流,然后选择“实时处理数据”。

  3. 创建笔记本实例 页面上,提供以下信息:

    • 输入 MyNotebook Studio 笔记本的名称

    • Glue 数据库Amazon选择默认值

    选择创建 Studio 笔记本

  4. 在 “我的笔记本” 页面中,选择 “配置” 选项卡。在 联网 部分中,选择 编辑

  5. 编辑我的笔记本网络页面中,选择基于 Amazon MSK 集群的 VPC 配置。为Amazon MSK 集群选择您的Amazon MSK 集群。选择 Save changes(保存更改)。

  6. 在“我的笔记本”页面中,选择“运行”。等待“状态”显示为“正在运行”。

使用Amazon CLI创建 Studio 笔记本

要使用创建 Studio 笔记本Amazon CLI,请执行以下操作:

  1. 验证您具有以下信息:创建应用程序时,您需要用到这些值。

    • 您的 账户 ID

    • 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 ID 和安全组 ID。

  2. 创建以下内容的名为 create.json 的文件。确保将占位符值替换为您自己的信息。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 要创建应用程序,请运行以下命令:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 命令完成后,您应该会看到类似于以下内容的输出,其中显示了新 Studio 笔记本的详细信息:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 要开始应用程序,请运行以下命令。请将占位符值替换为账户 ID。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

向您的Amazon MSK 集群发送数据

在本节中,您将在 Amazon EC2 客户端中运行 Python 脚本,将数据发送到您的Amazon MSK 数据源。

  1. 连接到您的 Amazon EC2 客户端。

  2. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包,然后确认操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 通过输入以下命令,在客户端计算机Amazon CLI上配置:

    aws configure

    提供您的账户凭证,us-east-1并提供region

  4. 创建以下内容的名为 stock.py 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串,如果您的主题不Amazon是 kafkaTutorialTopic,请更新主题名称:

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用以下命令运行脚本:

    $ python3 stock.py
  6. 完成以下部分后,请让脚本继续运行。

测试您的 Studio 笔记本

在本节中,您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

  1. 打开 Managed Service for Apache Flink 控制台,网址为 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard

  2. Managed Service for Apache Flink 应用程序页面上,选择 Studio 笔记本选项卡。选择 MyNotebook

  3. 在 “我的笔记本” 页面中,选择 “在 Apache Zeppelin 中打开”。

    Apache Zeppelin 接口会在新选项卡中打开。

  4. 欢迎来到 Zeppelin!页面上,选择Zeppelin 新笔记

  5. Zeppelin Note 页面上,在新笔记中输入以下查询:

    %flink.ssql(type=update) select * from stock

    选择运行图标。

    该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面,请选择 FLINK JOB。有关 Flink 控制面板的更多信息,请参阅《Managed Service for Apache Flink 开发者指南》中的 Apache Flink 控制面板。

有关 Flink Streaming SQL 查询的更多示例,请参阅 Apache Fl ink 文档中的查询