

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 示例：转换多个数据类型
<a name="app-tworecordtypes"></a>

 提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下：

1. 首先，您将流式传输源映射到应用程序内部输入流，与所有其他 Kinesis Data Analytics 应用程序类似。

1. 然后，在应用程序代码中编写 SQL 语句，从应用程序内部输入流中检索特定类型的行。然后，将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中，您具有一个可接收两种类型 (`Order` 和 `Trade`) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录：

**Order record**

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

**Trade record**

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

使用创建应用程序时 Amazon Web Services 管理控制台，控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下，控制台将该应用程序内部流命名为 `SOURCE_SQL_STREAM_001`。

![控制台屏幕截图，显示格式化的应用程序内部流示例。](http://docs.amazonaws.cn/kinesisanalytics/latest/dev/images/two-record-types-10.png)


在保存配置时，Amazon Kinesis Data Analytics 持续从流式传输源中读取数据，并在应用程序内部流中插入行。现在，您可以对应用程序内部流中的数据进行分析。

在本示例的应用程序代码中，首先创建两个额外的应用程序内部流：`Order_Stream` 和 `Trade_Stream`。然后，根据记录类型筛选 `SOURCE_SQL_STREAM_001` 流中的行，使用数据泵将这些行插入到新创建的流。有关此编码模式的信息，请参阅[应用程序代码](how-it-works-app-code.md)。

1. 将订单和交易行筛选到单独的应用程序内部流:

   1. 筛选 `SOURCE_SQL_STREAM_001` 中的订单记录，并将订单保存到 `Order_Stream`。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM oid, otype,oticker, oprice, recordtype 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Order';
      ```

   1. 筛选 `SOURCE_SQL_STREAM_001` 中的交易记录，并将订单保存到 `Trade_Stream`。

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM tid, toid, tprice, tticker, recordtype
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Trade';
      ```

1. 现在，您可以对这些流进行其他分析。在本示例中，您将按股票在时长一分钟的[滚动窗口](https://docs.amazonaws.cn/kinesisanalytics/latest/dev/tumbling-window-concepts.html)中计算交易数，并将结果保存到另一个流 `DESTINATION_SQL_STREAM`。

   ```
   --do some analytics on the Trade_Stream and Order_Stream. 
   -- To see results in console you must write to OPUT_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   此时将显示如下结果：  
![控制台屏幕截图，在 SQL 结果选项卡中显示结果。](http://docs.amazonaws.cn/kinesisanalytics/latest/dev/images/two-record-types-20.png)

**Topics**
+ [步骤 1：准备数据](tworecordtypes-prepare.md)
+ [步骤 2：创建应用程序](tworecordtypes-create-app.md)

**下一个步骤**  
[步骤 1：准备数据](tworecordtypes-prepare.md)