例如:使用事件时间戳的滚动窗口 - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

例如:使用事件时间戳的滚动窗口

当一个窗口式查询以非重叠方式处理每个窗口时,这样的窗口称为滚动窗口。有关详细信息,请参阅 滚动窗口(使用 GROUP BY 组的聚合)。此 Amazon Kinesis Data Analytics 示例演示一个滚动窗口,此滚动窗口使用事件时间戳,这是一个用户创建的包含在流数据中的时间戳。它使用这种方法而不是只使用 ROWTIME,这是 Kinesis Data Analytics 在应用程序收到记录时创建的时间戳。如果您想根据事件发生的时间而非应用程序收到此事件的时间创建一个聚合,则可以在流数据中使用事件时间戳。在本例中,ROWTIME 值每分钟触发一次此聚合,ROWTIME 和所包含事件时间都会聚合记录。

在本示例中,您将以下记录写入到 Amazon Kinesis 流中。这些区域有:EVENT_TIME值在过去设置为 5 秒以模拟处理和传输滞后,而滞后可能导致在发生事件和将记录提取到 Kinesis Data Analytics 之间出现延迟。

{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65} {"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61} {"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48} {"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64} ...

然后,您创建 Kinesis Data Analytics 应用程序。Amazon Web Services Management Console,将 Kinesis 数据流作为流式源。发现过程读取流式传输源中的示例记录,并推断出具有三个列(EVENT_TIMETICKERPRICE)的如下所示的应用程序内部架构。


                控制台屏幕截图,显示具有事件时间、股票代码和价格列的应用程序内部架构。

您使用应用程序代码以及 MINMAX 函数以创建数据的窗口式聚合。然后,将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:


                控制台屏幕截图,显示应用程序内部流中的结果数据。

在以下过程中,您创建一个 Kinesis Data Analytics 应用程序,此应用程序在基于事件时间的滚动窗口中聚合输入流中的值。

第 1 步:创建 Kinesis Data Streams

创建 Amazon Kinesis 数据流并填充记录,如下所示:

  1. 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建具有一个分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams 开发人员指南

  4. 要在生产环境中将记录写入到 Kinesis 数据流,我们建议您使用 Kinesis 客户端库Kinesis 数据流 API。为简单起见,此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行,以便可以在后面的步骤中生成应用程序架构。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'EVENT_TIME': datetime.datetime.now().isoformat(), 'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'PRICE': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

第 2 步:创建 Kinesis Data Analytics 应用程序

按如下方式创建 Kinesis Data Analytics 应用程序:

  1. 打开 Kinesis Data Analytics 控制台,网站为https://console.aws.amazon.com/kinesisanalytics

  2. 选择 Create application (创建应用程序),输入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择 Discover Schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有三列。

    3. 选择 Edit Schema (编辑架构)。将 EVENT_TIME 列的 Column type (列类型) 更改为 TIMESTAMP

    4. 选择 Save schema and update stream samples。在控制台保存架构后,选择 Exit (退出)

    5. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND), TICKER, MIN(PRICE) AS MIN_PRICE, MAX(PRICE) AS MAX_PRICE FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
    2. 选择 Save and run SQL

      Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。