流的窗口化聚合 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

流的窗口化聚合

为了说明窗口化聚合对于 Amazon Kinesis 数据流的工作方式,假定下表中的数据将流经一个名为 WEATHERSTREAM 的流。

ROWTIME CITY TEMP

2018-11-01 01:00:00.0

Denver

29

2018-11-01 01:00:00.0

Anchorage

2

2018-11-01 06:00:00.0

Miami

65

2018-11-01 07:00:00.0

Denver

32

2018-11-01 09:00:00.0

Anchorage

9

2018-11-01 13:00:00.0

Denver

50

2018-11-01 17:00:00.0

Anchorage

10

2018-11-01 18:00:00.0

Miami

71

2018-11-01 19:00:00.0

Denver

43

2018-11-02 01:00:00.0

Anchorage

4

2018-11-02 01:00:00.0

Denver

39

2018-11-02 07:00:00.0

Denver

46

2018-11-02 09:00:00.0

Anchorage

3

2018-11-02 13:00:00.0

Denver

56

2018-11-02 17:00:00.0

Anchorage

2

2018-11-02 19:00:00.0

Denver

50

2018-11-03 01:00:00.0

Denver

36

2018-11-03 01:00:00.0

Anchorage

1

假设您要查找 24 小时期间内记录的全球(无论是哪个城市)的最低和最高温度(在任意给定读数之前)。为此,您需要定义 RANGE INTERVAL '1' DAY PRECEDING 的一个窗口,并在 MINMAX 分析函数的 OVER 子句中使用它:

SELECT STREAM        ROWTIME,        MIN(TEMP) OVER W1 AS WMIN_TEMP,        MAX(TEMP) OVER W1 AS WMAX_TEMP FROM WEATHERSTREAM WINDOW W1 AS (    RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME WMIN_TEMP WMAX_TEMP

2018-11-01 01:00:00.0

29

29

2018-11-01 01:00:00.0

2

29

2018-11-01 06:00:00.0

2

65

2018-11-01 07:00:00.0

2

65

2018-11-01 09:00:00.0

2

65

2018-11-01 13:00:00.0

2

65

2018-11-01 17:00:00.0

2

65

2018-11-01 18:00:00.0

2

71

2018-11-01 19:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 01:00:00.0

2

71

2018-11-02 07:00:00.0

4

71

2018-11-02 09:00:00.0

3

71

2018-11-02 13:00:00.0

3

71

2018-11-02 17:00:00.0

2

71

2018-11-02 19:00:00.0

2

56

2018-11-03 01:00:00.0

2

56

2018-11-03 01:00:00.0

1

56

现在,假定您要查找在 24 小时期间内记录的在任意给定读数之前的最低、最高和平均温度(按城市划分)。为此,您可以向窗口规范添加针对 CITYPARTITION BY 子句,并向选择列表添加针对同一个窗口的 AVG 分析函数:

SELECT STREAM        ROWTIME,        CITY,        MIN(TEMP) over W1 AS WMIN_TEMP,        MAX(TEMP) over W1 AS WMAX_TEMP,        AVG(TEMP) over W1 AS WAVG_TEMP FROM AGGTEST.WEATHERSTREAM WINDOW W1 AS (        PARTITION BY CITY        RANGE INTERVAL '1' DAY PRECEDING );

结果

ROWTIME CITY WMIN_TEMP WMAX_TEMP WAVG_TEMP

2018-11-01 01:00:00.0

Denver

29

29

29

2018-11-01 01:00:00.0

Anchorage

2

2

2

2018-11-01 06:00:00.0

Miami

65

65

65

2018-11-01 07:00:00.0

Denver

29

32

30

2018-11-01 09:00:00.0

Anchorage

2

9

5

2018-11-01 13:00:00.0

Denver

29

50

37

2018-11-01 17:00:00.0

Anchorage

2

10

7

2018-11-01 18:00:00.0

Miami

65

71

68

2018-11-01 19:00:00.0

Denver

29

50

38

2018-11-02 01:00:00.0

Anchorage

2

10

6

2018-11-02 01:00:00.0

Denver

29

50

38

2018-11-02 07:00:00.0

Denver

32

50

42

2018-11-02 09:00:00.0

Anchorage

3

10

6

2018-11-02 13:00:00.0

Denver

39

56

46

2018-11-02 17:00:00.0

Anchorage

2

10

4

2018-11-02 19:00:00.0

Denver

39

56

46

2018-11-03 01:00:00.0

Denver

36

56

45

2018-11-03 01:00:00.0

Anchorage

1

4

2

行时间边界和窗口化聚合的示例

下面是窗口化聚合查询的示例:

SELECT STREAM ROWTIME, ticker, amount, SUM(amount)    OVER (        PARTITION BY ticker        RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume FROM Trades

由于这是针对流的查询,当行进入后,它们将立即弹出此查询。例如,给定以下输入:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 10:25:00 Trades: IBM 30 11:05:00 Trades.bound: 11:10:00

在本示例中,输出将如下所示:

Trades: IBM 10 10 10:00:00 Trades: ORCL 20 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 35 10:25:00 Trades: IBM 30 30 11:05:00 Trades.bound: 11:10:00

这些行仍然在后台逗留一个小时,因此第二个 ORCL 行输出的总计为 35;但原始 IBM 交易落在“上一个小时”窗口外,因此它未包含在 IBM 总计中。

示例

某些业务问题似乎需要汇总流的整个历史记录,但这对于计算通常不实用。但是,此类业务问题通常可通过观察最近一天、最近一小时或最近 N 个记录来解决。此类记录的集合称为窗口化聚合

它们很容易在流数据库中计算,并且可采用 ANSI (SQL:2008) 标准 SQL 表示,如下所示:

SELECT STREAM ticker,  avg(price) OVER lastHour AS avgPrice,      max(price) OVER lastHour AS maxPrice   FROM Bids   WINDOW lastHour AS  (      PARTITION BY ticker      RANGE INTERVAL '1' HOUR PRECEDING)

注意

Interval_clause 必须属于以下合适的类型之一:

  • 带 ROWS 的整数文本

  • 数字列中的 RANGE 的数字值

  • 日期/时间/时间戳内的 RANGE 的 INTERVAL