示例:转换多个数据类型 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:转换多个数据类型

提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以创建一个 Amazon Kinesis 应用程序以处理此类流式传输源。过程如下所述:

  1. 首先,您将流式传输源映射到应用程序内部输入流,与所有其他 Kinesis 数据分析应用程序类似。

  2. 然后,在应用程序代码中编写 SQL 语句,从应用程序内部输入流中检索特定类型的行。然后,将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中,您具有一个可接收两种类型 (OrderTrade) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录:

Order record

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Trade record

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

在使用 AWS 管理控制台创建应用程序时,控制台将为创建的应用程序内部输入流显示以下推断架构。默认情况下,控制台将该应用程序内部流命名为 SOURCE_SQL_STREAM_001


                控制台屏幕截图,显示格式化的应用程序内部流示例。

在保存配置时,Amazon Kinesis Data Analytics 持续从流式传输源中读取数据,并在应用程序内部流中插入行。现在,您可以对应用程序内部流中的数据进行分析。

在本示例中的应用程序代码中,您首先创建了两个额外的应用程序内流, Order_StreamTrade_Stream。然后将行从 SOURCE_SQL_STREAM_001 根据记录类型进行流股,并使用泵将其插入新创建的流。有关此编码模式的信息,请参阅应用程序代码

  1. 将订单和交易行筛选到单独的应用程序内部流:

    1. 筛选 SOURCE_SQL_STREAM_001 中的订单记录,并将订单保存到 Order_Stream

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. 筛选 SOURCE_SQL_STREAM_001 中的交易记录,并将订单保存到 Trade_Stream

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. 现在,您可以对这些流进行其他分析。在本示例中,您将按股票在时长一分钟的滚动窗口中计算交易数,并将结果保存到另一个流 DESTINATION_SQL_STREAM

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    此时将显示如下结果:

    
                        控制台屏幕截图,在 SQL 结果选项卡中显示结果。

下一步

步骤 1. 准备数据