经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:
1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:使用正则表达式替换子字符串 (REGEX_REPLACE 函数)
此示例使用 REGEX_REPLACE
函数在 Amazon Kinesis Data Analytics 中转换字符串。REGEX_REPLACE
将子字符串替换为备用子字符串。有关更多信息,请参阅 Amazon Managed Service for Apache Flink SQL 参考中的 REGEX_REPLACE。
在本示例中,您将以下记录写入到 Amazon Kinesis 数据流中:
{ "REFERRER" : "http://www.amazon.com" } { "REFERRER" : "http://www.amazon.com"} { "REFERRER" : "http://www.amazon.com"} ...
然后,您在控制台上创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 数据流作为流式传输源。发现过程读取有关流式传输源的示例记录,并推断出具有一列 (REFERRER) 的应用程序内部架构,如下所示。
然后,通过 REGEX_REPLACE
函数使用应用程序代码将 URL 转换为使用 https://
而不是 http://
。随后将结果数据插入另一个应用程序内部流,如下所示:
步骤 1:创建 Kinesis 数据流
创建一个 Amazon Kinesis 数据流并填充日志记录,如下所示:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 数据流。
-
选择 创建 Kinesis 流,然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的创建流。
-
运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。
import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
步骤 2:创建 Kinesis Data Analytics 应用程序
接下来,创建一个 Kinesis Data Analytics 应用程序,如下所示:
打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 创建应用程序,键入应用程序名称,然后选择 创建应用程序。
-
在应用程序详细信息页面上,选择 连接流数据。
-
在 连接到源 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择创建 IAM 角色的选项。
-
选择 发现架构。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。
-
选择 保存并继续。
-
-
在应用程序详细信息页面上,选择 转到 SQL编辑器。要启动应用程序,请在显示的对话框中选择 是,启动应用程序。
-
在 SQL 编辑器中编写应用程序代码并确认结果,如下所示:
-
复制下面的应用程序代码并将其粘贴到编辑器中:
-- CREATE OR REPLACE STREAM for cleaned up referrer CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) FROM "SOURCE_SQL_STREAM_001";
-
选择 保存并运行 SQL。在 实时分析 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-