示例:检测流媒体上的数据异常(random_cut_forestfunction) - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:检测流媒体上的数据异常(random_cut_forestfunction)

Amazon Kinesis Data Analytics 提供了一个函数 (RANDOM_CUT_FOREST),它可以根据数值列中的值将异常分数分配给每个记录。有关详细信息,请参阅 RANDOM_CUT_FOREST 功能Amazon Kinesis Data Analytics SQL 参考.

在本练习中,您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序,请执行以下操作:

  1. 设置流式传输源 – 您设置一个 Kinesis 数据流并写入示例 heartRate 数据,如下所示:

    {"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}

    此过程提供用于填充流的 Python 脚本。heartRate 值将随机生成,99% 的记录具有的 heartRate 值介于 60 和 100 之间,仅 1% 的记录具有的 heartRate 值介于 150 和 200 之间。因此,heartRate 值介于 150 和 200 之间的记录是异常情况。

  2. 配置输入 – 通过使用控制台,您创建一个 Kinesis Data Analytics 应用程序,并将流式传输源映射到应用程序内部流 (SOURCE_SQL_STREAM_001) 以配置应用程序输入。在应用程序启动时,Kinesis Data Analytics 持续读取流式传输源,并将记录插入到应用程序内部流中。

  3. 指定应用程序代码 – 该示例使用以下应用程序代码:

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

    此代码读取 SOURCE_SQL_STREAM_001 中的行,分配异常分数,并将结果行写入另一个应用程序内部流 (TEMP_STREAM)。随后,应用程序代码将对 TEMP_STREAM 中的记录进行排序,并将结果保存到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅 应用程序内部流和数据泵。)

  4. 配置输出 – 您配置应用程序输出以将 DESTINATION_SQL_STREAM 中的数据永久保存到外部目标(另一个 Kinesis 数据流)中。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 AWS Lambda 函数处理这些异常分数并配置警报。

本练习使用 美国东部(弗吉尼亚北部) (us-east-1) AWS 区域创建这些流和您的应用程序。如果您使用任何其他区域,则必须相应地更新代码。

下一步

步骤 1. 准备