

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.amazonaws.cn//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 读取
<a name="writes"></a>

 您可以从联网设备、IT 系统和工业设备收集时间序列数据，并将其写入适用于 LiveAnalytics 的 Timestream。当时间序列属于同一个表时，Timestream for Live Analytics 允许您在单个写入请求中写入来自多个序列的单个时间序列 and/or 数据点的数据点。为方便起见，适用于 LiveAnalytics 的 Timestream 提供灵活的架构，能够根据您在调用数据库写入操作时指定的度量值的维度名称和数据类型，自动检测适用于 LiveAnalytics 的 Timestream 表的列名和数据类型。您也可以将批量数据写入适用于 LiveAnalytics 的 Timestream。

**注意**  
 适用于 LiveAnalytics 的 Timestream 支持读取的最终一致性语义。这意味着，如果您将批量数据写入适用于 LiveAnalytics 的 Timestream 后立即查询数据，查询结果可能不会反映最近完成的写入操作结果。结果可能还包含某些陈旧数据。同样，写入具有一个或多个新维度的时间序列数据时，查询可在短时间内返回列的部分子集。如果您在短时间后重复这些读取请求，结果将返回最新的数据。

 您可以使用、、或通过[Amazon SDKs[Amazon CLI](Tools.CLI.md)](getting-started-sdks.md)、、[Amazon Lambda](Lambda.md)、[Amazon IoT Core](IOT-Core.md)[适用于 Apache Flink 的亚马逊托管服务](ApacheFlink.md)[Amazon Kinesis](Kinesis.md)[Amazon MSK](MSK.md)、和写入数据[开源 Telegraf](Telegraf.md)。

**Topics**
+ [数据类型](#writes.data-types)
+ [无预先架构定义](#writes.no-upfront-schema)
+ [写入数据（插入和更新插入）](#writes.writing-data-inserts-upserts)
+ [读取的最终一致性](#writes.eventual-consistency)
+ [使用 API 批量 WriteRecords 写入](writes.batching-writes.md)
+ [批量加载](batch-load-how.md)
+ [在 WriteRecords API 操作和批量加载之间进行选择](writes.writes-or-batch-load.md)

## 数据类型
<a name="writes.data-types"></a>

 适用于 LiveAnalytics 的 Timestream 支持以下数据类型进行写入。


| 数据类型 | 说明 | 
| --- | --- | 
|  BIGINT  |   表示 64 位有符号整数。  | 
|  BOOLEAN  |   表示逻辑的两个真值，即 true 和 false。  | 
|  DOUBLE  |   64 位可变精度实现二进制浮点运算 IEEE 标准 754。  针对 `Infinity` 和 `NaN` 双精度值，存在可在查询中使用的查询语言函数。但不能将这些值写入 Timestream。   | 
|  VARCHAR  |   具有可选最大长度的可变长度字符数据。最大限制为 2KB。  | 
|  MULTI  |   多度量记录的数据类型。此数据类型包括一个或多个类型为 `BIGINT`、`BOOLEAN`、`DOUBLE`、`VARCHAR` 和 `TIMESTAMP` 的度量。  | 
|  TIMESTAMP  |   表示时间实例，采用 UTC 时间的纳秒级精度，跟踪自 Unix 时间以来的时间。目前仅支持多度量记录（即类型为 `MULTI` 的度量值内）使用此数据类型。 `YYYY-MM-DD hh:mm:ss.sssssssss` 写入支持时间戳，范围为 `1970-01-01 00:00:00.000000000` 至 `2262-04-11 23:47:16.854775807`。  | 

## 无预先架构定义
<a name="writes.no-upfront-schema"></a>

 在将数据发送到 Amazon Timestream 进行实时分析之前，您必须使用 Amazon Web Services 管理控制台、用于实时分析的时间流或用于实时分析 SDKs的时间流 API 操作创建数据库和表。有关更多信息，请参阅[创建数据库](console_timestream.md#console_timestream.db.using-console)和[创建表](console_timestream.md#console_timestream.table.using-console)。创建表时，无需预先定义架构。适用于 LiveAnalytics 的 Amazon Timestream 能够根据发送数据点的度量和维度自动检测架构，因此您无需再为适应快速变化的时间序列数据而离线修改架构。

## 写入数据（插入和更新插入）
<a name="writes.writing-data-inserts-upserts"></a>

 适用于 LiveAnalytics 的 Amazon Timestream 中的写入操作使您能够插入和*更新插入*数据。默认情况下，适用于 LiveAnalytics 的 Amazon Timestream 中的写入遵循*首位写入者胜出*语义，即数据仅存储为附加，重复记录将被拒绝。虽然首位写入者胜出语义可满足许多时间序列应用程序的需求，但在某些场景下，应用程序需要以幂等方式更新现有记录和/或采用最后一位写入者胜出语义写入数据，即服务端存储版本最高的记录。为解决这些情况，适用于 LiveAnalytics 的 Amazon Timestream 提供更新插入数据的功能。更新插入是一种操作，当记录不存在时将其插入系统，当记录存在时则更新该记录。当记录更新时，其更新过程具有幂等性。

删除操作不包含记录级别。但可以删除表和数据库。

**将数据写入内存存储和磁性存储**

适用于 LiveAnalytics 的 Amazon Timestream 能够将数据直接写入内存存储和磁性存储。内存存储针对高吞吐量数据写入进行优化，磁性存储针对延迟到达数据的低吞吐量写入进行优化。

延迟到达的数据是指时间戳早于当前时间且超出内存存储保留期的数据。您必须通过为该表启用磁性存储写入功能，明确允许将延迟到达的数据写入磁性存储。此外，`MagneticStoreRejectedDataLocation` 在创建表时进行定义。要写入磁性存储，`WriteRecords` 的调用者必须在创建表期间对 `MagneticStoreRejectedDataLocation` 中指定的 S3 存储桶具备 `S3:PutObject` 权限。有关更多信息，请参阅 [CreateTable](https://docs.amazonaws.cn/timestream/latest/developerguide/API_CreateTable.html)、[WriteRecords](https://docs.amazonaws.cn/timestream/latest/developerguide/API_WriteRecords.html) 和 [PutObject](https://docs.amazonaws.cn/AmazonS3/latest/API/API_PutObject.html)。

**使用单度量记录和多度量记录写入数据**

适用于 LiveAnalytics 的 Amazon Timestream 支持使用两种类型的记录写入数据，即单度量记录和多度量记录。

**单度量记录**

单度量记录允许您每条记录发送一个度量。当使用这种格式将数据发送到适用于 LiveAnalytics 的 Timestream 时，适用于 LiveAnalytics 的 Timestream 会为每条记录创建一个表行。这意味着，如果某个设备发出 4 个指标，且每个指标都作为单度量记录发送，则适用于 LiveAnalytics 的 Timestream 将在表中创建 4 个行以存储这些数据，并且每行都具有相同的设备属性。如果需要监控应用程序中的单个指标，或者应用程序不会同时发出多个指标，则建议使用此格式。

**多度量记录**

使用多度量记录时，您可以在单个表行中存储多个度量，而非每个表行存储一个度量。因此，多度量记录使您能够将现有数据从关系数据库迁移至适用于 LiveAnalytics 的 Amazon Timestream，且仅需极少修改。

您还可以在单次写入请求中批量处理比单度量记录更多的数据。这可提高数据写入吞吐量和性能，还可降低数据写入成本。这是因为在写入请求中批量处理更多数据，使适用于 LiveAnalytics 的 Amazon Timestream 能够在单次写入请求中识别更多可重复数据（如适用），并对重复数据仅收取一次费用。

**Topics**
+ [多度量记录](#writes.writing-data-multi-measure)
+ [使用过去或将来的时间戳写入数据](#writes.timestamp-past-future)

### 多度量记录
<a name="writes.writing-data-multi-measure"></a>

借助多度量记录，您可以在内存存储和磁性存储器中以更紧凑的格式存储时间序列数据，这有助于降低数据存储成本。此外，紧凑的数据存储有助于写入更简单的查询，以进行数据检索，提升查询性能并降低查询成本。

此外，多度量记录还支持 TIMESTAMP 数据类型，可用于在时间序列记录中存储多个时间戳。多度量记录中，TIMESTAMP 属性支持未来或过去的时间戳。因此，多度量记录有助于提升性能、降低成本并简化查询，同时为存储不同类型的关联度量提供更大的灵活性。

**优势**

使用多度量记录有以下优势。
+ **性能和成本**：多度量记录使您能够在单次写入请求中写入多个时间序列指标。这可提高写入吞吐量，还可降低写入成本。借助多度量记录，您能够以更紧凑的方式存储数据，这有助于降低数据存储成本。多度量记录的紧凑数据存储方式使得查询处理的数据量减少。这旨在提升整体查询性能，并帮助降低查询成本。
+ **查询简单性**-使用多度量记录，您无需在查询中编写复杂的公用表表达式 (CTEs) 即可读取具有相同时间戳的多个度量。这是因为这些度量以列的形式存储在单个表行中。因此，多度量记录可写入更简单的查询。
+ **数据建模灵活性**：您可使用 TIMESTAMP 数据类型和多度量记录，将未来时间戳写入适用于 LiveAnalytics 的 Timestream。除记录中的时间字段以外，多度量记录还可以包含多个 TIMESTAMP 数据类型的属性。在多度量记录中，TIMESTAMP 属性可包含未来或过去的时间戳，其行为类似于时间字段，唯一的不同在于：适用于 LiveAnalytics 的 Timestream 不会对多度量记录中 TIMESTAMP 类型的值进行索引。

**使用案例**

对于任何在给定时间从同一设备生成多个测量值的时间序列应用程序，您都可以使用多度量记录。以下是一些示例应用程序。
+ 视频流媒体平台，可在给定时间生成数百个指标。
+ 医疗设备，可生成血氧水平、心率和脉搏等测量值。
+ 工业设备，例如石油钻井平台，可生成指标、温度和天气传感器数值。
+ 其他应用程序，使用一个或多个微服务进行架构。

#### 示例：监控视频流媒体应用程序的性能和运行状况
<a name="writes.writing-data-multi-measure-example1"></a>

假设在 200 个 EC2 实例上运行的视频流媒体应用程序。您希望使用适用于 LiveAnalytics 的 Amazon Timestream 存储和分析应用程序发出的指标，以便了解应用程序的性能和运行状况，快速识别异常情况，解决问题并发现优化机会。

我们将使用单测量记录和多度量记录对这种场景进行建模，然后使用 compare/contrast 两种方法进行建模。针对每种方法，我们作出以下假设。
+ 每个 EC2 实例每秒发出四个度量（video\$1startup\$1time、rebuffering\$1ratio、video\$1playback\$1failures 和 average\$1frame\$1rate）和四个维度（device\$1id、device\$1type、os\$1version 和 region）。
+ 您希望将 6 小时的数据存储在内存存储中，并将 6 个月的数据存储在磁性存储中。
+ 为识别异常情况，您已设置 10 次查询，每分钟运行一次，以识别过去几分钟内出现的任何异常活动。您还已构建包含八个小部件的控制面板，用于显示过去 6 小时的数据，从而有效监控应用程序。此控制面板可在任何给定时间接受五名用户的访问，且每小时自动刷新一次。

##### 使用单度量记录
<a name="writes.writing-data-multi-measure-example1-sm"></a>

**数据建模**：使用单度量记录，我们将针对四个度量值（视频启动时间、重新缓冲比率、视频播放失败和平均帧速率）分别创建一条记录。每条记录将包含四个维度（device\$1id、device\$1type、os\$1version 和 region）和一个时间戳。

**写入**：将数据写入适用于 LiveAnalytics 的 Amazon Timestream 时，记录的构造方式如下。

```
public void writeRecords() {
    System.out.println("Writing records");
    // Specify repeated values for all records
    List<Record> records = new ArrayList<>();
    final long time = System.currentTimeMillis();
 
    List<Dimension> dimensions = new ArrayList<>();
    
    final Dimension device_id = new Dimension().withName("device_id").withValue("12345678");
    final Dimension device_type = new Dimension().withName("device_type").withValue("iPhone 11");
    final Dimension os_version = new Dimension().withName("os_version").withValue("14.8");
    final Dimension region = new Dimension().withName("region").withValue("us-east-1");
 
    dimensions.add(device_id);
    dimensions.add(device_type);
    dimensions.add(os_version);
    dimensions.add(region);
 
    Record videoStartupTime = new Record()
        .withDimensions(dimensions)
        .withMeasureName("video_startup_time")
        .withMeasureValue("200")
        .withMeasureValueType(MeasureValueType.BIGINT)
        .withTime(String.valueOf(time));
    Record rebufferingRatio = new Record()
        .withDimensions(dimensions)
        .withMeasureName("rebuffering_ratio")
        .withMeasureValue("0.5")
        .withMeasureValueType(MeasureValueType.DOUBLE)
        .withTime(String.valueOf(time));
    Record videoPlaybackFailures = new Record()
        .withDimensions(dimensions)
        .withMeasureName("video_playback_failures")
        .withMeasureValue("0")
        .withMeasureValueType(MeasureValueType.BIGINT)
        .withTime(String.valueOf(time));
    Record averageFrameRate = new Record()
        .withDimensions(dimensions)
        .withMeasureName("average_frame_rate")
        .withMeasureValue("0.5")
        .withMeasureValueType(MeasureValueType.DOUBLE)
        .withTime(String.valueOf(time));

    records.add(videoStartupTime);
    records.add(rebufferingRatio); 
    records.add(videoPlaybackFailures);
    records.add(averageFrameRate);
 
    WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest()
        .withDatabaseName(DATABASE_NAME)
        .withTableName(TABLE_NAME)
        .withRecords(records);
 
    try {
      WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest);
      System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode());
    } catch (RejectedRecordsException e) {
      System.out.println("RejectedRecords: " + e);
      for (RejectedRecord rejectedRecord : e.getRejectedRecords()) {
        System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": "
            + rejectedRecord.getReason());
      }
      System.out.println("Other records were written successfully. ");
    } catch (Exception e) {
      System.out.println("Error: " + e);
    }
  }
```

存储单度量记录时，数据的逻辑表示方式如下。


| 时间 | device\$1id | device\$1type | os\$1version | region | measure\$1name | measure\$1value::bigint | measure\$1value::double | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
|  2021-09-07 21:48:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1startup\$1time  |  200  |    | 
|  2021-09-07 21:48:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  rebuffering\$1ratio  |    |  0.5  | 
|  2021-09-07 21:48:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1playback\$1failures  |  0  |    | 
|  2021-09-07 21:48:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  average\$1frame\$1rate  |    |  0.85  | 
|  2021-09-07 21:53:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1startup\$1time  |  500  |    | 
|  2021-09-07 21:53:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  rebuffering\$1ratio  |    |  1.5  | 
|  2021-09-07 21:53:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1playback\$1failures  |  10  |    | 
|  2021-09-07 21:53:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  average\$1frame\$1rate  |    |  0.2  | 

**查询**：您可以写入查询，以检索过去 15 分钟内收到的所有具有相同时间戳的数据点，如下所示。

```
with cte_video_startup_time as ( SELECT time, device_id, device_type, os_version, region, measure_value::bigint as video_startup_time FROM table where time >= ago(15m) and measure_name=”video_startup_time”),
cte_rebuffering_ratio as ( SELECT time, device_id, device_type, os_version, region, measure_value::double as rebuffering_ratio FROM table where time >= ago(15m) and measure_name=”rebuffering_ratio”),
cte_video_playback_failures as ( SELECT time, device_id, device_type, os_version, region, measure_value::bigint as video_playback_failures FROM table where time >= ago(15m) and measure_name=”video_playback_failures”),
cte_average_frame_rate as ( SELECT time, device_id, device_type, os_version, region, measure_value::double as average_frame_rate FROM table where time >= ago(15m) and measure_name=”average_frame_rate”)
SELECT a.time, a.device_id, a.os_version, a.region, a.video_startup_time, b.rebuffering_ratio, c.video_playback_failures, d.average_frame_rate FROM cte_video_startup_time a, cte_buffering_ratio b, cte_video_playback_failures c, cte_average_frame_rate d WHERE
a.time = b.time AND a.device_id = b.device_id AND a.os_version = b.os_version AND a.region=b.region AND
a.time = c.time AND a.device_id = c.device_id AND a.os_version = c.os_version AND a.region=c.region AND
a.time = d.time AND a.device_id = d.device_id AND a.os_version = d.os_version AND a.region=d.region
```

**工作负载成本**：该工作负载的成本估算为每月 373.23 美元（基于单度量记录）。

##### 使用多度量记录
<a name="writes.writing-data-multi-measure-example1-mm"></a>

**数据建模**：对于多度量记录，我们将创建一条记录，包含所有四个度量（视频启动时间、重新缓冲比率、视频播放失败率和平均帧速率）、所有四个维度（device\$1id、deversion\$1type、os\$1version 和 region）以及一个时间戳。

**写入**：将数据写入适用于 LiveAnalytics 的 Amazon Timestream 时，记录的构造方式如下。

```
public void writeRecords() {
    System.out.println("Writing records");
    // Specify repeated values for all records
    List<Record> records = new ArrayList<>();
    final long time = System.currentTimeMillis();
 
    List<Dimension> dimensions = new ArrayList<>();
    
    final Dimension device_id = new Dimension().withName("device_id").withValue("12345678");
    final Dimension device_type = new Dimension().withName("device_type").withValue("iPhone 11");
    final Dimension os_version = new Dimension().withName("os_version").withValue("14.8");
    final Dimension region = new Dimension().withName("region").withValue("us-east-1");
 
    dimensions.add(device_id);
    dimensions.add(device_type);
    dimensions.add(os_version);
    dimensions.add(region);
 
    Record videoMetrics = new Record()
        .withDimensions(dimensions)
        .withMeasureName("video_metrics")
        .withTime(String.valueOf(time));
        .withMeasureValueType(MeasureValueType.MULTI)
        .withMeasureValues(
          new MeasureValue()
        	.withName("video_startup_time")
        	.withValue("0")
        	.withValueType(MeasureValueType.BIGINT),
        	new MeasureValue()
		.withName("rebuffering_ratio")
        	.withValue("0.5")
        	.withType(MeasureValueType.DOUBLE),
          new MeasureValue()
        	.withName("video_playback_failures")
        	.withValue("0")
        	.withValueType(MeasureValueType.BIGINT),
		 new MeasureValue()
         	.withName("average_frame_rate")
        	.withValue("0.5")
        	.withValueType(MeasureValueType.DOUBLE))
 
    records.add(videoMetrics);
 
    WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest()
        .withDatabaseName(DATABASE_NAME)
        .withTableName(TABLE_NAME)
        .withRecords(records);
 
    try {
      WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest);
      System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode());
    } catch (RejectedRecordsException e) {
      System.out.println("RejectedRecords: " + e);
      for (RejectedRecord rejectedRecord : e.getRejectedRecords()) {
        System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": "
            + rejectedRecord.getReason());
      }
      System.out.println("Other records were written successfully. ");
    } catch (Exception e) {
      System.out.println("Error: " + e);
    }
  }
```

存储多度量记录时，数据的逻辑表示方式如下。


| 时间 | device\$1id | device\$1type | os\$1version | region | measure\$1name | video\$1startup\$1time | rebuffering\$1ratio | video\$1 playback\$1failures | average\$1frame\$1rate | 
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | 
|  2021-09-07 21:48:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1metrics  |  200  |  0.5  |  0  |  0.85  | 
|  2021-09-07 21:53:44 .000000000  |  12345678  |  iPhone 11  |  14.8  |  us-east-1  |  video\$1metrics  |  500  |  1.5  |  10  |  0.2  | 

**查询**：您可以写入查询，以检索过去 15 分钟内收到的所有具有相同时间戳的数据点，如下所示。

```
SELECT time, device_id, device_type, os_version, region, video_startup_time, rebuffering_ratio, video_playback_failures, average_frame_rate FROM table where time >= ago(15m)
```

**工作负载成本**：该工作负载的成本估算为 127.43 美元（基于多度量记录）。

**注意**  
在此情况下，使用多度量记录可使整体预估月支出降低 2.5 倍，其中数据写入成本降低 3.3 倍，存储成本降低 3.3 倍，查询成本降低 1.2 倍。

### 使用过去或将来的时间戳写入数据
<a name="writes.timestamp-past-future"></a>

适用于 LiveAnalytics 的 Timestream 通过多种机制，支持将带有超出内存存储保留窗口时间戳的数据写入系统。
+ **磁性存储写入**：您可以通过磁性存储写入，将延迟到达的数据直接写入磁性存储。要使用磁性存储写入，必须先为表启用磁性存储写入。随后，您可以使用与将数据写入内存存储相同的机制将数据摄取到表中。适用于 LiveAnalytics 的 Amazon Timestream 将根据其时间戳自动将数据写入磁性存储。
**注意**  
磁存储的 write-to-read延迟可能长达 6 小时，这与将数据写入内存存储不同，在内存存储中， write-to-read延迟在亚秒范围内。
+ **度量的 TIMESTAMP 数据类型**：您可以使用 TIMESTAMP 数据类型存储过去、现在或未来的数据。除记录中的时间字段以外，多度量记录还可以包含多个 TIMESTAMP 数据类型的属性。在多度量记录中，TIMESTAMP 属性可包含未来或过去的时间戳，其行为类似于时间字段，唯一的不同在于：适用于 LiveAnalytics 的 Timestream 不会对多度量记录中 TIMESTAMP 类型的值进行索引。
**注意**  
仅多度量记录支持 TIMESTAMP 数据类型。

## 读取的最终一致性
<a name="writes.eventual-consistency"></a>

适用于 LiveAnalytics 的 Timestream 支持读取的最终一致性语义。这意味着，如果您将批量数据写入适用于 LiveAnalytics 的 Timestream 后立即查询数据，查询结果可能不会反映最近完成的写入操作结果。如果您在短时间后重复这些读取请求，结果将返回最新的数据。