示例:从查询中聚合部分结果 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:从查询中聚合部分结果

如果 Amazon Kinesis 数据流包含的记录具有与提取时间不完全匹配的事件时间,在滚动窗口中选择的结果将包含在窗口中到达但未必发生的记录。在这种情况下,滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题:

  • 仅使用滚动窗口,并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符(summinmax 等)无限期地处理后期数据。这种方法的缺点是,您必须在数据库层开发和维护额外的应用程序逻辑。

  • 使用滚动和滑动窗口,这会在早期生成部分结果,还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据,这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是,它使用更多 Kinesis 处理单元 (KPU),并且仍生成两个结果,这可能不适用于某些使用案例。

有关滚动和滑动窗口的更多信息,请参阅窗口式查询

在以下过程中,滚动窗口聚合会生成两个部分结果(发送到 CALC_COUNT_SQL_STREAM 应用程序内部流),它们必须合并以生成最终结果。然后,应用程序生成第二个聚合(发送到 DESTINATION_SQL_STREAM 应用程序内部流),它合并这两个部分结果。

创建使用事件时间聚合部分结果的应用程序

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在导航窗格中,选择 Data Analytics (数据分析)。按照教程中所述,创建一个 Kinesis Data Analytics 应用程序。

  3. 在 SQL 编辑器中,将应用程序代码替换为以下内容:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    应用程序代码中的 SELECT 语句将在 SOURCE_SQL_STREAM_001 中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流 CHANGE_STREAM

  4. 选择 Save and run SQL

第一个数据泵将流输出到与以下内容类似的 CALC_COUNT_SQL_STREAM。请注意,结果集不完整:


                显示部分结果的控制台屏幕截图。

然后,第二个泵将流输出到 DESTINATION_SQL_STREAM,其中包含完整的结果集:


                显示完整结果的控制台屏幕截图。