经过仔细考虑,我们决定分两个步骤停用 Amazon Kinesis Data Analytics for SQL 应用程序:
1. 从 2025 年 10 月 15 日起,您将无法创建新的 Kinesis Data Analytics for SQL 应用程序。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起,将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息,请参阅 Amazon Kinesis Data Analytics for SQL 应用程序停用。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 2:创建应用程序
在此部分中,您创建一个 Kinesis Data Analytics 应用程序。然后,通过添加将您在前一部分中创建的流式传输源映射到应用程序内部输入流的输入配置,您可以更新应用程序。
在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com
-
选择创建应用程序。此示例使用应用程序名称
ProcessMultipleRecordTypes
。 -
在应用程序详细信息页面上,选择 连接流数据,以连接到源。
-
在 连接到源 页面上,执行以下操作:
-
选择在步骤 1:准备数据中创建的流。
-
选择以创建 IAM 角色。
-
等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。
-
选择 保存并继续。
-
-
在应用程序中心上,选择 Go to SQL editor。要启动应用程序,请在显示的对话框中选择 是,启动应用程序。
-
在 SQL 编辑器中编写应用程序代码并确认结果:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
--Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( "order_id" integer, "order_type" varchar(10), "ticker" varchar(4), "order_price" DOUBLE, "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Order'; --******************************************** --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" ("trade_id" integer, "order_id" integer, "trade_price" DOUBLE, "ticker" varchar(4), "record_type" varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType" FROM "SOURCE_SQL_STREAM_001" WHERE "RecordType" = 'Trade'; --***************************************************************** --do some analytics on the Trade_Stream and Order_Stream. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ticker" varchar(4), "trade_count" integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "ticker", count(*) as trade_count FROM "Trade_Stream" GROUP BY "ticker", FLOOR("Trade_Stream".ROWTIME TO MINUTE);
-
选择 保存并运行 SQL。选择 Real-time analytics (实时分析) 选项卡可查看应用程序已创建的所有应用程序内部流并验证数据。
-
下一个步骤
您可以配置应用程序输出以将结果永久保存到外部目标中,例如,另一个 Kinesis 流或 Firehose 数据传输流。