示例:检测流上的热点 (HOTSPOTS 函数) - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:检测流上的热点 (HOTSPOTS 函数)

Amazon Kinesis Data Analytics 提供了 HOTSPOTS 函数,它可以查找并返回有关数据中的相对密集的区域的信息。有关更多信息,请参阅 Amazon Managed Service for Apache Flink SQL 参考中的 HOTSPOTS

在本练习中,您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序,请执行以下步骤:

  1. 设置流式传输源 - 您设置 Kinesis 流并编写示例坐标数据,如下所示:

    {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}

    本示例提供了用于填充流的 Python 脚本。xy 值是随机生成的,一些记录集中在特定位置周围。

    如果脚本有意生成值作为热点的一部分,is_hot 字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。

  2. 创建应用程序 – 使用 Amazon Web Services Management Console,您随后创建一个 Kinesis Data Analytics 应用程序。通过将流式传输源映射到应用程序内部流 (SOURCE_SQL_STREAM_001) 来配置应用程序输入。在应用程序启动时,Kinesis Data Analytics 持续读取流式传输源,并将记录插入到应用程序内部流中。

    在本练习中,您将为应用程序使用以下代码:

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "x" DOUBLE, "y" DOUBLE, "is_hot" VARCHAR(4), HOTSPOTS_RESULT VARCHAR(10000) ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" FROM TABLE ( HOTSPOTS( CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 1000, 0.2, 17) );

    此代码读取 SOURCE_SQL_STREAM_001 中的行,分析它是否有大量热点,并将生成的数据写入到另一个应用程序内部流 (DESTINATION_SQL_STREAM)。您使用数据泵将流插入到应用程序内部流。有关更多信息,请参阅 应用程序内部流和数据泵

  3. 配置输出 - 您配置应用程序输出以将应用程序中的数据发送到外部目标 (另一个 Kinesis 数据流)。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 Amazon Lambda 函数进一步处理热点信息并配置警报。

  4. 验证输出 - 示例包含一个 JavaScript 应用程序,该应用程序从输出流读取数据并以图形方式显示它,因此您可以实时查看应用程序生成的热点。

本练习使用美国西部(俄勒冈州)(us-west-2) 区域创建这些流和您的应用程序。如果您使用任何其他区域,请相应地更新代码。