步骤 2:创建分析应用程序 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 2:创建分析应用程序

在本节中,您将创建一个 Amazon Kinesis Data Analytics 应用程序,然后将其配置为使用在 步骤 1:准备数据 中作为流式传输源创建的 Kinesis 数据流。然后,运行使用 RANDOM_CUT_FOREST_WITH_EXPLANATION 函数的应用程序代码。

创建应用程序
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在导航窗格中选择 Data Analytics (数据分析),然后选择创建应用程序

  3. 提供应用程序名称和描述 (可选),并选择 Create application

  4. 选择 Connect streaming data (连接流数据),然后从列表中选择 ExampleInputStream

  5. 选择 Discover schema,并确保 SystolicDiastolic 显示为 INTEGER 列。如果二者为另一种类型,则选择 Edit schema,并将 INTEGER 类型分配给二者。

  6. Real time analytics 下,选择 Go to SQL editor。出现提示时,选择运行您的应用程序。

  7. 将以下代码粘贴到 SQL 编辑器中,然后选择 Save and run SQL

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); -- Compute an anomaly score with explanation for each record in the input stream -- using RANDOM_CUT_FOREST_WITH_EXPLANATION CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true)); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
下一个步骤

步骤 3:检查结果