使用亚马逊 MSK 创建 Studio 笔记本 - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用亚马逊 MSK 创建 Studio 笔记本

本教程描述如何创建将 Amazon MSK 集群用作源的 Studio 笔记本。

设置

对于本教程,您需要一个允许明文访问的 Amazon MSK 群集。如果您还没有设置 Amazon MSK 集群,请按照开始使用亚马逊 MSK创建 Amazon VPC、Amazon MSK 集群、主题和 Amazon EC2 客户端实例的教程。

在遵循本教程时,请执行以下操作:

将 NAT 网关添加到您的 VPC

如果您通过遵循开始使用亚马逊 MSK教程,或者如果您现有的 Amazon VPC 还没有用于其私有子网的 NAT 网关,则必须向您的 Amazon VPC 添加 NAT 网关。下图演示了架构。

要为您的 Amazon VPC 创建 NAT Gateway,请执行以下操作:

  1. 通过以下网址打开 Amazon VPC 控制台:https://console.aws.amazon.com/vpc/

  2. 选择NAT 网关从左侧导航栏中。

  3. 在存储库的NAT 网关页面上,选择创建 NAT 网关.

  4. 在存储库的创建 NAT 网关页面上,提供以下值:

    姓名-可选的 齐柏林网关
    子网 AmazonKafkatTorialSubnet1
    弹性 IP 分配 ID Choose an available Elastic IP. If there are no Elastic IPs available, choose 分配弹性 IP, and then choose the Elasic IP that the console creates.

    选择创建 NAT 网关.

  5. 在左侧导航栏上,选择路由表.

  6. 选择 Create Route Table

  7. 在存储库的创建路由表页面上,提供以下信息:

    • 名称标签:ZeppelinRouteTable

    • VPC:选择你的 VPC(例如AmazonKafkatutorialVPC)。

    选择 Create(创建)。

  8. 在路由表列表中,选择齐柏林路由表. 选择路由选项卡,然后选择编辑路线.

  9. 编辑路线页面上,选择添加路由.

  10. 适用于目的地输入0.0.0.0/0. 适用于目标,选择NAT 网关齐柏林网关. 选择保存路由. 选择关闭

  11. 在路由表页面上,使用齐柏林路由表已选中,选择子网关联选项卡。选择编辑子网关联.

  12. 编辑子网关联页面上,选择AmazonKafkatutorialSubnet2AmazonKafkatTorialSubnet3. 选择 Save(保存)。

创建Amazon Glue连接和表

你的 Studio 笔记本使用Amazon Glue有关亚马逊 MSK 数据源的元数据库。在本节中,您将创建Amazon Glue描述如何访问 Amazon MSK 集群的连接,以及Amazon Glue该表介绍了如何向客户端(例如 Studio 笔记本电脑)呈现数据源中的数据。

创建连接

  1. 登录 Amazon Web Services Management Console,然后打开 Amazon Glue 控制台,网址为:https://console.aws.amazon.com/glue/

  2. 如果您还没有Amazon Glue数据库,选择数据库从左侧导航栏中。选择添加数据库. 在添加数据库窗口中,输入default为了数据库名称. 选择 Create(创建)。

  3. 选择连接从左侧导航栏中。选择添加连接.

  4. 添加连接在窗口中,提供以下值:

    • 适用于连接名称输入ZeppelinConnection.

    • 对于 Connection type (连接类型),选择 Kafka

    • 适用于Kafka 引导启动服务器 URL,为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令获取引导经纪商:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消选中需要 SSL 连接复选框。

    选择 Next(下一步)。

  5. VPC页面上,提供以下值:

    • 适用于VPC中,选择您的 VPC 的名称(例如 AmazonKafkatutorialVPC。)

    • 适用于子网,选择AmazonKafkatutorialSubnet2.

    • 适用于安全组中,选择所有可用的组。

    选择 Next(下一步)。

  6. 连接属性/连接访问页面上,选择Finish.

创建表

注意

您可以按照以下步骤所述手动创建表,也可以在 Apache Zeppelin 的笔记本中使用 Kinesis Data Analytics 的创建表连接器代码通过 DDL 语句创建表。然后你可以办理登机手续Amazon Glue以确保表格已正确创建。

  1. 在左侧导航栏中,选择. 在页面上,选择添加表手动添加表.

  2. 设置表的属性页面,输入stock(对于 )表名称. 确保选择以前创建的数据库。选择 Next(下一步)。

  3. 添加数据存储页面上,选择Kafka. 对于主题名称,输入你的主题名称(例如AmazonKafkat教程主题)。适用于Connection,选择齐柏林连接.

  4. Classification页面上,选择JSON. 选择 Next(下一步)。

  5. 定义架构在页面上,选择添加列以添加列。添加具有以下属性的列:

    列名称 数据类型
    自动收报机 字符串
    价格 double

    选择 Next(下一步)。

  6. 在下一页上,验证您的设置,然后选择Finish.

  7. 从表的列表中选择您新创建的表。

  8. 选择编辑表然后用钥匙添加属性kinesisanalytics.proctime和价值proctime.

  9. 选择 Apply(应用)。

使用亚马逊 MSK 创建 Studio 笔记本

既然您已经创建了应用程序使用的资源,就可以创建 Studio 笔记本。

注意

您还可以通过选择现有群集然后选择从 Amazon MSK 控制台创建 Studio 笔记本实时处理数据.

使用创建 Studio 笔记本Amazon Web Services Management Console

  1. 打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/applications/dashboard.

  2. Kinesis Data Analytics页面上,选择工作室选项卡。选择创建 Studio 笔记本.

    注意

    要从 Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本电脑,请选择输入的亚马逊 MSK 集群或 Kinesis 数据流,然后选择实时处理数据.

  3. 创建 Studio 笔记本页面上,提供以下信息:

    • EnterMyNotebook为了Studio 笔记本名称.

    • 选择默认为了AmazonGlue 数据库.

    选择创建 Studio 笔记本.

  4. MyNotebook页面上,选择配置选项卡。在联网部分,选择编辑.

  5. 为 myNotebook 编辑网络页面上,选择基于 Amazon MSK 集群的 VPC 配置. 为选择您的 Amazon MSK 集群亚马逊 MSK 集群. 选择 Save changes(保存更改)。

  6. MyNotebook页面上,选择运行. 等待状态显示正在运行.

使用创建 Studio 笔记本Amazon CLI

使用创建 Studio 笔记本Amazon CLI中,执行以下操作:

  1. 验证您是否具有以下信息。您需要使用这些值来创建您的应用程序。

    • 您的 账户 ID

    • 包含 Amazon MSK 集群的 Amazon VPC 的子网 ID 和安全组 ID。

  2. 创建以下内容的名为 create.json 的文件。将占位符值替换为您的信息。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-2_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 运行以下命令以创建您的应用程序:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 当命令完时,您应看到类似于以下内容的输出,其中显示新 Studio 笔记本的详细信息:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-2_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 运行以下命令以启动您的应用程序。将示例值替换为您的账户 ID。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook\

将数据发送到您的亚马逊 MSK 集群

在本节中,您在 Amazon EC2 客户端中运行 Python 脚本以将数据发送到您的 Amazon MSK 数据源。

  1. Connect 到您的 Amazon EC2 客户端。

  2. 运行以下命令安装 Python 版本 3、Pip 和 Kafka of Python 软件包,然后确认操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 配置Amazon CLI在客户端计算机上输入以下命令:

    aws configure

    提供您的账户凭证,us-east-1(对于 )region.

  4. 创建以下内容的名为 stock.py 的文件。将示例值替换为亚马逊 MSK 集群的 Bootstrap Broker 字符串,如果主题不是,则更新主题名称AmazonKafkat教程主题

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['EVENT_TIME'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用以下命令运行脚本:

    $ python3 stock.py
  6. 在完成以下部分时,请将脚本保持运行状态。

测试 Studio 笔记本

在本节中,您可以使用 Studio 笔记本查询来自亚马逊 MSK 集群的数据。

  1. 打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/applications/dashboard.

  2. 在存储库的Kinesis Data Analytics页面上,选择Studio 笔记本选项卡。选择MyNotebook.

  3. MyNotebook页面上,选择在 Apache Zeppelin 中打开.

    在新的选项卡中打开 Apache Zeppelin 界面。

  4. 欢迎使用 Zeppelin!页面上,选择Zeppelin 新注意.

  5. Zeppelin Note在新备注中输入以下查询:

    %flink.ssql(type=update) select * from stock

    选择运行图标。

    此应用程序将显示来自 Amazon MSK 集群的数据。

要为应用程序打开 Apache Flink 控制面板以查看运营方面,请选择FLINK 工作. 有关 Flink 控制面板的更多信息,请参阅Apache Flink 控制面板中的Kinesis Data Analytics 开发者指南.

有关 Flink Stream SQL 查询的更多示例,请参阅查询中的Apache Flink 文档.