对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:使用事件时间戳的滚动窗口
当一个窗口式查询以非重叠方式处理每个窗口时,这样的窗口称为滚动窗口。有关详细信息,请参阅 滚动窗口(使用 GROUP BY 组的聚合)。此 Amazon Kinesis Data Analytics 示例说明了一个使用事件时间戳的滚动窗口,这是一个用户创建的时间戳,它包含在流数据中。它使用这种方法而不是只使用 ROWTIME,这是 Kinesis Data Analytics 在应用程序收到记录时创建的时间戳。如果您想根据事件发生的时间而非应用程序收到此事件的时间创建一个聚合,则可以在流数据中使用事件时间戳。在本例中,ROWTIME
值每分钟触发一次此聚合,ROWTIME
和所包含事件时间都会聚合记录。
在本示例中,您将以下记录写入到 Amazon Kinesis 流中。EVENT_TIME
值在过去设置为 5 秒以模拟处理和传输滞后,而滞后可能导致在发生事件和将记录提取到 Kinesis Data Analytics 之间出现延迟。
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65} {"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61} {"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48} {"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64} ...
然后,您在 Amazon Web Services Management Console 内创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 数据流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有三个列(EVENT_TIME
、TICKER
和 PRICE
)的如下所示的应用程序内部架构。
您使用应用程序代码以及 MIN
和 MAX
函数以创建数据的窗口式聚合。然后,将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:
在以下过程中,您创建一个 Kinesis Data Analytics 应用程序,它在基于事件时间的滚动窗口中聚合输入流中的值。
步骤 1:创建 Kinesis 数据流
创建一个 Amazon Kinesis Data Stream 并填充记录,如下所示:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 数据流。
-
选择 创建 Kinesis 流,然后创建具有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流。
-
要在生产环境中将记录写入到 Kinesis 数据流,我们建议您使用 Kinesis 客户端库或 Kinesis 数据流 API。为简单起见,此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行,以便可以在后面的步骤中生成应用程序架构。
import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
步骤 2:创建 Kinesis Data Analytics 应用程序
创建一个 Kinesis Data Analytics 应用程序,如下所示:
打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 创建应用程序,输入应用程序名称,然后选择 创建应用程序。
-
在应用程序详细信息页面上,选择 连接流数据,以连接到源。
-
在 连接到源 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择 发现架构。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有三列。
-
选择 编辑架构。将 EVENT_TIME 列的 列类型 更改为
TIMESTAMP
。 -
选择 保存架构并更新流示例。在控制台保存架构后,选择 退出。
-
选择 保存并继续。
-
-
在应用程序详细信息页面上,选择 转到 SQL编辑器。要启动应用程序,请在显示的对话框中选择 是,启动应用程序。
-
在 SQL 编辑器中编写应用程序代码并确认结果如下所示:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND), TICKER, MIN(PRICE) AS MIN_PRICE, MAX(PRICE) AS MAX_PRICE FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
-
选择 保存并运行 SQL。
在 实时分析 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-