示例:从查询中聚合部分结果 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:从查询中聚合部分结果

如果 Amazon Kinesis 数据流包含的记录具有与提取时间不完全匹配的事件时间,在滚动窗口中选择的结果将包含在窗口中到达但未必发生的记录。在这种情况下,滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题:

  • 仅使用滚动窗口,并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符(summinmax 等)无限期地处理后期数据。这种方法的缺点是,您必须在数据库层开发和维护额外的应用程序逻辑。

  • 使用滚动和滑动窗口,这会在早期生成部分结果,还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据,这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是,它使用更多 Kinesis 处理单元 (KPU),并且仍生成两个结果,这可能不适用于某些使用案例。

有关滚动和滑动窗口的更多信息,请参阅窗口式查询

在以下过程中,滚动窗口聚合会生成两个部分结果(发送到 CALC_COUNT_SQL_STREAM 应用程序内部流),它们必须合并以生成最终结果。然后,应用程序生成第二个聚合(发送到 DESTINATION_SQL_STREAM 应用程序内部流),它合并这两个部分结果。

创建使用事件时间聚合部分结果的应用程序
  1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 在导航窗格中,选择 Data Analytics (数据分析)。按照 Amazon Kinesis Data Analytics·for·SQL 应用程序入门 教程中所述,创建一个 Kinesis Data Analytics 应用程序。

  3. 在 SQL 编辑器中,将应用程序代码替换为以下内容:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    应用程序代码中的 SELECT 语句将在 SOURCE_SQL_STREAM_001 中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流 CHANGE_STREAM

  4. 选择 保存并运行 SQL

第一个数据泵将流输出到与以下内容类似的 CALC_COUNT_SQL_STREAM。请注意,结果集不完整:

显示部分结果的控制台屏幕截图。

然后,第二个泵将流输出到 DESTINATION_SQL_STREAM,其中包含完整的结果集:

显示完整结果的控制台屏幕截图。