滑动窗口 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

滑动窗口

您可以不使用 GROUP BY 对记录分组,而是定义基于时间或基于行的窗口。您应通过添加显式 WINDOW 子句执行此操作。

在这种情况下,当窗口随着时间滑动时,只要流中出现新记录,Amazon Kinesis Data Analytics 就会发送输出。Kinesis Data Analytics 将会通过在此窗口中处理行来发送此输出。窗口在这种类型的处理中可以重叠,一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。

考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中,新记录分别于时间 t1、t2、t6 和 t7 到达,有三个记录于时间 t8 秒时到达。

记住以下内容:

  • 此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。

  • 对于进入窗口的每一行,滑动窗口会发送输出行。应用程序启动后不久,您会看到查询针对出现在流中的每个新记录发送输出,即使尚未经过 5 秒窗口。例如,当记录出现在第一秒和第二秒时,查询会发送输出。稍后,查询会处理 5 秒窗口中的记录。

  • 该窗口随着时间滑动。如果流中的旧记录落后于窗口,查询将不会发送任何输出,除非流中也有一个新记录落在该 5 秒窗口中。

假设查询在 t0 开始执行。那么,将出现以下情况:

  1. 在时间 t0,查询开始执行。查询不发送输出 (计数值),因为此时没有记录。

  2. 在时间 t1 处,新记录出现在流中,并且查询发送计数值 1。

  3. 在时间 t2,出现另一个记录,并且查询发送计数 2。

  4. 此 5 秒窗口随着时间滑动:

    • 在 t3 处,滑动窗口为 t3 到 t0

    • 在 t4 处 (滑动窗口为 t4 到 t0)

    • 在 t5 处,滑动窗口为 t5 到 t0

    在所有这些时间,5 秒窗口具有相同的记录——没有新记录。因此,查询不会发送任何输出。

  5. 在时间 t6 处,此 5 秒窗口为 (t6 到 t1)。查询在 t6 处检测到一个新记录,因此它发送了输出 2。t1 处的记录不再位于窗口中,计数时不考虑。

  6. 在时间 t7 处,此 5 秒窗口为 (t7 到 t2)。查询在 t7 处检测到一个新记录,因此它发送了输出 2。t2 处的记录不再位于 5 秒窗口中,因此计数时不考虑。

  7. 在时间 t8 处,此 5 秒窗口为 (t8 到 t3)。查询检测到三个新记录,因此发送了记录计数 5。

总之,窗口是固定大小,并且随时间滑动。当出现新记录时,查询会发送输出。

注意

我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口,应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。

以下示例查询使用 WINDOW 子句定义窗口和执行聚合。由于查询不指定 GROUP BY,因此查询使用滑动窗口方法处理流中的记录。

示例 1:使用一个 1 分钟滑动窗口处理流

在填充应用程序内部流 SOURCE_SQL_STREAM_001 时,请考虑“入门”练习中的演示流。下面是架构。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说,对于出现在流中的每个新记录,您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。

您可以使用以下基于时间的窗口式查询。查询使用 WINDOW 子句定义 1 分钟范围间隔。WINDOW 子句中的 PARTITION BY 按照滑动窗口中的股票行情机值对记录进行分组。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
测试查询
  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

示例 2:对滑动窗口应用聚合的查询

针对演示流的以下查询将返回一个 10 秒窗口中,每个股票行情机的价格的平均百分比变化。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

测试查询
  1. 按照入门练习设置应用程序。

  2. 使用先前的 SELECT 查询替换应用程序代码中的 SELECT 语句。生成的应用程序代码如下。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

示例 3:从同一流的多个滑动窗口查询数据

您可以编写查询以发送输出,其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。

在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2a10 列值派生自 2 行和 10 行滑动窗口。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

要针对演示流测试此查询,请按照示例 1中介绍的测试过程操作。