示例:转换多个数据类型 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:转换多个数据类型

提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下:

  1. 首先,您将流式传输源映射到应用程序内部输入流,与所有其他 Kinesis Data Analytics 应用程序类似。

  2. 然后,在应用程序代码中编写 SQL 语句,从应用程序内部输入流中检索特定类型的行。然后,将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中,您具有一个可接收两种类型 (OrderTrade) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录:

Order record

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Trade record

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

使用创建应用程序时 Amazon Web Services Management Console,控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下,控制台将该应用程序内部流命名为 SOURCE_SQL_STREAM_001

控制台屏幕截图,显示格式化的应用程序内部流示例。

在保存配置时,Amazon Kinesis Data Analytics 持续从流式传输源中读取数据,并在应用程序内部流中插入行。现在,您可以对应用程序内部流中的数据进行分析。

在本示例的应用程序代码中,首先创建两个额外的应用程序内部流:Order_StreamTrade_Stream。然后,根据记录类型筛选 SOURCE_SQL_STREAM_001 流中的行,使用数据泵将这些行插入到新创建的流。有关此编码模式的信息,请参阅应用程序代码

  1. 将订单和交易行筛选到单独的应用程序内部流:

    1. 筛选 SOURCE_SQL_STREAM_001 中的订单记录,并将订单保存到 Order_Stream

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. 筛选 SOURCE_SQL_STREAM_001 中的交易记录,并将订单保存到 Trade_Stream

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. 现在,您可以对这些流进行其他分析。在本示例中,您将按股票在时长一分钟的滚动窗口中计算交易数,并将结果保存到另一个流 DESTINATION_SQL_STREAM

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    此时将显示如下结果:

    控制台屏幕截图,在 SQL 结果选项卡中显示结果。
下一个步骤

步骤 1:准备数据