流的窗口式聚合 - Amazon Kinesis Data Analytics SQL 参考
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

流的窗口式聚合

为了说明窗口式聚合在 Amazon Kinesis Data Streams 中的工作原理,假定下表中的数据将流经一个名为 WEATHERSTREAM 的流。

ROWTIME CITY TEMP

2018-11-01 01:00:00.0

丹佛

29

2018-11-01 01:00:00.0

安克雷奇

2

2018-11-01 06:00:00.0

迈阿密

65

2018-11-01 07:00:00.0

丹佛

32

2018-11-01 09:00:00.0

安克雷奇

9

2018-11-01 13:00:00.0

丹佛

50

2018-11-01 17:00:00.0

安克雷奇

10

2018-11-01 18:00:00.0

迈阿密

71

2018-11-01 19:00:00.0

丹佛

43

2018-11-02 01:00:00.0

安克雷奇

4

2018-11-02 01:00:00.0

丹佛

39

2018-11-02 07:00:00.0

丹佛

46

2018-11-02 09:00:00.0

安克雷奇

3

2018-11-02 13:00:00.0

丹佛

56

2018-11-02 17:00:00.0

安克雷奇

2

2018-11-02 19:00:00.0

丹佛

50

2018-11-03 01:00:00.0

丹佛

36

2018-11-03 01:00:00.0

安克雷奇

1

假设您要查找 24 小时期间内记录的全球(无论是哪个城市)的最低和最高温度(在任意给定读数之前)。为此,您需要定义 RANGE INTERVAL '1' DAY PRECEDING 的一个窗口,并在 MINMAX 分析函数的 OVER 子句中使用它:

SELECT STREAM        ROWTIME,        MIN(TEMP) OVER W1 AS WMIN_TEMP,        MAX(TEMP) OVER W1 AS WMAX_TEMP FROM WEATHERSTREAM WINDOW W1 AS (    RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME WMIN_TEMP WMAX_TEMP

2018-11-01 01:00:00.0

29

29

2018-11-01 01:00:00.0

2

29

2018-11-01 06:00:00.0

2

65

2018-11-01 07:00:00.0

2

65

2018-11-01 09:00:00.0

2

65

2018-11-01 13:00:00.0

2

65

2018-11-01 17:00:00.0

2

65

2018-11-01 18:00:00.0

2

71

2018-11-01 19:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 07:00:00.0

4

71

2018-11-02 09:00:00.0

3

71

2018-11-02 13:00:00.0

3

71

2018-11-02 17:00:00.0

2

71

2018-11-02 19:00:00.0

2

56

2018-11-03 01:00:00.0

2

56

2018-11-03 01:00:00.0

1

56

现在,假定您要查找在 24 小时期间内记录的在任意给定读数之前的最低、最高和平均温度(按城市划分)。为此,您可以向窗口规范添加针对 CITYPARTITION BY 子句,并向选择列表添加针对同一个窗口的 AVG 分析函数:

SELECT STREAM        ROWTIME,        CITY,        MIN(TEMP) over W1 AS WMIN_TEMP,        MAX(TEMP) over W1 AS WMAX_TEMP,        AVG(TEMP) over W1 AS WAVG_TEMP FROM AGGTEST.WEATHERSTREAM WINDOW W1 AS (        PARTITION BY CITY        RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME CITY WMIN_TEMP WMAX_TEMP WAVG_TEMP

2018-11-01 01:00:00.0

丹佛

29

29

29

2018-11-01 01:00:00.0

安克雷奇

2

2

2

2018-11-01 06:00:00.0

迈阿密

65

65

65

2018-11-01 07:00:00.0

丹佛

29

32

30

2018-11-01 09:00:00.0

安克雷奇

2

9

5

2018-11-01 13:00:00.0

丹佛

29

50

37

2018-11-01 17:00:00.0

安克雷奇

2

10

7

2018-11-01 18:00:00.0

迈阿密

65

71

68

2018-11-01 19:00:00.0

丹佛

29

50

38

2018-11-02 01:00:00.0

安克雷奇

2

10

6

2018-11-02 01:00:00.0

丹佛

29

50

38

2018-11-02 07:00:00.0

丹佛

32

50

42

2018-11-02 09:00:00.0

安克雷奇

3

10

6

2018-11-02 13:00:00.0

丹佛

39

56

46

2018-11-02 17:00:00.0

安克雷奇

2

10

4

2018-11-02 19:00:00.0

丹佛

39

56

46

2018-11-03 01:00:00.0

丹佛

36

56

45

2018-11-03 01:00:00.0

安克雷奇

1

4

2

行时间边界和窗口式聚合示例

以下是窗口式聚合查询的一个示例:

SELECT STREAM ROWTIME, ticker, amount, SUM(amount)    OVER (        PARTITION BY ticker        RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume FROM Trades

因为这是对流的查询,所以行一进入就会弹出此查询。例如,给定以下输入:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 10:25:00 Trades: IBM 30 11:05:00 Trades.bound: 11:10:00

在此示例中,输出如下:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 35 10:25:00 Trades: IBM 30 30 11:05:00 Trades.bound: 11:10:00

这些行仍然在后台逗留一个小时,因此第二个 ORCL 行输出的总计为 35;但原始 IBM 交易落在“上一个小时”窗口外,因此它未包含在 IBM 总计中。

示例

有些业务问题似乎需要对流的整个历史进行总计,但这通常无法计算。但是,此类业务问题通常可以通过查看最后一天、最后一小时或最近 N 条记录来解决。此类记录的集合称为窗口化聚合

它们在流数据库中易于计算,可以用 ANSI (SQL:2008) 标准 SQL 表示,如下所示:

SELECT STREAM ticker,  avg(price) OVER lastHour AS avgPrice,      max(price) OVER lastHour AS maxPrice   FROM Bids   WINDOW lastHour AS  (      PARTITION BY ticker      RANGE INTERVAL '1' HOUR PRECEDING)

注意

Interval_clause 必须属于以下合适的类型之一:

  • 包含 ROWS 的整数文字

  • 数字列上的 RANGE 的数字值

  • 范围超过 a 的间隔 date/time/timestamp