对于新项目,我们建议您使用适用于 Apache Flink Studio 的新托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。适用于 Apache Flink Studio 的托管服务将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:检测流上的热点 (HOTSPOTS 函数)
Amazon Kinesis Data Analytics 提供了HOTSPOTS
该功能,该功能可以定位和返回有关数据中相对密集区域的信息。有关更多信息,请参阅适用于 A pache 的亚马逊托管服务 Flink S QL 参考中的热点。
在本练习中,您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序,请执行以下步骤:
-
设置直播源 — 您设置 Kinesis 流并写入示例坐标数据,如下所示:
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
本示例提供了用于填充流的 Python 脚本。
x
和y
值是随机生成的,一些记录集中在特定位置周围。如果脚本有意生成值作为热点的一部分,
is_hot
字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。 -
创建应用程序-使用Amazon Web Services Management Console,然后创建 Kinesis Data Analytics 应用程序。通过将流式传输源映射到应用程序内部流 (
SOURCE_SQL_STREAM_001
) 来配置应用程序输入。当应用程序启动时,Kinesis Data Analytics 会持续读取流源并将记录插入到应用程序内部流中。在本练习中,您将为应用程序使用以下代码:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "x" DOUBLE, "y" DOUBLE, "is_hot" VARCHAR(4), HOTSPOTS_RESULT VARCHAR(10000) ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" FROM TABLE ( HOTSPOTS( CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 1000, 0.2, 17) );
此代码读取
SOURCE_SQL_STREAM_001
中的行,分析它是否有大量热点,并将生成的数据写入到另一个应用程序内部流 (DESTINATION_SQL_STREAM
)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅应用程序内部流和数据泵: -
配置输出-将应用程序输出配置为将数据从应用程序发送到外部目标,即另一个 Kinesis 数据流。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 Amazon Lambda 函数进一步处理热点信息并配置警报。
-
验证输出-该示例包括一个 JavaScript 应用程序,该应用程序从输出流中读取数据并将其以图形方式显示,因此您可以实时查看该应用程序生成的热点。
本练习使用美国西部(俄勒冈)(us-west-2
) 来创建这些直播和您的应用程序。如果您使用任何其他区域,请相应地更新代码。