示例:在流中检测数据异常情况 (RANDOM_CUT_FOREST 函数) - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:在流中检测数据异常情况 (RANDOM_CUT_FOREST 函数)

Amazon Kinesis Data Analytics 提供了一个函数 (RANDOM_CUT_FOREST),它可以根据数值列中的值将异常分数分配给每个记录。有关更多信息,Amazon Managed Service for Apache Flink SQL 参考中的RANDOM_CUT_FOREST 函数

在本练习中,您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序,请执行以下操作:

  1. 设置流式源 - 您设置 Kinesis 数据流并编写示例 heartRate 数据,如下所示:

    {"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}

    此过程提供用于填充流的 Python 脚本。heartRate 值将随机生成,99% 的记录具有的 heartRate 值介于 60 和 100 之间,仅 1% 的记录具有的 heartRate 值介于 150 和 200 之间。因此,heartRate 值介于 150 和 200 之间的记录是异常情况。

  2. 配置输入 - 通过使用控制台,您创建 Kinesis Data Analytics 应用程序,并通过将流式源映射到应用程序内部流 (SOURCE_SQL_STREAM_001) 来配置应用程序输入。在应用程序启动时,Kinesis Data Analytics 持续读取流式传输源,并将记录插入到应用程序内部流中。

  3. 指定应用程序代码 - 示例使用以下应用程序代码:

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

    此代码读取 SOURCE_SQL_STREAM_001 中的行,分配异常分数,并将结果行写入另一个应用程序内部流 (TEMP_STREAM)。随后,应用程序代码将对 TEMP_STREAM 中的记录进行排序,并将结果保存到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅 应用程序内部流和数据泵

  4. 配置输出 - 您配置应用程序输出以将 DESTINATION_SQL_STREAM 中的数据保存到外部目标 (另一个 Kinesis 数据流)。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 Amazon Lambda 函数处理这些异常分数并配置警报。

此练习使用美国东部 (弗吉尼亚州北部) (us-east-1) 来创建这些流和您的应用程序。如果您使用任何其他区域,则必须相应地更新代码。

下一个步骤

步骤 1:准备