示例:交错窗口 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:交错窗口

当窗口化查询处理每个唯一分区键的单独窗口时,从具有匹配键的数据到达时开始,该窗口被称为交错窗口。有关详细信息,请参阅 交错窗口。此 Amazon Kinesis Data Analytics 示例使用 EVENT_TIME 和 TICKER 列创建交错窗口。源流包含具有相同 EVENT_TIME 和 TICKER 值的六个记录组成的组,这些值在一分钟时间内到达,但不一定具有相同的分钟值(例如 18:41:xx)。

在本示例中,您在以下时间将以下记录写入到 Kinesis 数据流中。该脚本不会将时间写入流,但应用程序接收记录的时间将写入 ROWTIME 字段:

{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:30 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:40 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:50 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:00 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:10 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:21 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:31 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:41 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:51 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:01 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:11 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:21 ...

然后,您在 AWS 管理控制台中创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 数据流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有两个列(EVENT_TIMETICKER)的如下所示的应用程序内部架构。


                控制台屏幕截图,显示具有价格和股票代码列的应用程序内部架构。

您使用应用程序代码以及 COUNT 函数以创建数据的窗口式聚合。然后,将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:


                控制台屏幕截图,显示应用程序内部流中的结果数据。

在以下过程中,您创建一个 Kinesis Data Analytics 应用程序,它在基于 EVENT_TIME 和 TICKER 的交错窗口中聚合输入流中的值。

步骤 1. 创建 Kinesis 数据流

创建一个 Amazon Kinesis 数据流并填充记录,如下所示:

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建具有一个分片的流。有关更多信息,请参阅 https://docs.amazonaws.cn/streams/latest/dev/learning-kinesis-module-one-create-stream.html 开发人员指南中的Amazon Kinesis Data Streams创建流

  4. 要在生产环境中将记录写入到 Kinesis 数据流,我们建议您使用 Kinesis 创建器库Kinesis 数据流 API。为简单起见,此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码在一分钟时间中连续地将一组六个记录与相同的随机 EVENT_TIME 和股票代码符号一起写入流中。让脚本保持运行,以便可以在后面的步骤中生成应用程序架构。

    import json import boto3 import random import datetime import time kinesis = boto3.client('kinesis') def getData(): data = {} now = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) return data while True: data = json.dumps(getData()) # Send six records, ten seconds apart, with the same event time and ticker for x in range(0, 6): print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey") time.sleep(10)

步骤 2. 创建 Kinesis Data Analytics 应用程序

创建一个 Kinesis Data Analytics 应用程序,如下所示:

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择 Discover Schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

    3. 选择 Edit Schema (编辑架构)。将 EVENT_TIME 列的 Column type (列类型) 更改为 TIMESTAMP

    4. 选择 Save schema and update stream samples。在控制台保存架构后,选择 Exit (退出)

    5. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中:

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
    2. 选择 Save and run SQL

      Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。