经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:分析 Web 日志 (W3C_LOG_PARSE 函数)
此示例使用 W3C_LOG_PARSE
函数在 Amazon Kinesis Data Analytics 中转换字符串。您可以使用 W3C_LOG_PARSE
快速格式化 Apache 日志。有关更多信息,请参阅 Amazon Managed Service for Apache Flink SQL 参考中的 W3C_LOG_PARSE。
在本示例中,您将日志记录写入到 Amazon Kinesis 数据流中。示例日志如下所示:
{"Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pba.gif HTTP/1.1" 304 0"} {"Log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] "GET /icons/apache_pbb.gif HTTP/1.1" 304 0"} {"Log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] "GET /icons/apache_pbc.gif HTTP/1.1" 304 0"} ...
然后,您在控制台上创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 数据流作为流式传输源。发现过程读取流式传输源上的示例记录,并推断出具有一个列(日志)的应用程序内部架构,如下所示:

然后,您将使用应用程序代码和 W3C_LOG_PARSE
函数解析日志,创建另一应用程序内部流(将不同日志字段放置在单独的列中),如下所示:

步骤 1:创建 Kinesis 数据流
创建一个 Amazon Kinesis Data Stream 并填充日志记录,如下所示:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 数据流。
-
选择 创建 Kinesis 流,然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流。
-
运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。
import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] " '"GET /icons/apache_pb.gif HTTP/1.1" 304 0' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
步骤 2:创建 Kinesis Data Analytics 应用程序
创建一个 Kinesis Data Analytics 应用程序,如下所示:
打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 创建应用程序,键入应用程序名称,然后选择 创建应用程序。
-
在应用程序详细信息页面上,选择 连接流数据。
-
在 连接到源 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择创建 IAM 角色的选项。
-
选择 发现架构。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。
-
选择 保存并继续。
-
-
在应用程序详细信息页面上,选择 转到 SQL编辑器。要启动应用程序,请在显示的对话框中选择 是,启动应用程序。
-
在 SQL 编辑器中编写应用程序代码并确认结果如下所示:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), column2 VARCHAR(16), column3 VARCHAR(16), column4 VARCHAR(16), column5 VARCHAR(16), column6 VARCHAR(16), column7 VARCHAR(16)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM l.r.COLUMN1, l.r.COLUMN2, l.r.COLUMN3, l.r.COLUMN4, l.r.COLUMN5, l.r.COLUMN6, l.r.COLUMN7 FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON') FROM "SOURCE_SQL_STREAM_001") AS l(r);
-
选择 保存并运行 SQL。在 实时分析 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-