对于新项目,我们建议您使用适用于 Apache Flink Studio 的新托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。适用于 Apache Flink Studio 的托管服务将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
滑动窗口
您可以不使用 GROUP BY
对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW
子句执行此操作。
在这种情况下,当窗口随时间推移时,当直播中出现新记录时,Amazon Kinesis Data Analytics 会发出输出。Kinesis Data Analytics 通过处理窗口中的行来发出此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。
考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录在时间 t1、t2、t 和 t6 到达7,三条记录在时间 t8 秒到达。

记住以下内容:
-
此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
-
对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。
-
该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。
假设查询从 t 开始执行0。那么,将出现以下情况:
-
在时间 t0,查询开始。查询不发送输出 (计数值),因为此时没有记录。
-
在时间 t 时1,流上会出现一条新记录,查询发出 count 值 1。
-
在时间 t2 处,出现另一条记录,查询发出计数 2。
-
此 5 秒窗口随着时间滑动:
-
在 t 处3,滑动窗口 t3 到 t0
-
在 t4(将窗口 t 滑动4到 t0)
-
在 t5 处滑动窗口 t5 —t0
在所有这些时候,5 秒窗口的记录都是一样的,没有新记录。因此,查询不会发送任何输出。
-
-
在时间 t 时6,5 秒的窗口为(t6 到 t1)。该查询在 t 处检测到一条新记录6,因此它发出输出 2。t 处的记录1已不在窗口中,也不算在内。
-
在时间 t 时7,5 秒的窗口是 t7 到 t2。该查询在 t 处检测到一条新记录7,因此它发出输出 2。t2 处的记录不再在 5 秒窗口中,因此不计算在内。
-
在时间 t 时8,5 秒的窗口是 t8 到 t3。查询检测到三个新记录,因此发送了记录计数 5。
总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。
注意
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。
以下示例查询使用 WINDOW
子句定义窗口和执行聚合。由于查询不指定 GROUP BY
,因此查询使用滑动窗口方法处理流中的记录。
示例 1:使用一个 1 分钟滑动窗口处理流
在填充应用程序内部流 SOURCE_SQL_STREAM_001
时,请考虑“入门”练习中的演示流。下面是架构。
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)
假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。
您可以使用以下基于时间的窗口式查询。查询使用 WINDOW
子句定义 1 分钟范围间隔。WINDOW
子句中的 PARTITION BY
按照滑动窗口中的股票行情机值对记录进行分组。
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
示例 2:对滑动窗口应用聚合的查询
针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
测试查询
-
按照入门练习设置应用程序。
-
使用先前的
SELECT
查询替换应用程序代码中的SELECT
语句。生成的应用程序代码如下。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
示例 3:从同一流的多个滑动窗口查询数据
您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。
在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2
和 a10
列值派生自 2 行和 10 行滑动窗口。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
要针对演示流测试此查询,请按照示例 1中介绍的测试过程操作。