用 Kinesis Data Streams 代替 Kinesis Data Firehose 源 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用 Kinesis Data Streams 代替 Kinesis Data Firehose 源

有关完整教程,请参阅 Converting-KDASQL-KDAStudio/

在以下练习中,您需要更改数据流,以便使用适用于 Apache Flink Studio 的亚马逊托管服务。即从 Amazon Kinesis Data Firehose 切换到Amazon Kinesis Data Streams。

首先,我们提供一个典型的 KDA-SQL 架构,然后演示如何使用适用于 Apache Flink Studio 的亚马逊托管服务和 Amazon Kinesis Data Streams 取而代之。或者,您可以在此处启动 Amazon CloudFormation 模板:

Amazon Kinesis Data Analytics-SQL 和 Amazon Kinesis Data Firehose

以下是 Amazon Kinesis Data Analytics SQL 架构流程:

我们首先研究了传统 Amazon Kinesis Data Analytics-SQL 和 Amazon Kinesis Data Firehose 的设置。用例是一个交易市场,交易数据 (包括股票代码和价格) 从外部来源流向 Amazon Kinesis 系统。适用于 SQL 的 Amazon Kinesis Data Analytics 使用输入流执行窗口式查询,例如滚动窗口,从而确定每只股票在一分钟窗口内的交易量和 minmax 以及 average 交易价格。 

描述配置的并行度 (映射到流式传输源的应用程序内流的数量)。处理后,Amazon Kinesis Data Analytics-SQL 将处理后的数据发送到另一个 Amazon Kinesis Data Firehose,然后由后者将输出保存在 Amazon S3 存储桶中。

在本例中,您将使用 Amazon Kinesis 数据生成器。使用 Amazon Kinesis 数据生成器,您可以将测试数据发送到 Amazon Kinesis Data Streams 或 Amazon Kinesis Data Firehose 传输流。请按照此处的说明进行操作。使用此处的 Amazon CloudFormation 模板代替说明中提供的模板:

运行 Amazon CloudFormation 模板后,输出部分将提供 Amazon Kinesis 数据生成器 url。使用您在此处设置的 Cognito 用户 ID 和密码登录门户。选择区域和目标流名称。当前状态请选择 Amazon Kinesis Data Firehose 传输流。对于新状态,请选择 Amazon Kinesis Data Firehose 流名称。您可以根据需要创建多个模板,然后使用测试模板按钮测试模板,然后再发送到目标流。

以下是使用 Amazon Kinesis 数据生成器的有效负载示例。数据生成器将输入的 Amazon Kinesis Firehose 流作为目标,持续流式处理数据。Amazon Kinesis 软件开发工具包客户端也可以发送来自其他创建器的数据。 

2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763, "AMZN",3352023-02-17 09:28:07.763, "GOOGL",1852023-02-17 09:28:07.763, "AAPL",11162023-02-17 09:28:07.763, "GOOGL",1582

以下 JSON 用于生成一系列随机交易时间和日期、股票代码和股票价格:

date.now(YYYY-MM-DD HH:mm:ss.SSS), "random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])", random.number(2000)

选择发送数据后,生成器将开始发送模拟数据。

外部系统将数据流式传输到 Amazon Kinesis Data Firehose。在适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 中,您可以使用标准 SQL 分析流数据。该服务用于根据流式传输源创建并运行 SQL 代码,以便执行时间序列分析,馈送实时控制面板和创建实时指标。适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 可以根据输入流上的 SQL 查询创建目标流,然后将目标流发送给另一个 Amazon Kinesis Data Firehose。目标 Amazon Kinesis Data Firehose 可以最终状态将分析数据发送到 Amazon S3。

Amazon Kinesis Data Analytics-SQL 传统代码基于 SQL 标准的扩展。

您可以在 Amazon Kinesis Data Analytics-SQL 中使用以下查询。首先为查询输出创建目标流。然后,您将使用 PUMP,一个 Amazon Kinesis Data Analytics 存储库对象 (SQL 标准的扩展),其提供持续运行的 INSERT INTO stream SELECT ... FROM 查询功能,因此,查询结果可持续输入到指定流中。 

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP, INGEST_TIME TIMESTAMP, TICKER VARCHAR(16), VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME", "ticker", COUNT(*) AS VOLUME, AVG("tradePrice") AS AVG_PRICE, MIN("tradePrice") AS MIN_PRICE, MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001" GROUP BY "ticker", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

以上 SQL 使用两个时间窗口,来自传入流有效负载的 tradeTimestampROWTIME.tradeTimestamp (也称为 Event Timeclient-side time)。经常需要在分析中使用此时间,因为它是事件发生时的时间。但是,许多事件源(例如手机和 Web 客户端)没有可靠的时钟,这可能会导致时间不准确。此外,连接问题可能会导致记录没有按照事件发生顺序出现在流中。 

应用程序内部流也包含名为 ROWTIME 的特殊列。该列存储 Amazon Kinesis Data Analytics 在第一个应用程序内部流中插入行的时间戳。ROWTIME 反映了 Amazon Kinesis Data Analytics 从流式传输源中读取后将记录插入到第一个应用程序内部流的时间戳。之后,该 ROWTIME 值在您的整个应用程序中进行维护。 

SQL 在 60 秒的时间间隔内确定股票的计数:volumeminmaxaverage 价格。 

在基于时间的窗口式查询中使用这些时间有优点也有缺点。选择这些时间中的一个或多个,并根据您的使用案例场景选择一种策略来处理相关缺点。 

双窗口策略基于不同的时间,即 ROWTIME 和其他时间(接收时间或事件时间)中的一个。

  • 使用 ROWTIME 作为第一个窗口,控制查询发送结果的频率,如以下示例所示。它不用作逻辑时间。

  • 使用其他时间中您希望与分析关联的逻辑时间。该时间表示事件的发生时间。在以下示例中,分析目标是按股票行情机对记录分组并返回计数。

适用于 Apache Flink Studio 的亚马逊托管服务 

在更新的架构中,将 Amazon Kinesis Data Firehose 替换为 Amazon Kinesis Data Streams。适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 已替换为 适用于 Apache Flink Studio 的亚马逊托管服务。Apache Flink 代码在 Apache Zeppelin 笔记本中以交互方式运行。Amazon Managed Service for Apache Flink Studio 将聚合的交易数据发送到 Amazon S3 桶中,以便存储。步骤如下:

以下是适用于 Apache Flink Studio 的亚马逊托管服务架构流程:

创建 Kinesis 数据流

使用控制台创建数据流
  1. 登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 在导航栏中,展开区域选择器并选择一个区域。

  3. 选择创建数据流

  4. 创建 Kinesis 流页面上,输入数据流的名称,然后接受默认的按需容量模式。

    按需模式下,您可以选择创建 Kinesis 流来创建数据流。

    Kinesis stream (Kinesis 流)页面上,当流处于创建中时,流的 Status (状态)Creating (正在创建)。当流可以使用时,Status (状态) 会更改为 Active (有效)

  5. 选择流的名称。Stream Details (流详细信息) 页面显示了流配置摘要以及监控信息。

  6. 在 Amazon Kinesis 数据生成器中,将流/传输流更改为新的 Amazon Kinesis Data Streams:TRADE_SOURCE_STREAM

    JSON 和有效负载不变,即与您在 Amazon Kinesis Data Analytics-SQL 中使用的一致。使用 Amazon Kinesis 数据生成器生成一些交易有效负载示例数据,并将 TRADE_SOURCE_STREAM 数据流作为本次练习的目标:

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. 在 Amazon Web Services Management Console 上,转至适用于 Apache Flink 的托管服务,然后选择创建应用程序

  8. 在左侧的导航窗格中,选择 工作室笔记本,然后选择 创建工作室笔记本

  9. 输入 Studio 笔记本的名称。

  10. AmazonGlue 数据库下,提供一个已经存在的 Amazon Glue 数据库,用于定义源和目标的元数据。如果您没有 Amazon Glue 数据库,请选择创建,然后执行以下操作:

    1. 在 Amazon Glue 控制台中,从左侧菜单中选择 Data catalog(数据目录)下的 Databases(数据库)。

    2. 选择 创建数据库

    3. 创建数据库页面中,输入数据库的名称。在 位置 - 可选 部分中,选择 Browse Amazon S3(浏览 Amazon S3),然后选择 Amazon S3 桶。如果尚未设置 Amazon S3 桶,您可以跳过此步骤,稍后再返回。

    4. (可选)。输入数据库的描述。

    5. 选择 Create database(创建数据库)。

  11. 选择 创建笔记本

  12. 创建笔记本后,选择运行

  13. 成功启动笔记本后,选择在 Apache Zeppelin 中打开,启动 Zeppelin 笔记本。

  14. 在 Zeppelin 笔记本页面上,选择创建新笔记并将其命名为 MarketDataFeed

Flink SQL 代码的说明如下,但首先请参阅 Zeppelin 笔记本屏幕外观。笔记本中的每个窗口都是一个单独的代码块,一次只能运行一个。

适用于 Apache Flink 的亚马逊托管服务代码

适用于 Apache Flink 的亚马逊托管服务使用 Zeppelin 笔记本来运行代码。在本示例中,已基于 Apache Flink 1.13 进行 ssql 代码映射。Zeppelin 笔记本中的代码如下所示,一次运行于一个数据块。 

在 Zeppelin 笔记本中运行任何代码之前,必须先运行 Flink 配置命令。如果在运行代码(ssql、Python 或 Scala)后需要更改任何配置设置,则需要停止并重启笔记本。在此示例中,您需要设置检查点。必须设置检查点,以便您可以将数据流式传输到 Amazon S3 中的文件。从而实现向 Amazon S3 进行的数据流式传输刷新到文件。以下语句将间隔设置为 5000 毫秒。 

%flink.conf execution.checkpointing.interval 5000

%flink.conf 表示此数据块属于配置语句。有关 Flink 配置 (包括检查点) 的更多信息,请参阅 Apache Flink 检查点。 

源 Amazon Kinesis Data Streams 的输入表使用下面的 Flink ssql 代码创建。请注意,TRADE_TIME 字段存储数据生成器创建的日期/时间。

%flink.ssql DROP TABLE IF EXISTS TRADE_SOURCE_STREAM; CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, TRADE_TIME TIMESTAMP(3), WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE, STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM', 'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');

您可以使用以下语句查看输入流:

%flink.ssql(type=update)-- testing the source stream select * from TRADE_SOURCE_STREAM;

在将汇总数据发送到 Amazon S3 之前,您可以使用滚动窗口选择查询直接在适用于 Apache Flink 的亚马逊托管服务中查看。进行该操作后,交易数据会汇总在一分钟时间窗口内。请注意,%flink.ssql 语句必须进行 (type=update) 指定:

%flink.ssql(type=update) select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW, TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

然后,您才可以在 Amazon S3 中创建目标表。你需要使用水印。水印是一种进度指标,它表示您确信不会再出现延迟事件的时间点。使用水印的原因是为了考虑到达延迟。间隔 ‘5’ Second 允许交易延迟5秒输入 Amazon Kinesis 数据流,如果窗口内有时间戳,则仍包含在内。有关更多信息,请参阅生成水印。  

%flink.ssql(type=update) DROP TABLE IF EXISTS TRADE_DESTINATION_S3; CREATE TABLE TRADE_DESTINATION_S3 ( TRADE_WINDOW_START TIMESTAMP(3), WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND, TICKER STRING,  VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE) WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');

此语句将数据插入 TRADE_DESTINATION_S3TUMPLE_ROWTIME 是滚动窗口上限 (含) 的时间戳。

%flink.ssql(type=update) insert into TRADE_DESTINATION_S3 select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE), TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

运行语句 10 到 20 分钟,以便在 Amazon S3 中积累一些数据。然后中止语句。

从而关闭 Amazon S3 中的文件以便进行查看。

以下是内容:

您可以使用Amazon CloudFormation 模板来创建基础设施。

Amazon CloudFormation 会在您的 Amazon 账户中创建以下资源:

  • Amazon Kinesis Data Streams

  • 适用于 Apache Flink Studio 的亚马逊托管服务

  • Amazon Glue 数据库

  • Amazon S3 存储桶

  • 适用于 Apache Flink Studio 的亚马逊托管服务访问相应资源的 IAM 角色和策略

导入笔记本并将 Amazon S3 存储桶名称更改为 Amazon CloudFormation 创建的新 Amazon S3 存储桶。

查看更多

以下是一些其他资源,你可以用来详细了解如何使用适用于 Apache Flink Studio 的托管服务: