使用 Kinesis Data Streams 创建创建 Guo - Amazon Kinesis Data Analytics
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Data Streams 创建创建 Guo

本教程描述如何创建使用 Kinesis 数据流作为源。

设置

在创建 Studio 笔记本之前,创建 Kinesis 数据流 (ExampleInputStream)。您的应用程序使用此流作为应用程序源。

您可以使用 Amazon Kinesis 控制台或以下Amazon CLI命令创建此直播。有关控制台的说明,请参阅 Amazon Kinesis 数据流开发者指南中的创建和更新数据流。命名该流ExampleInputStream并将打开的分片数量设置为1

要使用创建直播 (ExampleInputStream)Amazon CLI,请使用以下 Amazon Kinesiscreate-streamAmazon CLI 命令。

$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

创建一个 Amazon Glue 表

您的 Studio 笔记本使用Amazon Glue数据库存储有关 Kinesis Data Streams 数据源的元数据。

注意

您可以先手动创建数据库,也可以让 Kinesis Data Analytics 在创建笔记本时为您创建数据库。同样,您可以按照本节所述手动创建表,也可以在 Apache Zeppelin 的笔记本中使用 Kinesis Data Analytics 的创建表连接器代码通过 DDL 语句创建表。然后,您可以签入Amazon Glue以确保正确创建了此表。

创建表
  1. 登录 Amazon Web Services Management Console,然后打开 Amazon Glue 控制台,网址为:https://console.aws.amazon.com/glue/

  2. 如果您还没有Amazon Glue数据库,请从左侧导航栏中选择数据库。选择 “添加数据库”。在添加数据库窗口中default,输入数据库名称。选择创建

  3. 在左侧导航栏中,选择。在表格页面中,选择添加表手动添加表

  4. 设置表的属性页面中stock,输入表名。确保选择之前创建的数据库。选择下一步

  5. 添加数据存储页面中,选择 Kinesis。对于直播名称,输入ExampleInputStream。对于 Kinesis 源网址,选择回车键https://kinesis.us-east-1.amazonaws.com。如果您复制并粘贴 Kinesis 源网址,请务必删除所有开头或结尾的空格。选择下一步

  6. 分类页面中,选择 JSON。选择下一步

  7. 在 “定义架构” 页面中,选择 “添加列” 以添加列。添加具有以下属性的列:

    列名称 数据类型
    自动收报机 字符串
    价格 double

    选择下一步

  8. 在下一页上,验证您的设置,然后选择完成

  9. 从表列表中选择新创建的表。

  10. 选择编辑表并添加带有键kinesisanalytics.proctime和值的属性proctime

  11. 选择 Apply(应用)。

使用 Kinesis Data Streams 创建创建 Dio

现在,您已经创建了应用程序使用的资源,接下来就可以创建您的 Studio 笔记本了。

要创建应用程序,您可以使用Amazon Web Services Management Console或Amazon CLI。

使用创建 Studio 笔记本Amazon Web Services Management Console

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/applications/dashboard

  2. Kinesis Data Analytics 应用程序页面中,选择 Studio 选项卡。选择创建 Studio 笔记本

    注意

    您还可以从亚马逊 MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本,方法是选择您的输入亚马逊 MSK 集群或 Kinesis 数据流,然后选择 “实时处理数据”。

  3. 创建创建 Studio 笔记本页面上,提供以下信息

    • 输入笔记本MyNotebook的名称。

    • AmazonGlue 数据库选择默认值

    选择创建 Studio 笔记本

  4. MyNotebook页面中,选择 “运行”。等待状态显示正在运行。笔记本电脑运行时收费。

使用创建 Studio 笔记本Amazon CLI

要使用创建 Studio 笔记本Amazon CLI,请执行以下操作:

  1. 验证账户 ID。您需要此值才能创建应用程序。

  2. 创建角色arn:aws:iam::AccountID:role/ZeppelinRole并将以下权限添加到控制台自动创建的角色中。

    "kinesis:GetShardIterator",

    "kinesis:GetRecords",

    "kinesis:ListShards"

  3. 创建以下内容的名为 create.json 的文件。用您的信息替换占位符值。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-2_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  4. 运行以下命令创建应用程序。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  5. 命令完成后,您将看到显示新 Studio 笔记本电脑详细信息的输出。下面是输出的一个示例。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-2_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  6. 运行以下命令以启动应用程序。将示例值替换为您的账户 ID。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook\

将数据发送到您的 Kinesis 数据流

要将测试数据发送到您的 Kinesis 数据流,请执行以下操作:

  1. 打开 Kinesis 数据生成器

  2. 选择 “使用” 创建 Cognito 用户 CloudFormation

  3. Amazon CloudFormation控制台以 Kinesis 数据生成器模板打开。选择下一步

  4. 指定堆栈详细信息页面中,输入您的 Cognito 用户的用户名和密码。选择下一步

  5. 配置堆栈选项页面中,选择下一步

  6. 在 “查看 Kinesis-Data-Generator-Cognito-User” 页面中,选择Amazon CloudFormation 可能创建 IAM 资源的 “我确认”。 复选框。选择Create Stack(创建堆栈)。

  7. 等待Amazon CloudFormation堆栈完成创建。堆栈完成后,在Amazon CloudFormation控制台中打开 Kinesis-Data-Generator-Cognito-User 堆栈,然后选择 “输出” 选项卡。打开列出的KinesisDataGeneratorUrl输出值的 URL。

  8. Amazon Kinesis 数据生成器页面中,使用您在步骤 4 中创建的证书登录。

  9. 在下一页上,提供以下值

    区域 us-east-1
    传输流/传输流 ExampleInputStream
    每秒记录数 1

    对于 “记录模板”,粘贴以下代码:

    { "ticker": "{{random.arrayElement( ["AMZN","MSFT","GOOG"] )}}", "price": {{random.number( { "min":10, "max":150 } )}} }
  10. 选择 “发送数据”。

  11. 生成器会将数据发送到 Kinesis 数据流。

    在完成下一节的同时,让生成器保持运行状态。

测试您的工作室笔记本

在本节中,您将使用 Studio 笔记本来查询 Kinesis 数据流中的数据。

  1. 打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/applications/dashboard

  2. Kinesis Data Analytics 应用程序页面上,选择 Studio 笔记本选项卡。选择MyNotebook

  3. MyNotebook页面中,选择 “在 Apache Zeppelin 中打开”。

    Apache Zeppelin 接口在新选项卡中打开。

  4. 在《欢迎来到齐柏林》中! 页面,选择 “齐柏林笔记”。

  5. Zeppelin Note 页面中,在新笔记中输入以下查询:

    %flink.ssql(type=update) select * from stock

    选择运行图标。

    片刻之后,笔记会显示来自 Kinesis 数据流的数据。

要打开 Apache Flink 控制面板供应用程序查看操作方面,请选择 FLINK JOB。有关 Flink 仪表板的更多信息,请参阅 Kinesis D ata Analytics 开发者指南中的 Apache Flin k 仪表板

有关 Flink 串流 SQL 查询的更多示例,请参阅 Apache Flink 文档中的查询