对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 3.4:(可选) 更新应用程序代码
在此步骤中,您将研究如何更新应用程序代码。
更新应用程序代码
-
按如下方式创建另一个应用程序内部流:
-
创建名为
DESTINATION_SQL_STREAM_2
的另一个应用程序内部流。 -
创建数据泵,然后从
DESTINATION_SQL_STREAM
中选择行,以便使用数据泵在新创建的流中插入这些行。
在 SQL 编辑器中,将以下代码附加到现有应用程序代码中:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_STREAM_2" SELECT STREAM ticker_symbol, change, price FROM "DESTINATION_SQL_STREAM";
保存并运行代码。Real-time analytics 选项卡上将显示其他应用程序内部流。
-
-
创建两个应用程序内部流。根据股票代码筛选
SOURCE_SQL_STREAM_001
中的行,然后将这些行插入到这些单独的流中。将以下 SQL 语句附加到您的应用程序代码:
CREATE OR REPLACE STREAM "AMZN_STREAM" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "AMZN_PUMP" AS INSERT INTO "AMZN_STREAM" SELECT STREAM ticker_symbol, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ticker_symbol SIMILAR TO '%AMZN%'; CREATE OR REPLACE STREAM "TGT_STREAM" (ticker_symbol VARCHAR(4), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "TGT_PUMP" AS INSERT INTO "TGT_STREAM" SELECT STREAM ticker_symbol, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ticker_symbol SIMILAR TO '%TGT%';
保存并运行代码。请注意 Real-time analytics 选项卡上的其他应用程序内部流。
您现已创建第一个正常运行的 Amazon Kinesis Data Analytics 应用程序。在本练习中,您已完成以下操作:
-
创建了第一个 Kinesis Data Analytics 应用程序。
-
配置将演示流标识为流式源的应用程序输入,并将其映射到创建的应用程序内部流 (
SOURCE_SQL_STREAM_001
)。Kinesis Data Analytics 持续读取演示流,并在应用程序内部流中插入记录。 -
应用程序代码已查询
SOURCE_SQL_STREAM_001
,并将输出写入到名为DESTINATION_SQL_STREAM
的另一个应用程序内部流。
现在,您可以选择性地配置应用程序输出,以便将应用程序输出写入到外部目标。也就是说,您可配置应用程序输出以便将 DESTINATION_SQL_STREAM
中的记录写入到外部目标。在本练习中,此步骤为可选步骤。要了解如何配置目标,请转到下一步。