如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。
滑动窗口
您可以不使用 GROUP BY
对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW
子句执行此操作。
在这种情况下,在窗口随着时间滑动时,Amazon Kinesis Data Analytics 将在流上显示新记录时发送输出。Kinesis Data Analytics 在窗口中处理行以发送此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。
考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录到达时间T1-t2-t6、和T7,三个记录到达时间T8 秒

记住以下内容:
-
此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
-
对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。
-
该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。
假设查询开始在T时执行0那么,将出现以下情况:
-
时间T0,查询开始。查询不发送输出 (计数值),因为此时没有记录。
-
时间T1,流上会出现一个新记录,查询会发出计数值1。
-
时间T2,显示另一个记录,查询发出计数2。
-
此 5 秒窗口随着时间滑动:
-
T3,滑动窗口T3 至T0
-
T4(滑动窗口T)4 至T0)
-
T5 滑动窗口T5–t0
在所有这些时间,5 秒窗口具有相同的记录 — 没有新记录。因此,查询不会发送任何输出。
-
-
时间T6,5秒窗口是6 至T1)。查询在T中检测到一个新记录6 因此它会发出输出2。T的记录1 不再在窗口中且不计数。
-
时间T7,5秒窗口为T7 至T2。查询在T中检测到一个新记录7 因此它会发出输出2。T的记录2 不再在5秒窗口内,因此不计数。
-
时间T8,5秒窗口为T8 至T3查询检测到三个新记录,因此发送了记录计数 5。
总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。
以下示例查询使用 WINDOW
子句定义窗口和执行聚合。由于查询不指定 GROUP BY
,因此查询使用滑动窗口方法处理流中的记录。
示例 #1 使用1分钟滑动窗口进行流程
考虑在应用程序内流中填充演示流的演示流, SOURCE_SQL_STREAM_001
。以下是schema。
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)
假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。
您可以使用以下基于时间的窗口式查询。查询使用 WINDOW
子句定义 1 分钟范围间隔。WINDOW
子句中的 PARTITION BY
按照滑动窗口中的股票行情机值对记录进行分组。
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
示例 2 在滑动窗口上应用聚合物的查询
针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
示例 3 从同一流的多个滑动窗口查询数据
您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。
在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2
和 a10
列值派生自 2 行和 10 行滑动窗口。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
要针对演示流测试此查询,请按照示例 #1中介绍的测试过程操作。