对于新项目,我们建议您使用新的 Kinesis Data Analytics 工作室,而不是 SQL 应用程序的 Kinesis Data Analytics。Kinesis Data Analytics Studio 将易用性与高级分析功能相结合,使您能够在几分钟内构建复杂的流处理应用程序。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
示例:根据正则表达式分析日志字符串 (REGEX_LOG_PARSE 函数)
此示例使用REGEX_LOG_PARSE
函数在 Amazon Kinesis Data Analytics 中转换字符串。 REGEX_LOG_PARSE
根据默认 Java 正则表达式模式解析字符串。有关更多信息,请参阅 Amazon Kinesis Data Analytics SQL 参考中的 REGEX_LOG_PARSE。
在此示例中,您将以下记录写入Amazon Kinesis 直播:
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""} {"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""} {"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""} ...
然后,您在主机上创建 Amazon Kinesis 数据分析应用程序,将 Kinesis 数据流作为流媒体源。发现过程读取有关流式传输源的示例记录,并推断出具有一列 (LOGENTRY) 的应用程序内部架构,如下所示。

然后,在 REGEX_LOG_PARSE
函数中使用应用程序代码分析日志字符串从而检索数据元素。随后将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:

步骤 1:创建 Kinesis Data Streams
创建 Amazon Kinesis 数据流并按如下方式填充日志记录:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 Data Streams (数据流)。
-
选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。有关更多信息,请参阅 Amazon Kinesis Dat a Streams 开发人员指南中的创建流。
-
运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。
import json import boto3 STREAM_NAME = 'ExampleInputStream' def get_data(): return { 'LOGENTRY': '203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] ' '"GET /index.php HTTP/1.1" 200 125 "-" ' '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
步骤 2:创建 Kinesis Data Analytics 应用程序
接下来,按如下方式创建 Amazon Kinesis 数据分析应用程序:
打开 Kinesis Data Analytics 控制台,网址为 https://console.aws.amazon.com/kinesisanalytics
。 -
选择 Create application,并指定应用程序名称。
-
在应用程序详细信息页面上,选择 Connect streaming data (连接流数据)。
-
在 Connect to source (连接到源) 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择创建 IAM 角色的选项。
-
选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。
-
选择 Save and continue。
-
-
在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)。
-
在 SQL 编辑器中编写应用程序代码并确认结果如下所示:
-
复制下面的应用程序代码并将其粘贴到编辑器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2 FROM (SELECT STREAM LOGENTRY, REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC FROM SOURCE_SQL_STREAM_001) AS T;
-
选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-