示例:转换 DateTime 值 - Amazon Kinesis Data Analytics for SQL 应用程序开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

示例:转换 DateTime 值

Amazon Kinesis Data Analytics 支持将列转换为时间戳。例如,除了 ROWTIME 列以外,您可能希望将自己的时间戳作为 GROUP BY 子句中的另一个基于时间的窗口。Kinesis Data Analytics 提供运算和 SQL 函数以处理日期和时间字段。

  • 日期和时间运算符 – 您可以对日期、时间和间隔数据类型执行算术运算。有关详细信息,请参阅 日期、时间戳和Interval操作符Amazon Kinesis Data Analytics SQL 参考.

     

  • SQL 函数 – 其中包括以下函数。有关更多信息,请参阅 https://docs.amazonaws.cn/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html 中的Amazon Kinesis Data Analytics SQL 参考日期和时间函数

    • EXTRACT() – 从日期、时间、时间戳或间隔表达式中提取一个字段。

    • CURRENT_TIME – 返回执行查询的时间 (UTC)。

    • CURRENT_DATE – 返回执行查询的日期 (UTC)。

    • CURRENT_TIMESTAMP – 返回执行查询的时间戳 (UTC)。

    • LOCALTIME – 返回执行查询的当前时间 (UTC),由运行 Kinesis Data Analytics 的环境定义。

    • LOCALTIMESTAMP – 返回当前时间戳 (UTC),由运行 Kinesis Data Analytics 的环境定义。

       

  • SQL 扩展 – 其中包括以下扩展。有关更多信息,请参阅 https://docs.amazonaws.cn/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html 中的日期和时间函数和Amazon Kinesis Data Analytics SQL 参考日期时间转换函数

    • CURRENT_ROW_TIMESTAMP – 为流中的每一行返回新的时间戳。

    • TSDIFF – 返回两个时间戳的差异(以毫秒为单位)。

    • CHAR_TO_DATE – 将字符串转换为日期。

    • CHAR_TO_TIME – 将字符串转换为时间。

    • CHAR_TO_TIMESTAMP – 将字符串转换为时间戳。

    • DATE_TO_CHAR – 将日期转换为字符串。

    • TIME_TO_CHAR – 将时间转换为字符串。

    • TIMESTAMP_TO_CHAR – 将时间戳转换为字符串。

上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如,您可以指定采用格式 yyyy-MM-dd hh:mm:ss 将输入字符串 2009-09-16 03:15:24 转换为时间戳。有关更多信息,请参阅 https://docs.amazonaws.cn/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html 中的Amazon Kinesis Data Analytics SQL 参考字符到时间戳 (Sys)

示例:转换日期

在本示例中,您将以下记录写入到 Amazon Kinesis 数据流中:

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

然后,您在控制台上创建一个 Amazon Kinesis 数据分析应用程序,并将 Kinesis 流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有两个列 (EVENT_TIMETICKER) 的如下应用程序内部架构。


                    控制台屏幕截图,显示具有事件时间和股票代码列的应用程序内部架构。

然后,将该应用程序代码与 SQL 函数结合使用,以多种方式转换 EVENT_TIME 时间戳字段。随后将结果数据插入另一个应用程序内部流,如下面的屏幕截图所示:


                    控制台屏幕截图,显示应用程序内部流中的结果数据。

步骤 1. 创建 Kinesis 数据流

创建一个 Amazon Kinesis 数据流并填充事件时间和股票代码记录,如下所示:

  1. 登录 AWS 管理控制台并通过以下网址打开 Kinesis 控制台:https://console.amazonaws.cn/kinesis

  2. 在导航窗格中,选择 Data Streams (数据流)

  3. 选择 Create Kinesis stream (创建 Kinesis 流),然后创建带有一个分片的流。

  4. 运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。

    import json import boto3 import random import datetime kinesis = boto3.client('kinesis') def getReferrer(): data = {} now = datetime.datetime.now() str_now = now.isoformat() data['EVENT_TIME'] = str_now data['TICKER'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['PRICE'] = round(price, 2) return data while True: data = json.dumps(getReferrer()) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")

步骤 2. 创建 Amazon Kinesis Data Analytics 应用程序

按如下方式创建应用程序:

  1. 打开 Kinesis Data Analytics 控制台 ( https://console.amazonaws.cn/kinesisanalytics)。

  2. 选择 Create application (创建应用程序),键入应用程序名称,然后选择 Create application (创建应用程序)

  3. 在应用程序详细信息页面上,选择 Connect streaming data (连接流数据),以连接到源。

  4. Connect to source (连接到源) 页面上,执行以下操作:

    1. 选择在上一部分中创建的流。

    2. 选择以创建 IAM 角色。

    3. 选择 Discover schema (发现架构)。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

    4. 选择 Edit Schema (编辑架构)。将 EVENT_TIME 列的 Column type (列类型) 更改为 TIMESTAMP

    5. 选择 Save schema and update stream samples。在控制台保存架构后,选择 Exit (退出)

    6. 选择 Save and continue

  5. 在应用程序详细信息页面上,选择 Go to SQL editor (转到 SQL编辑器)。要启动应用程序,请在显示的对话框中选择 Yes, start application (是,启动应用程序)

  6. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示:

    1. 复制下面的应用程序代码并将其粘贴到编辑器中:

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. 选择 Save and run SQL。在 Real-time analytics (实时分析) 选项卡上,可以查看应用程序已创建的所有应用程序内部流并验证数据。