对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:创建应用程序
在此部分中,您创建一个 Amazon Kinesis Data Analytics 应用程序,如下所示:
-
配置应用程序输入以将在 步骤 1:准备 中创建的 Kinesis 数据流作为流式传输源。
-
在控制台上使用 Anomaly Detection (异常检测) 模板。
创建应用程序
-
按照 Kinesis Data Analytics 入门练习中的步骤 1、2 和 3(请参阅 步骤 3.1:创建应用程序)。
-
在源配置中,执行以下操作:
-
指定您在上一部分中创建的流式传输源。
-
在控制台推断架构后,编辑架构并将
heartRate
列类型设置为INTEGER
。大多数心率值是正常的,发现过程最有可能将
TINYINT
类型分配给此列。但有小部分值显示了高心率。如果这些较高的值不适合TINYINT
类型,则 Kinesis Data Analytics 会将这些行发送到错误流。将数据类型更新为INTEGER
,以便能适合所有生成的心率数据。
-
-
在控制台上使用 Anomaly Detection (异常检测) 模板。随后,您更新模板代码以提供适当的列名称。
-
-
通过提供列名称来更新应用程序代码。下面显示了生成的应用程序代码 (将此代码粘贴到 SQL 编辑器中):
--Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
-
运行 SQL 代码并在 Kinesis Data Analytics 控制台中检查结果: