示例:使用正则表达式替换子字符串 (REGEX_REPLACE 函数) - Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:使用正则表达式替换子字符串 (REGEX_REPLACE 函数)

此示例使用REGEX_REPLACE函数转换 Amazon Kinesis Data Analytics 中的字符串。REGEX_REPLACE将子字符串替换为备用子字符串。有关更多信息,请参阅 。正则表达式替换中的Amazon Kinesis Data Analytics SQL 参考

在本示例中,您将以下记录写入到 Amazon Kinesis 数据流中:

{ "REFERRER" : "http://www.amazon.com" } { "REFERRER" : "http://www.amazon.com"} { "REFERRER" : "http://www.amazon.com"} ...

然后,您在控制台中创建一个 Amazon Kinesis 数据分析应用程序,并将 Kinesis 数据流作为流式源。发现过程读取有关流式传输源的示例记录,并推断出具有一列 (REFERRER) 的应用程序内部架构,如下所示。


                控制台屏幕截图,显示具有引用站点列中的 URL 列表的应用程序内部架构。

然后,通过 REGEX_REPLACE 函数使用应用程序代码将 URL 转换为使用 https:// 而不是 http://。随后将结果数据插入另一个应用程序内部流,如下所示:


                控制台屏幕截图,显示具有 ROWTIME、ingest_time 和引用站点列的结果数据表。

第 1 步:创建 Kinesis Data Streams

按如下方式创建 Amazon Kinesis 数据流并填充日志记录:

  1. 登录到Amazon Web Services Management Console,然后打开 Kinesis 控制台https://console.aws.amazon.com/kinesis

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams 开发者指南

  4. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

    import json import boto3 STREAM_NAME = 'ExampleInputStream' def get_data(): return {'REFERRER': 'http://www.amazon.com'} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey='partitionkey') if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))

第 2 步:创建 Kinesis Data Analytics 应用程序

接下来,按以下步骤创建 Amazon Kinesis Data Analytics 应用程序:

  1. 打开 Kinesis Data Analytics 控制台,网站为https://console.aws.amazon.com/kinesisanalytics

  2. 选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据)

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择创建 IAM 角色的选项。

    3. 选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

    4. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果,如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中:

      -- CREATE OR REPLACE STREAM for cleaned up referrer CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) FROM "SOURCE_SQL_STREAM_001";
    2. 选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。