经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:转换多个数据类型
提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下:
-
首先,您将流式传输源映射到应用程序内部输入流,与所有其他 Kinesis Data Analytics 应用程序类似。
-
然后,在应用程序代码中编写 SQL 语句,从应用程序内部输入流中检索特定类型的行。然后,将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。
在本练习中,您具有一个可接收两种类型 (Order
和 Trade
) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录:
Order record
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
Trade record
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
使用创建应用程序时 Amazon Web Services Management Console,控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下,控制台将该应用程序内部流命名为 SOURCE_SQL_STREAM_001
。
在保存配置时,Amazon Kinesis Data Analytics 持续从流式传输源中读取数据,并在应用程序内部流中插入行。现在,您可以对应用程序内部流中的数据进行分析。
在本示例的应用程序代码中,首先创建两个额外的应用程序内部流:Order_Stream
和 Trade_Stream
。然后,根据记录类型筛选 SOURCE_SQL_STREAM_001
流中的行,使用数据泵将这些行插入到新创建的流。有关此编码模式的信息,请参阅应用程序代码。
-
将订单和交易行筛选到单独的应用程序内部流:
-
筛选
SOURCE_SQL_STREAM_001
中的订单记录,并将订单保存到Order_Stream
。--Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
-
筛选
SOURCE_SQL_STREAM_001
中的交易记录,并将订单保存到Trade_Stream
。--Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
-
-
现在,您可以对这些流进行其他分析。在本示例中,您将按股票在时长一分钟的滚动窗口中计算交易数,并将结果保存到另一个流
DESTINATION_SQL_STREAM
。--do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);
此时将显示如下结果: