

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.amazonaws.cn//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 每个设备的最后一点
<a name="scheduledqueries-patterns-lastpointfromdevice"></a>

应用程序可能需要读取设备发出的最后一次测量值。可能有更一般的用例来获取给定日期/时间之后的设备的最后一次测量值， date/time 或者在给定日期/时间之后获取设备的第一次测量值。当涉及数百万台设备和多年积累的数据时，此搜索可能需要对海量数据进行扫描。

以下示例说明如何使用计划查询以优化搜索设备发出的最后一个数据点。如果应用程序需要，也可以使用相同的模式以优化第一个点查询。

**Topics**
+ [根据源表计算](#scheduledqueries-patterns-lastpointfromdevice-computedfromsrctable)
+ [按每日粒度预先计算的派生表](#scheduledqueries-patterns-lastpointfromdevice-derivedttabletoprecompute)
+ [根据派生表计算](#scheduledqueries-patterns-lastpointfromdevice-computedfromderivedtable)
+ [合并源表和派生表](#scheduledqueries-patterns-lastpointfromdevice-combinesourceandderived)

## 根据源表计算
<a name="scheduledqueries-patterns-lastpointfromdevice-computedfromsrctable"></a>

以下是一个查询示例，用于查找特定部署中服务发出的最后一次测量值（例如，在给定区域、单元格、筒仓和可用区内某个微服务的特定服务器）。在示例应用程序中，此查询将返回数百台服务器的最近一次测量值。另请注意，该查询包含无界时间谓词，用于查找所有早于指定时间戳的数据。

**注意**  
有关 `max` 和 `max_by` 函数的信息，请参阅[聚合函数](aggregate-functions.md)。

```
SELECT instance_name, MAX(time) AS time, MAX_BY(gc_pause, time) AS last_measure
FROM "raw_data"."devops"
WHERE time < from_milliseconds(1636685271872)
    AND measure_name = 'events'
    AND region = 'us-east-1'
    AND cell = 'us-east-1-cell-10'
    AND silo = 'us-east-1-cell-10-silo-3'
    AND availability_zone = 'us-east-1-1'
    AND microservice_name = 'hercules'
GROUP BY region, cell, silo, availability_zone, microservice_name,
    instance_name, process_name, jdk_version
ORDER BY instance_name, time DESC
```

## 按每日粒度预先计算的派生表
<a name="scheduledqueries-patterns-lastpointfromdevice-derivedttabletoprecompute"></a>

您可以将上述使用案例转换为计划计算。如果应用程序要求需要获取整个实例集在多个区域、单元、筒仓、可用区和微服务中的这些值，您可以使用单次计划计算来预先计算整个实例集的值。这就是 Timestream LiveAnalytics 的无服务器定时查询的强大功能，它允许这些查询根据应用程序的扩展要求进行扩展。

以下是一个查询，用于预先计算给定日期所有服务器上的最后一点。请注意，该查询仅包含时间谓词，而未包含维度谓词。时间谓词将查询范围限定在基于指定计划表达式触发计算时点的过去一天内。

```
SELECT region, cell, silo, availability_zone, microservice_name, 
    instance_name, process_name, jdk_version, 
    MAX(time) AS time, MAX_BY(gc_pause, time) AS last_measure 
FROM raw_data.devops 
WHERE time BETWEEN bin(@scheduled_runtime, 1d) - 1d AND bin(@scheduled_runtime, 1d) 
    AND measure_name = 'events' 
GROUP BY region, cell, silo, availability_zone, microservice_name, 
    instance_name, process_name, jdk_version
```

以下是基于前述查询的计划计算配置，该配置每天 UTC 01:00 执行该查询，以计算过去一天的聚合结果。计划表达式 cron(0 1 \$1 \$1 ? \$1) 控制此行为，并在当天结束一小时后运行，以处理可能延迟一天到达的数据。

```
{
    "Name": "PT1DPerInstanceLastpoint",
    "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_name, process_name, jdk_version, MAX(time) AS time, MAX_BY(gc_pause, time) AS last_measure FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1d) - 1d AND bin(@scheduled_runtime, 1d) AND measure_name = 'events' GROUP BY region, cell, silo, availability_zone, microservice_name, instance_name, process_name, jdk_version",
    "ScheduleConfiguration": {
        "ScheduleExpression": "cron(0 1 * * ? *)"
    },
    "NotificationConfiguration": {
        "SnsConfiguration": {
            "TopicArn": "******"
        }
    },
    "TargetConfiguration": {
        "TimestreamConfiguration": {
            "DatabaseName": "derived",
            "TableName": "per_timeseries_lastpoint_pt1d",
            "TimeColumn": "time",
            "DimensionMappings": [
                {
                    "Name": "region",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "cell",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "silo",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "availability_zone",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "microservice_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "instance_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "process_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "jdk_version",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": {
                "TargetMultiMeasureName": "last_measure",
                "MultiMeasureAttributeMappings": [
                    {
                        "SourceColumn": "last_measure",
                        "MeasureValueType": "DOUBLE"
                    }
                ]
            }
        }
    },
    "ErrorReportConfiguration": {
        "S3Configuration" : {
            "BucketName" : "******",
            "ObjectKeyPrefix": "errors",
            "EncryptionOption": "SSE_S3"
        }
    },
    "ScheduledQueryExecutionRoleArn": "******"
}
```

## 根据派生表计算
<a name="scheduledqueries-patterns-lastpointfromdevice-computedfromderivedtable"></a>

当您使用上述配置定义派生表，且至少有一个计划查询实例已将数据具体化到该派生表后，即可查询该派生表以获取最新测量值。以下是在派生表上执行的示例查询。

```
SELECT instance_name, MAX(time) AS time, MAX_BY(last_measure, time) AS last_measure
FROM "derived"."per_timeseries_lastpoint_pt1d"
WHERE time < from_milliseconds(1636746715649)
    AND measure_name = 'last_measure'
    AND region = 'us-east-1'
    AND cell = 'us-east-1-cell-10'
    AND silo = 'us-east-1-cell-10-silo-3'
    AND availability_zone = 'us-east-1-1'
    AND microservice_name = 'hercules'
GROUP BY region, cell, silo, availability_zone, microservice_name,
    instance_name, process_name, jdk_version
ORDER BY instance_name, time DESC
```

## 合并源表和派生表
<a name="scheduledqueries-patterns-lastpointfromdevice-combinesourceandderived"></a>

与前例类似，派生表中的任何数据都不会包含最近的写入记录。因此，您可以再次采用与之前类似的模式，将衍生表中的数据与较早的数据合并，并使用源数据处理剩余的部分。以下是使用类似 UNION 方法的查询示例。由于应用程序要求是查找某个时间段内的最新测量值，且起始时间可能位于过去，因此查询的写入方式是：使用指定时间点，从该时间点起向后最多一天的源数据，然后对更早的数据应用衍生表。如以下查询示例所示，源数据上的时间谓词具有边界限制。这确保对数据量明显更高的源表进行高效处理，然后在派生表上使用无限时间谓词。

```
WITH last_point_derived AS (
    SELECT instance_name, MAX(time) AS time, MAX_BY(last_measure, time) AS last_measure
    FROM "derived"."per_timeseries_lastpoint_pt1d"
    WHERE time < from_milliseconds(1636746715649)
        AND measure_name = 'last_measure'
        AND region = 'us-east-1'
        AND cell = 'us-east-1-cell-10'
        AND silo = 'us-east-1-cell-10-silo-3'
        AND availability_zone = 'us-east-1-1'
        AND microservice_name = 'hercules'
    GROUP BY region, cell, silo, availability_zone, microservice_name,
        instance_name, process_name, jdk_version
), last_point_source AS (
    SELECT instance_name, MAX(time) AS time, MAX_BY(gc_pause, time) AS last_measure
    FROM "raw_data"."devops"
    WHERE time < from_milliseconds(1636746715649) AND time > from_milliseconds(1636746715649) - 26h
        AND measure_name = 'events'
        AND region = 'us-east-1'
        AND cell = 'us-east-1-cell-10'
        AND silo = 'us-east-1-cell-10-silo-3'
        AND availability_zone = 'us-east-1-1'
        AND microservice_name = 'hercules'
    GROUP BY region, cell, silo, availability_zone, microservice_name,
        instance_name, process_name, jdk_version
)
SELECT instance_name, MAX(time) AS time, MAX_BY(last_measure, time) AS last_measure
FROM (
    SELECT * FROM last_point_derived
    UNION
    SELECT * FROM last_point_source
)
GROUP BY instance_name
ORDER BY instance_name, time DESC
```

上述内容仅是构造派生表的一种示例。如果您已积累多年的数据，则可以使用更多级别的聚合。例如，您可以在每日汇总数据之上建立月度汇总数据，还可以在每日汇总之前建立每小时汇总数据。因此，您可以合并最近的数据填充过去一小时，合并每小时数据填充过去一天，合并每日数据填充过去一个月，以及合并每月数据填充更久远的时间段。您设置的层级数量与刷新计划将取决于以下要求：这些查询的执行频率以及同时执行这些查询的用户数量。