步骤 2:创建应用程序 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 2:创建应用程序

在此部分中,您创建一个 Amazon Kinesis Data Analytics 应用程序,如下所示:

  • 配置应用程序输入以将在 步骤 1:准备 中创建的 Kinesis 数据流作为流式传输源。

  • 在控制台上使用 Anomaly Detection (异常检测) 模板。

创建应用程序
  1. 按照 Kinesis Data Analytics 入门练习中的步骤 1、2 和 3(请参阅 步骤 3.1:创建应用程序)。

    • 在源配置中,执行以下操作:

      • 指定您在上一部分中创建的流式传输源。

      • 在控制台推断架构后,编辑架构并将 heartRate 列类型设置为 INTEGER

        大多数心率值是正常的,发现过程最有可能将 TINYINT 类型分配给此列。但有小部分值显示了高心率。如果这些较高的值不适合 TINYINT 类型,则 Kinesis Data Analytics 会将这些行发送到错误流。将数据类型更新为 INTEGER,以便能适合所有生成的心率数据。

    • 在控制台上使用 Anomaly Detection (异常检测) 模板。随后,您更新模板代码以提供适当的列名称。

  2. 通过提供列名称来更新应用程序代码。下面显示了生成的应用程序代码 (将此代码粘贴到 SQL 编辑器中):

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

  3. 运行 SQL 代码并在 Kinesis Data Analytics 控制台中检查结果:

    
                            控制台屏幕截图,显示 Real-time analytics (实时分析) 选项卡以及应用程序内部流中的结果数据。

下一个步骤

步骤 3:配置应用程序输出