

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 步骤 2：创建应用程序
<a name="tworecordtypes-create-app"></a>

在此部分中，您创建一个 Kinesis Data Analytics 应用程序。然后，通过添加将您在前一部分中创建的流式传输源映射到应用程序内部输入流的输入配置，您可以更新应用程序。

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.amazonaws.cn/kinesisanalytics)

1. 选择**创建应用程序**。此示例使用应用程序名称 **ProcessMultipleRecordTypes**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在[步骤 1：准备数据](tworecordtypes-prepare.md)中创建的流。

   1. 选择以创建 IAM 角色。

   1. 等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。

   1. 选择 **保存并继续**。

1. 在应用程序中心上，选择 **Go to SQL editor**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. 选择 **保存并运行 SQL**。选择 **Real-time analytics (实时分析)** 选项卡可查看应用程序已创建的所有应用程序内部流并验证数据。

   

**下一个步骤**  
您可以配置应用程序输出以将结果永久保存到外部目标中，例如，另一个 Kinesis 流或 Firehose 数据传输流。