使用 Kinesis 进行电动汽车充电站数据流摄取教程 - Amazon Redshift
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Kinesis 进行电动汽车充电站数据流摄取教程

此过程演示如何从名为 ev_station_data 的 Kinesis 流中摄取数据。该流包含来自不同电动汽车充电站的消费数据,采用 JSON 格式。Schema 定义得很好。此示例演示了如何将数据存储为原始 JSON 格式,以及如何在摄取时将 JSON 数据转换为 Amazon Redshift 数据类型。

生产者设置

  1. 使用 Amazon Kinesis Data Streams,按照以下步骤创建一个名为 ev_station_data 的流。对于容量模式,选择按需。有关更多信息,请参阅通过 Amazon 管理控制台创建流

  2. Amazon Kinesis Data Generator 可以帮助您生成测试数据以供您的流使用。按照工具中详细说明的步骤开始使用,然后使用以下数据模板生成数据:

    { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }

    流数据中的每个 JSON 对象都包含以下属性。

    { "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }

Amazon Redshift 设置

以下步骤演示了如何配置实体化视图以摄取数据。

  1. 创建外部 Schema 以将 Kinesis 中的数据映射到某个 Redshift 对象。

    CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

    有关如何配置 IAM 角色的更多信息,请参阅开始使用 Amazon Kinesis Data Streams 流式摄取

  2. 创建一个实体化视图以使用流数据。以下示例演示了定义实体化视图以摄取 JSON 源数据的两种方法。

    首先,以半结构化的 SUPER 格式存储流记录。在此示例中,JSON 源存储在 Redshift 中,并未转换为 Redshift 类型。

    CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_parse(kinesis_data) as payload FROM evdata."ev_station_data" WHERE can_json_parse(kinesis_data);

    而在下面的实体化视图定义中,实体化视图具有在 Redshift 中定义的 Schema。实体化视图按照来自流的 UUID 值分配,并按 approximatearrivaltimestamp 值存储。

    CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS SELECT refresh_time, approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID, json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID FROM evdata."ev_station_data" WHERE LENGTH(kinesis_data) < 65355;

查询流

  1. 查询刷新后的实体化视图以获取使用情况统计信息。

    SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime ,SUM(kWhDelivered) AS Energy_Consumed ,count(distinct userID) AS #Users from ev_station_data_extract group by to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') order by 1 desc;
  2. 查看结果。

    connectiontime energy_consumed #users 2022-02-08 16:07:21+00 4139 10 2022-02-08 16:07:20+00 5571 10 2022-02-08 16:07:19+00 8697 20 2022-02-08 16:07:18+00 4408 10 2022-02-08 16:07:17+00 4257 10 2022-02-08 16:07:16+00 6861 10 2022-02-08 16:07:15+00 5643 10 2022-02-08 16:07:14+00 3677 10 2022-02-08 16:07:13+00 4673 10 2022-02-08 16:07:11+00 9689 20