示例:转换 DateTime 值 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:转换 DateTime 值

Amazon Kinesis Data Analytics 支持将列转换为时间戳。例如,除了 ROWTIME 列以外,建议您将自己的时间戳用作 GROUP BY 子句中另一个基于时间的窗口。Kinesis Data Analytics 提供用于处理日期和时间字段的操作和 SQL 函数。

  • 日期和时间运算符 - 您可以对日期、时间和间隔数据类型执行算术运算。有关更多信息,请参阅 Amazon Managed Service for Apache Flink SQL 参考中的日期、时间戳和间隔运算符

     

  • SQL 函数 - 其中包括以下各项。有关更多信息,请参阅Amazon Managed Service for Apache Flink SQL 参考中的日期和时间函数

    • EXTRACT() - 从日期、时间、时间戳或间隔表达式中提取一个字段。

    • CURRENT_TIME - 返回执行查询的时间 (UTC)。

    • CURRENT_DATE - 返回执行查询的日期 (UTC)。

    • CURRENT_TIMESTAMP - 返回执行查询的时间戳 (UTC)。

    • LOCALTIME - 返回 Kinesis Data Analytics 运行环境所定义执行查询的当前时间 (UTC)。

    • LOCALTIMESTAMP - 返回 Kinesis Data Analytics 运行环境定义的当前时间戳 (UTC)。

       

  • SQL 扩展 - 其中包括以下各项。有关更多信息,请参阅Amazon Managed Service for Apache Flink SQL 参考中的日期和时间函数以及日期时间转换函数

    • CURRENT_ROW_TIMESTAMP - 为流中的每一行返回一个新的时间戳。

    • TSDIFF - 返回两个时间戳的差异 (以毫秒为单位)。

    • CHAR_TO_DATE - 将字符串转换为日期。

    • CHAR_TO_TIME - 将字符串转换为时间。

    • CHAR_TO_TIMESTAMP - 将字符串转换为时间戳。

    • DATE_TO_CHAR - 将日期转换为字符串。

    • TIME_TO_CHAR - 将时间转换为字符串。

    • TIMESTAMP_TO_CHAR - 将时间戳转换为字符串。

上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如,您可以指定采用格式 yyyy-MM-dd hh:mm:ss 将输入字符串 2009-09-16 03:15:24 转换为时间戳。有关更多信息,请参阅Amazon Managed Service for Apache Flink SQL 参考中的字符到时间戳 (Sys)

示例:转换日期

在本示例中,您将以下记录写入到 Amazon Kinesis 数据流中。

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

然后,您在控制台上创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有两个列 (EVENT_TIMETICKER) 的如下应用程序内部架构。


                    控制台屏幕截图,显示具有事件时间和股票代码列的应用程序内部架构。

然后,将该应用程序代码与 SQL 函数结合使用,以多种方式转换 EVENT_TIME 时间戳字段。随后将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:


                    控制台屏幕截图,显示应用程序内部流中的结果数据。

步骤 1:创建 Kinesis 数据流

创建一个 Amazon Kinesis Data Stream 并填充事件时间和股票代码记录,如下所示:

  1. 登录 Amazon Web Services Management Console 并打开 Kinesis 控制台,网址为 https://console.aws.amazon.com/kinesis。

  2. 在导航窗格中,选择 数据流

  3. 选择 创建 Kinesis 流,然后创建带有一个分片的流。

  4. 运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

步骤 2:创建 Amazon Kinesis Data Analytics 应用程序

按如下方式创建应用程序:

  1. 打开适用于 Apache Flink 的托管服务控制台,网址为 https://console.aws.amazon.com/kinesisanalytics

  2. 选择 创建应用程序,键入应用程序名称,然后选择 创建应用程序

  3. 在应用程序详细信息页面上,选择 连接流数据,以连接到源。

  4. 连接到源 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择以创建 IAM 角色。

    3. 选择 发现架构。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

    4. 选择 编辑架构。将 EVENT_TIME 列的 列类型 更改为 TIMESTAMP

    5. 选择 保存架构并更新流示例。在控制台保存架构后,选择 退出

    6. 选择 保存并继续

  5. 在应用程序详细信息页面上,选择 转到 SQL编辑器。要启动应用程序,请在显示的对话框中选择 是,启动应用程序

  6. 在 SQL 编辑器中编写应用程序代码并确认结果,如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. 选择 保存并运行 SQL。在 实时分析 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。