本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
例如:将字符串拆分到多个字段 (VARIABLE_COLUMN_COLUMN_COLUMN/
此示例使用VARIABLE_COLUMN_LOG_PARSE
在 Kinesis Data Analytics 中处理函数。VARIABLE_COLUMN_LOG_PARSE
将输入字符串拆分到多个由分隔符或分隔符字符串分隔的字段。有关更多信息,请参阅 。变量_COLUMN_LOG_PARSE中的Amazon Kinesis Data Analytics.
在本示例中,您将半结构化记录写入到 Amazon Kinesis Data Streams 中。示例记录如下所示:
{ "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"} { "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"}
然后,在控制台中创建一个 Amazon Kinesis 数据分析应用程序,并将 Kinesis 流用作流式源。发现过程读取流式传输源上的示例记录,并推断出具有四个列的应用程序内部架构,如下所示:

然后,您将使用应用程序代码和 VARIABLE_COLUMN_LOG_PARSE
函数解析逗号分隔的值,将规范化的行插入到其他应用程序内部流,如下所示:

第 1 步:创建 Kinesis Data Streams
按如下方式创建 Amazon Kinesis Data Streams 并填充日志记录:
登录到 Amazon Web Services Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 Data Streams (数据流)。
-
选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。有关更多信息,请参阅 。创建流中的Amazon Kinesis Data Streams.
-
运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。
import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'Col_A': 'a', 'Col_B': 'b', 'Col_C': 'c', 'Col_E_Unstructured': 'x,y,z'} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
第 2 步:创建 Kinesis Data Analytics 应用程序
按如下方式创建 Amazon Kinesis Data Analytics 应用程序:
打开 Kinesis Data Analytics 控制台https://console.aws.amazon.com/kinesisanalytics
. -
选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)。
-
在应用程序详细信息页面上,选择 Connect streaming data (连接流数据)。
-
在 Connect to source (连接到源) 页面上,执行以下操作:
-
选择在上一部分中创建的流。
-
选择创建 IAM 角色的选项。
-
选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。请注意推断的架构仅包含一列。
-
选择 Save and continue。
-
-
在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)。
-
在 SQL 编辑器中,编写应用程序代码并确认结果:
-
复制下面的应用程序代码并将其粘贴到编辑器中:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM (SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001") as t;
-
选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。
-