Group Rank - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Group Rank

此函数会将 RANK() 函数应用于行的逻辑组,并且(可选)按排序顺序传送该组。

group_rank 的应用包括:

  • 对流式处理 GROUP BY 的结果进行排序。

  • 确定组的结果内的关系。

Group Rank 可执行以下操作:

  • 将排名应用于指定的输入列。

  • 提供已排序或未排序的输出。

  • 使用户能够为数据刷新指定一段不活动的时间。

SQL 声明

函数属性和 DDL 将在后面的部分中介绍。

Group_Rank 的函数属性

此函数的行为方式如下:

  • 在检测到行时间更改或超过指定的空闲时间限制之前收集行。

  • 接受任何流式处理行集。

  • 使用包含基本 SQL 数据类型 INTEGERCHARVARCHAR 的任何列作为排名依据。

  • 按接收行的顺序或者按所选列中的值的升序或降序对输出行进行排序。

Group_Rank 的 DDL

group_rank(c cursor, rankByColumnName VARCHAR(128),    rankOutColumnName VARCHAR(128), sortOrder VARCHAR(10), outputOrder VARCHAR(10),    maxIdle INTEGER, outputMax INTEGER)  returns table(c.*, "groupRank" INTEGER)

下表中列出了该函数的参数。

参数 说明
c 用于流式处理结果集的 CURSOR

rankByColumnName

一个字符串,指定要用于对组进行排名的列。

rankOutColumnName

一个字符串,指定要用于返回排名的列。

此字符串必须与 CREATE FUNCTION 语句的 RETURNS 子句中 groupRank 列的名称匹配。

sortOrder

控制排名分配的行的排序。

有效值如下所示:

  • 'asc' - 根据排名按升序排序。

  • “desc”- 根据排名按降序排序。

outputOrder

控制输出的排序。有效值如下所示:

  • 'asc' - 根据排名按升序输出。

  • “desc”- 根据排名按降序排序。

maxIdle

保留某个组以进行排名的时间限制 (以毫秒为单位)。

maxIdle 过期时,当前组将会释放到流中。值为 0 表示没有空闲超时。

outputMax

该函数将在给定组中输出的最大行数。

值 0 表示无限制。

示例

示例数据集

以下示例基于示例股票数据集,后者是《Amazon Kinesis Data Analytics 开发人员指南》 中的入门练习的一部分。要运行每个示例,您需要具有示例股票代码输入流的 Amazon Kinesis Data Analytics 应用程序。要了解如何创建分析应用程序和配置示例股票代码输入流,请参阅《Amazon Kinesis Data Analytics 开发人员指南》 中的入门

示例股票数据集具有以下架构:

(ticker_symbol VARCHAR(4), sector VARCHAR(16), change REAL, price REAL)

示例 1:对 GROUP BY 子句的结果进行排序

在此示例中,聚合查询在 GROUP BY 上有一个 ROWTIME 子句,该子句将流分组到有限行中。然后,GROUP_RANK 函数对 GROUP BY 子句返回的行进行排序。

CREATE OR REPLACE STREAM "ticker_grouped" ( "group_time" TIMESTAMP, "ticker" VARCHAR(65520), "ticker_count" INTEGER); CREATE OR REPLACE STREAM "destination_sql_stream" ( "group_time" TIMESTAMP, "ticker" VARCHAR(65520), "ticker_count" INTEGER, "group_rank" INTEGER); CREATE OR REPLACE PUMP "ticker_pump" AS INSERT INTO "ticker_grouped" SELECT STREAM FLOOR(SOURCE_SQL_STREAM_001.ROWTIME TO SECOND), "TICKER_SYMBOL", COUNT(TICKER_SYMBOL) FROM SOURCE_SQL_STREAM_001 GROUP BY FLOOR(SOURCE_SQL_STREAM_001.ROWTIME TO SECOND), TICKER_SYMBOL; CREATE OR REPLACE PUMP DESTINATION_SQL_STREAM_PUMP AS INSERT INTO "destination_sql_stream" SELECT STREAM "group_time", "ticker", "ticker_count", "groupRank" FROM TABLE( GROUP_RANK( CURSOR(SELECT STREAM * FROM "ticker_grouped"), 'ticker_count', 'groupRank', 'desc', 'asc', 5, 0));

结果

上一示例输出的流与以下内容类似。

操作概述

行从每个组 (即,具有相同的行时间的行) 的输入游标进行缓冲。在具有不同的行时间的行到达后 (或在空闲超时发生时),将完成行的排名。当对具有相同行时间的一组行执行排名时,将继续读取行。

在分配排名后,outputMax 参数将指定要为每个组返回的最大行数。

默认情况下,group_rank 支持列传递,如示例中所示:使用 c.* 作为标准快捷方式以指示按显示的顺序传递所有输入列。您可以改为使用表示法“c.columName”指定一个子集,以便对列进行重新排序。但是,使用特定的列名称会将 UDX 绑定到一个特定的输入集,而使用 c.* 表示法可让 UDX 处理任何输入集。

rankOutColumnName 参数将指定要用于返回排名的输出列。此列名称必须与 CREATE FUNCTION 语句的 RETURNS 子句中指定的列名称匹配。