示例:转换多个数据类型 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:转换多个数据类型

提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下:

  1. 首先,您将流式传输源映射到应用程序内部输入流,与所有其他 Kinesis Data Analytics 应用程序类似。

  2. 然后,在应用程序代码中编写 SQL 语句,从应用程序内部输入流中检索特定类型的行。然后,将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中,您具有一个可接收两种类型 (OrderTrade) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录:

Order record

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Trade record

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

使用创建应用程序时 Amazon Web Services Management Console,控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下,控制台将该应用程序内部流命名为 SOURCE_SQL_STREAM_001

控制台屏幕截图,显示格式化的应用程序内部流示例。

在保存配置时,Amazon Kinesis Data Analytics 持续从流式传输源中读取数据,并在应用程序内部流中插入行。现在,您可以对应用程序内部流中的数据进行分析。

在本示例的应用程序代码中,首先创建两个额外的应用程序内部流:Order_StreamTrade_Stream。然后,根据记录类型筛选 SOURCE_SQL_STREAM_001 流中的行,使用数据泵将这些行插入到新创建的流。有关此编码模式的信息,请参阅应用程序代码

  1. 将订单和交易行筛选到单独的应用程序内部流:

    1. 筛选 SOURCE_SQL_STREAM_001 中的订单记录,并将订单保存到 Order_Stream

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. 筛选 SOURCE_SQL_STREAM_001 中的交易记录,并将订单保存到 Trade_Stream

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. 现在,您可以对这些流进行其他分析。在本示例中,您将按股票在时长一分钟的滚动窗口中计算交易数,并将结果保存到另一个流 DESTINATION_SQL_STREAM

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    此时将显示如下结果:

    控制台屏幕截图,在 SQL 结果选项卡中显示结果。
下一个步骤

步骤 1:准备数据