读取 - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

从2025年6月20日起,亚马逊Timestream版 LiveAnalytics 将不再向新客户开放。如果您想使用亚马逊 Timestream LiveAnalytics,请在该日期之前注册。现有客户可以继续照常使用该服务。有关更多信息,请参阅 Amazon Timestream 以了解 LiveAnalytics 可用性变更。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

读取

您可以从联网设备、IT 系统和工业设备收集时间序列数据,并将其写入 Timestream 进行实时分析。当时间序列属于同一个表时,Timestream for Live Analytics 允许您在单个写入请求中写入来自单个时间序列的数据点和/或来自多个序列的数据点。为方便起见,Timestream for Live Analytics 为您提供了一个灵活的架构,该架构可根据您在调用数据库写入数据时指定的度量值的维度名称和数据类型,自动检测 Timestream for Live Analytics 表的列名和数据类型。您也可以将批量数据写入 Timestream 以进行实时分析。

注意

Live Analytics 的 Timestream 支持读取的最终一致性语义。这意味着,当您在将一批数据写入 Timestream for Live Analytics 后立即查询数据时,查询结果可能无法反映最近完成的写入操作的结果。结果可能还包括一些陈旧的数据。同样,在写入具有一个或多个新维度的时间序列数据时,查询可以在短时间内返回列的部分子集。如果您在短时间后重复这些查询请求,则结果应返回最新数据。

您可以使用、、或通过Amazon SDKsAmazon CLI、、Amazon LambdaAmazon IoT Core适用于 Apache Flink 的亚马逊托管服务Amazon KinesisAmazon MSK、和写入数据开源 Telegraf

数据类型

Live Analytics 的 Timestream 支持以下数据类型进行写入。

数据类型 描述

BIGINT

表示 64 位有符号整数。

BOOLEAN

代表逻辑的两个真值,即真值和假值。

DOUBLE

实现二进制浮点运算的 IEEE 标准 754 的 64 位可变精度。

注意

有查询语言函数InfinityNaN双精度值可以在查询中使用。但是你不能将这些值写入 Timestream。

VARCHAR

可变长度字符数据,最大长度可选。最大限制为 2 KB。

多度量记录的数据类型。此数据类型包括一个或多个类型为BIGINTBOOLEANDOUBLEVARCHAR、和的度量TIMESTAMP

TIMESTAMP

使用纳秒精度为 UTC 的时间表示实例,跟踪自 Unix 时间以来的时间。目前仅支持多度量记录(即在该类型的度量值内MULTI)使用此数据类型。

YYYY-MM-DD hh:mm:ss.sssssssss

将支持时间戳写入1970-01-01 00:00:00.000000000到的范围内。2262-04-11 23:47:16.854775807

没有预先定义架构

在将数据发送到 Amazon Timestream 进行实时分析之前,您必须使用 Amazon Web Services Management Console、用于实时分析的时间流或用于实时分析 SDKs的时间流 API 操作创建数据库和表。有关更多信息,请参阅创建 数据库创建表。创建表时,您无需预先定义架构。Amazon Timestream for Live Analytics 会根据发送的数据点的度量和维度自动检测架构,因此您无需再离线更改架构即可使其适应快速变化的时间序列数据。

写入数据(插入和向上移动)

Amazon Timestream for Live Analytics 中的写入操作使您能够插入和更新数据。默认情况下,在 Amazon Timestream for Live Analytics 中写入数据遵循第一个写入者获胜的语义,即数据仅存储为附加,重复的记录将被拒绝。虽然第一个写入者获胜语义可以满足许多时间序列应用程序的要求,但在某些情况下,应用程序需要以等性方式更新现有记录和/或使用最后一个写入者获胜语义写入数据,在这种情况下,版本最高的记录存储在服务中。为了解决这些情况,Amazon Timestream for Live Analytics 提供了更新数据的能力。Upsert 是一种在记录不存在时将记录插入系统的操作,或者在记录存在时更新该记录的操作。当记录更新时,它会以等性方式更新。

没有记录级别的删除操作。但是可以删除表和数据库。

将数据写入存储器和磁性存储器

Amazon Timestream for Live Analytics 能够将数据直接写入内存存储和磁性存储。内存存储针对高吞吐量数据写入进行了优化,而磁存储则针对较低吞吐量的延迟数据写入进行了优化。

迟到的数据是指时间戳早于当前时间且超出内存存储保留期的数据。必须通过启用磁存储写入表来明确启用将迟到的数据写入磁存储的功能。此外,MagneticStoreRejectedDataLocation是在创建表时定义的。要写入磁性存储,的调用者WriteRecords必须拥有在创建表MagneticStoreRejectedDataLocation期间对中指定的 S3 存储桶的S3:PutObject权限。有关更多信息,请参阅CreateTableWriteRecordsPutObject

使用单度记录和多度量记录写入数据

Amazon Timestream for Live Analytics 提供了使用两种类型的记录写入数据的功能,即单量记录和多度量记录。

单项测量记录

单项测量记录使您可以为每条记录发送一个度量。当使用这种格式将数据发送到 Timestream 进行实时分析时,实时分析的 Timestream 会为每条记录创建一个表格行。这意味着,如果一台设备发出 4 个指标,并且每个指标都作为单一指标记录发送,则 Timestream for Live Analytics 将在表中创建 4 行来存储这些数据,并且每行都会重复设备属性。如果您想监控应用程序中的单个指标,或者您的应用程序不同时发出多个指标,则建议使用这种格式。

多重测量记录

使用多度量记录,您可以在单个表格行中存储多个度量,而不是在每个表行中存储一个度量。因此,多指标记录使您能够将现有数据从关系数据库迁移到用于实时分析的 Amazon Timestream,只需进行最少的更改。

您还可以在一次写入请求中批处理比单一测量记录更多的数据。这提高了数据写入吞吐量和性能,还降低了数据写入成本。这是因为在写入请求中批量处理更多数据可以让 Amazon Timestream for Live Analytics 在单个写入请求中识别更多可重复的数据(如果适用),并且只需为重复的数据收取一次费用。

多重测量记录

借助多重测量记录,您可以将时间序列数据以更紧凑的格式存储在存储器和磁性存储器中,这有助于降低数据存储成本。此外,紧凑的数据存储有助于编写更简单的查询以进行数据检索,提高查询性能并降低查询成本。

此外,多度量记录还支持 TIMESTAMP 数据类型,用于在时间序列记录中存储多个时间戳。多度量记录中的 TIMESTAMP 属性支持未来或过去的时间戳。因此,多度量记录有助于提高性能、降低成本和简化查询,并为存储不同类型的关联度量提供了更大的灵活性。

优势

以下是使用多重测量记录的好处。

  • 性能和成本-多指标记录使您能够在单个写入请求中写入多个时间序列度量。这增加了写入吞吐量并降低了写入成本。借助多重测量记录,您可以以更紧凑的方式存储数据,这有助于降低数据存储成本。多度量记录的紧凑数据存储减少了查询处理的数据。这旨在提高整体查询性能并帮助降低查询成本。

  • 查询简单性-使用多度量记录,您无需在查询中编写复杂的公用表表达式 (CTEs) 即可读取具有相同时间戳的多个度量。这是因为度量以列的形式存储在单个表行中。因此,多重测量记录可以编写更简单的查询。

  • 灵活的数据建模 — 您可以使用 TIMESTAMP 数据类型和多度量记录将未来的时间戳写入 Timestream 进行实时分析。除了记录中的时间字段外,多度量记录还可以具有多个时间戳数据类型的属性。在多度量记录中,TIMESTAMP 属性可以具有未来或过去的时间戳,其行为类似于时间字段,唯一的不同是实时分析的 Timestream 不会对多度量记录中时间戳类型的值进行索引。

使用案例

您可以将多测量记录用于任何时间序列应用程序,该应用程序在任何给定时间从同一设备生成多个测量值。以下是一些示例应用程序。

  • 一种在给定时间生成数百个指标的视频流媒体平台。

  • 生成血氧水平、心率和脉搏等测量值的医疗设备。

  • 工业设备,例如石油钻井平台,可生成指标、温度和天气传感器。

  • 使用一个或多个微服务架构的其他应用程序。

示例:监控视频流应用程序的性能和运行状况

假设一个在 200 个 EC2实例上运行的视频流应用程序。您想使用 Amazon Timestream for Live Analytics 来存储和分析应用程序发出的指标,这样您就可以了解应用程序的性能和运行状况,快速识别异常情况,解决问题并发现优化机会。

我们将使用单测量记录和多度量记录对这种情况进行建模,然后比较/对比这两种方法。对于每种方法,我们都做出以下假设。

  • 每个 EC2 实例每秒发出四个度量(video_startup_time、rebuffering_ratio、video_playback_failers 和 average_frame_rate)和四个维度(device_id、device_type、os_version和区域)。

  • 您想在存储器中存储 6 小时的数据,在磁性存储中存储 6 个月的数据。

  • 为了识别异常情况,您设置了 10 个查询,每分钟运行一次,以识别过去几分钟内的任何异常活动。您还构建了一个包含八个小部件的仪表板,用于显示最近 6 小时的数据,因此您可以有效地监控应用程序。该仪表板可在任何给定时间由五个用户访问,并且每小时自动刷新一次。

使用单项测量记录

数据建模:使用单一测量记录,我们将为四个测量值(视频启动时间、重新缓冲比率、视频播放失败和平均帧速率)中的每一个创建一条记录。每条记录将有四个维度(device_id、device_type、os_version 和区域)和一个时间戳。

写入:当您将数据写入 Amazon Timestream 进行实时分析时,记录的构造方式如下。

public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension device_id = new Dimension().withName("device_id").withValue("12345678"); final Dimension device_type = new Dimension().withName("device_type").withValue("iPhone 11"); final Dimension os_version = new Dimension().withName("os_version").withValue("14.8"); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); dimensions.add(device_id); dimensions.add(device_type); dimensions.add(os_version); dimensions.add(region); Record videoStartupTime = new Record() .withDimensions(dimensions) .withMeasureName("video_startup_time") .withMeasureValue("200") .withMeasureValueType(MeasureValueType.BIGINT) .withTime(String.valueOf(time)); Record rebufferingRatio = new Record() .withDimensions(dimensions) .withMeasureName("rebuffering_ratio") .withMeasureValue("0.5") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record videoPlaybackFailures = new Record() .withDimensions(dimensions) .withMeasureName("video_playback_failures") .withMeasureValue("0") .withMeasureValueType(MeasureValueType.BIGINT) .withTime(String.valueOf(time)); Record averageFrameRate = new Record() .withDimensions(dimensions) .withMeasureName("average_frame_rate") .withMeasureValue("0.5") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); records.add(videoStartupTime); records.add(rebufferingRatio); records.add(videoPlaybackFailures); records.add(averageFrameRate); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }

存储单项测量记录时,数据的逻辑表示方式如下。

Time device_id 设备类型 os_version 区域 measure_name measure_value::bigint measure_value::double

2021-09-07 21:48:44 .000000000

12345678

iPhone 11

14.8

us-east-1

video_startup_time

200

2021-09-07 21:48:44 .000000000

12345678

iPhone 11

14.8

us-east-1

rebuffering_ration

0.5

2021-09-07 21:48:44 .000000000

12345678

iPhone 11

14.8

us-east-1

视频播放失败

0

2021-09-07 21:48:44 .000000000

12345678

iPhone 11

14.8

us-east-1

平均帧速率

0.85

2021-09-07 21:53:44 .000000000

12345678

iPhone 11

14.8

us-east-1

video_startup_time

500

2021-09-07 21:53:44 .000000000

12345678

iPhone 11

14.8

us-east-1

rebuffering_ration

1.5

2021-09-07 21:53:44 .000000000

12345678

iPhone 11

14.8

us-east-1

视频播放失败

10

2021-09-07 21:53:44 .000000000

12345678

iPhone 11

14.8

us-east-1

平均帧速率

0.2

查询:您可以编写一个查询,检索过去 15 分钟内收到的具有相同时间戳的所有数据点,如下所示。

with cte_video_startup_time as ( SELECT time, device_id, device_type, os_version, region, measure_value::bigint as video_startup_time FROM table where time >= ago(15m) and measure_name=”video_startup_time”), cte_rebuffering_ratio as ( SELECT time, device_id, device_type, os_version, region, measure_value::double as rebuffering_ratio FROM table where time >= ago(15m) and measure_name=”rebuffering_ratio”), cte_video_playback_failures as ( SELECT time, device_id, device_type, os_version, region, measure_value::bigint as video_playback_failures FROM table where time >= ago(15m) and measure_name=”video_playback_failures”), cte_average_frame_rate as ( SELECT time, device_id, device_type, os_version, region, measure_value::double as average_frame_rate FROM table where time >= ago(15m) and measure_name=”average_frame_rate”) SELECT a.time, a.device_id, a.os_version, a.region, a.video_startup_time, b.rebuffering_ratio, c.video_playback_failures, d.average_frame_rate FROM cte_video_startup_time a, cte_buffering_ratio b, cte_video_playback_failures c, cte_average_frame_rate d WHERE a.time = b.time AND a.device_id = b.device_id AND a.os_version = b.os_version AND a.region=b.region AND a.time = c.time AND a.device_id = c.device_id AND a.os_version = c.os_version AND a.region=c.region AND a.time = d.time AND a.device_id = d.device_id AND a.os_version = d.os_version AND a.region=d.region

工作量成本:这种工作量的成本估计为每月373.23美元,如果有单一测量记录

使用多度量记录

数据建模:对于多度量记录,我们将创建一个包含所有四个度量(视频启动时间、重新缓冲比率、视频播放失败率和平均帧速率)、所有四个维度(device_id、deversion_type、os_version和区域)以及时间戳的记录。

写入:当您将数据写入 Amazon Timestream 进行实时分析时,记录的构造方式如下。

public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension device_id = new Dimension().withName("device_id").withValue("12345678"); final Dimension device_type = new Dimension().withName("device_type").withValue("iPhone 11"); final Dimension os_version = new Dimension().withName("os_version").withValue("14.8"); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); dimensions.add(device_id); dimensions.add(device_type); dimensions.add(os_version); dimensions.add(region); Record videoMetrics = new Record() .withDimensions(dimensions) .withMeasureName("video_metrics") .withTime(String.valueOf(time)); .withMeasureValueType(MeasureValueType.MULTI) .withMeasureValues( new MeasureValue() .withName("video_startup_time") .withValue("0") .withValueType(MeasureValueType.BIGINT), new MeasureValue() .withName("rebuffering_ratio") .withValue("0.5") .withType(MeasureValueType.DOUBLE), new MeasureValue() .withName("video_playback_failures") .withValue("0") .withValueType(MeasureValueType.BIGINT), new MeasureValue() .withName("average_frame_rate") .withValue("0.5") .withValueType(MeasureValueType.DOUBLE)) records.add(videoMetrics); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }

存储多度量记录时,数据的逻辑表示方式如下。

Time device_id 设备类型 os_version 区域 measure_name video_startup_time rebuffering_ration video_ 播放失败 平均帧速率

2021-09-07 21:48:44 .000000000

12345678

iPhone 11

14.8

us-east-1

视频指标

200

0.5

0

0.85

2021-09-07 21:53:44 .000000000

12345678

iPhone 11

14.8

us-east-1

视频指标

500

1.5

10

0.2

查询:您可以编写一个查询,检索过去 15 分钟内收到的具有相同时间戳的所有数据点,如下所示。

SELECT time, device_id, device_type, os_version, region, video_startup_time, rebuffering_ratio, video_playback_failures, average_frame_rate FROM table where time >= ago(15m)

工作负载成本:如果有多项记录,工作量成本估计为127.43美元。

注意

在这种情况下,使用多指标记录可将每月的总体估计支出减少2.5倍,数据写入成本降低3.3倍,存储成本降低3.3倍,查询成本降低1.2倍。

使用过去或将来存在的时间戳写入数据

Timestream for Live Analytics 提供了通过几种不同的机制写入具有位于内存存储保留窗口之外的时间戳的数据的能力。

  • 磁性存储写入 — 您可以通过磁存储写入将迟到的数据直接写入磁存储。要使用磁存储写入,必须先为表启用磁存储写入。然后,您可以使用与向内存存储中写入数据相同的机制将数据提取到表中。Amazon Timestream for Live Analytics 将根据其时间戳自动将数据写入磁性存储。

    注意

    磁存储的 write-to-read延迟可能长达 6 小时,这与将数据写入内存存储不同,在内存存储中, write-to-read延迟在亚秒范围内。

  • 度@@ 量的 TIMESTAMP 数据类型-您可以使用 TIMESTAMP 数据类型来存储过去、现在或未来的数据。除了记录中的时间字段外,多度量记录还可以具有多个时间戳数据类型的属性。在多度量记录中,TIMESTAMP 属性可以具有未来或过去的时间戳,其行为类似于时间字段,唯一的不同是实时分析的 Timestream 不会对多度量记录中时间戳类型的值进行索引。

    注意

    只有多度量记录支持 TIMESTAMP 数据类型。

读取的最终一致性

Live Analytics 的 Timestream 支持读取的最终一致性语义。这意味着,当您在将一批数据写入 Timestream for Live Analytics 后立即查询数据时,查询结果可能无法反映最近完成的写入操作的结果。如果您在短时间后重复这些查询请求,则结果应返回最新数据。