对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:创建 应用程序
在本部分中,创建 Amazon Kinesis Data Analytion Analyto 然后,通过添加将您在前一部分中创建的流式传输源映射到应用程序内部输入流的输入配置,您可以更新应用程序。
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 Create application(创建应用程序)。此示例使用应用程序名称
ProcessMultipleRecordTypes
。 -
在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。
-
在 Connect to source (连接到源) 页面上,执行以下操作:
-
选择在步骤 1:准备数据中创建的流。
-
选择以创建 IAM 角色。
-
等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。
-
选择 Save and continue。
-
-
在应用程序中心上,选择 Go to SQL editor。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)。
-
在 SQL 编辑器中编写应用程序代码并确认结果:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
--Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( "order_id" integer, "order_type" varchar(10), "ticker" varchar(4), "order_price" DOUBLE, "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Order'; --******************************************** --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" ("trade_id" integer, "order_id" integer, "trade_price" DOUBLE, "ticker" varchar(4), "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Trade'; --***************************************************************** --do some analytics on the Trade_Stream and Order_Stream. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ticker" varchar(4), "trade_count" integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "ticker", count(*) as trade_count FROM "Trade_Stream" GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
-
选择 Save and run SQL。选择 Real-time analytics (实时分析) 选项卡可查看应用程序已创建的所有应用程序内部流并验证数据。
-
下一个步骤
您可以将应用程序输出配置为将结果保存到外部目的地,例如另一个 Kinesis 数据流或 Kinesis Data Firehose 数据传输流。