示例:使用 ROWTIME 的滚动窗口 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:使用 ROWTIME 的滚动窗口

当一个窗口式查询以非重叠方式处理每个窗口时,这样的窗口称为滚动窗口。有关详细信息,请参阅 滚动窗口(使用 GROUP BY 组的聚合)。此 Amazon Kinesis Data Analytics 示例使用 ROWTIME 列创建滚动窗口。ROWTIME 列表示应用程序读取该记录的时间。

在本示例中,您将以下记录写入到 Kinesis 数据流中。

{"TICKER": "TBV", "PRICE": 33.11} {"TICKER": "INTC", "PRICE": 62.04} {"TICKER": "MSFT", "PRICE": 40.97} {"TICKER": "AMZN", "PRICE": 27.9} ...

然后,您在 AWS 管理控制台中创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 数据流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有两个列(TICKERPRICE)的如下所示的应用程序内部架构。


                控制台屏幕截图,显示具有价格和股票代码列的应用程序内部架构。

您使用应用程序代码以及 MINMAX 函数以创建数据的窗口式聚合。然后,将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:


                控制台屏幕截图,显示应用程序内部流中的结果数据。

在以下过程中,您创建一个 Kinesis Data Analytics 应用程序,它在基于 ROWTIME 的滚动窗口中聚合输入流中的值。

步骤 1. 创建 Kinesis 数据流

创建一个 Amazon Kinesis 数据流并填充记录,如下所示:

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建具有一个分片的流。有关更多信息,请参阅 https://docs.amazonaws.cn/streams/latest/dev/learning-kinesis-module-one-create-stream.html 开发人员指南中的Amazon Kinesis Data Streams创建流

  4. 要在生产环境中将记录写入到 Kinesis 数据流,我们建议您使用 Kinesis 客户端库Kinesis 数据流 API。为简单起见,此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行,以便可以在后面的步骤中生成应用程序架构。

    import json import boto3 import random import datetime kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['PRICE'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")

步骤 2. 创建 Kinesis Data Analytics 应用程序

创建一个 Kinesis Data Analytics 应用程序,如下所示:

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 选择 Create application (创建应用程序),输入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择 Discover Schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

    3. 选择 Save schema and update stream samples。在控制台保存架构后,选择 Exit (退出)

    4. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中:

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
    2. 选择 Save and run SQL

      Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。