滑动窗口 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,我们建议您使用适用于 Apache Flink Studio 的新托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。适用于 Apache Flink Studio 的托管服务将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

滑动窗口

您可以不使用 GROUP BY 对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW 子句执行此操作。

在这种情况下,当窗口随时间推移时,当直播中出现新记录时,Amazon Kinesis Data Analytics 会发出输出。Kinesis Data Analytics 通过处理窗口中的行来发出此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。

考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录在时间 t1、t2、t 和 t6 到达7,三条记录在时间 t8 秒到达。

记住以下内容:

  • 此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。

  • 对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。

  • 该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。

假设查询从 t 开始执行0。那么,将出现以下情况:

  1. 在时间 t0,查询开始。查询不发送输出 (计数值),因为此时没有记录。

  2. 在时间 t 时1,流上会出现一条新记录,查询发出 count 值 1。

  3. 在时间 t2 处,出现另一条记录,查询发出计数 2。

  4. 此 5 秒窗口随着时间滑动:

    • 在 t 处3,滑动窗口 t3 到 t0

    • 在 t4(将窗口 t 滑动4到 t0

    • 在 t5 处滑动窗口 t5 —t0

    在所有这些时候,5 秒窗口的记录都是一样的,没有新记录。因此,查询不会发送任何输出。

  5. 在时间 t 时6,5 秒的窗口为(t6 到 t1)。该查询在 t 处检测到一条新记录6,因此它发出输出 2。t 处的记录1已不在窗口中,也不算在内。

  6. 在时间 t 时7,5 秒的窗口是 t7 到 t2。该查询在 t 处检测到一条新记录7,因此它发出输出 2。t2 处的记录不再在 5 秒窗口中,因此不计算在内。

  7. 在时间 t 时8,5 秒的窗口是 t8 到 t3。查询检测到三个新记录,因此发送了记录计数 5。

总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。

注意

我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。

以下示例查询使用 WINDOW 子句定义窗口和执行聚合。由于查询不指定 GROUP BY,因此查询使用滑动窗口方法处理流中的记录。

示例 1:使用一个 1 分钟滑动窗口处理流

在填充应用程序内部流 SOURCE_SQL_STREAM_001 时,请考虑“入门”练习中的演示流。下面是架构。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。

您可以使用以下基于时间的窗口式查询。查询使用 WINDOW 子句定义 1 分钟范围间隔。WINDOW 子句中的 PARTITION BY 按照滑动窗口中的股票行情机值对记录进行分组。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

示例 2:对滑动窗口应用聚合的查询

针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

测试查询
  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

示例 3:从同一流的多个滑动窗口查询数据

您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。

在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2a10 列值派生自 2 行和 10 行滑动窗口。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

要针对演示流测试此查询,请按照示例 1中介绍的测试过程操作。