滑动窗口 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

滑动窗口

您可以不使用 GROUP BY 对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW 子句执行此操作。

在这种情况下,在窗口随着时间滑动时,Amazon Kinesis Data Analytics 将在流上显示新记录时发送输出。Kinesis Data Analytics 在窗口中处理行以发送此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。

考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录到达时间T1-t2-t6、和T7,三个记录到达时间T8

记住以下内容:

  • 此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。

  • 对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。

  • 该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。

假设查询开始在T时执行0那么,将出现以下情况:

  1. 时间T0,查询开始。查询不发送输出 (计数值),因为此时没有记录。

  2. 时间T1,流上会出现一个新记录,查询会发出计数值1。

  3. 时间T2,显示另一个记录,查询发出计数2。

  4. 此 5 秒窗口随着时间滑动:

    • T3,滑动窗口T3 至T0

    • T4(滑动窗口T)4 至T0)

    • T5 滑动窗口T5–t0

    在所有这些时间,5 秒窗口具有相同的记录 — 没有新记录。因此,查询不会发送任何输出。

  5. 时间T6,5秒窗口是6 至T1)。查询在T中检测到一个新记录6 因此它会发出输出2。T的记录1 不再在窗口中且不计数。

  6. 时间T7,5秒窗口为T7 至T2。查询在T中检测到一个新记录7 因此它会发出输出2。T的记录2 不再在5秒窗口内,因此不计数。

  7. 时间T8,5秒窗口为T8 至T3查询检测到三个新记录,因此发送了记录计数 5。

总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。

注意

我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。

以下示例查询使用 WINDOW 子句定义窗口和执行聚合。由于查询不指定 GROUP BY,因此查询使用滑动窗口方法处理流中的记录。

示例 #1 使用1分钟滑动窗口进行流程

考虑在应用程序内流中填充演示流的演示流, SOURCE_SQL_STREAM_001。以下是schema。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。

您可以使用以下基于时间的窗口式查询。查询使用 WINDOW 子句定义 1 分钟范围间隔。WINDOW 子句中的 PARTITION BY 按照滑动窗口中的股票行情机值对记录进行分组。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

测试查询

  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

示例 2 在滑动窗口上应用聚合物的查询

针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

测试查询

  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

示例 3 从同一流的多个滑动窗口查询数据

您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。

在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2a10 列值派生自 2 行和 10 行滑动窗口。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

要针对演示流测试此查询,请按照示例 #1中介绍的测试过程操作。