要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理延迟到达的数据
您可能会遇到数据严重延迟到达的情况,例如,与摄取的行关联的时间戳相比,数据被摄取到适用于 LiveAnalytics 的 Timestream 的时间存在明显延迟。在之前的示例中,您已经了解如何使用 @scheduled_runtime 参数定义的时间范围来处理部分延迟到达的数据。然而,如果存在数据可能延迟数小时或数天的使用案例,则需要采用不同的模式,以确保派生表中的预先计算能及时更新,从而准确反映这些延迟到达的数据。有关延迟到达数据的一般信息,请参阅 写入数据(插入和更新插入)。
下面将介绍两种处理延迟到达数据的不同方法。
-
如果数据到达存在可预测的延迟,则可使用另一项“追赶”计划计算来更新聚合,以处理延迟到达的数据。
-
如果遇到不可预测的延迟偶发性延迟到达的数据,可通过手动执行来更新派生表。
本讨论涵盖数据延迟到达的各种情况。然而,同样的原则也适用于数据更正,即您修改了源表中的数据,并希望更新派生表中的聚合。
计划追赶查询
查询及时到达的聚合数据
以下模式介绍如何在数据到达存在可预测延迟的情况下,通过自动化方式更新聚合数据。以下是先前对实时数据进行计划计算的示例。此计划计算每 30 分钟刷新一次派生表,并已考虑最多延迟一小时的数据。
{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
追赶查询为延迟到达的数据更新聚合
现在,如果考虑数据可能延迟约 12 小时的情况。以下是同一查询的变体。但是,不同之处在于,该计算基于延迟长达 12 小时的数据进行聚合,相较于计划计算触发时所用的数据存在时间差。例如,在以下示例查询中,查询的目标时间范围是触发查询前 2 小时至 14 小时之间。此外,如果您注意到计划表达式 ccron(0 0,12 * * ? *),其每天在 UTC 00:00 和 UTC 12:00 触发计算。因此,当查询在 2021-12-01 00:00:00 触发时,查询会更新 2021-11-30 10:00:00 到 2021-11-30 22:00:00 范围内的聚合。计划查询采用类似于适用于 LiveAnalytics 的 Timestream 写入操作的更新插入语义,当时间窗口内存在延迟到达的数据,或发现新的聚合值(例如,在原始计划计算触发时尚未存在的新分组出现在该聚合中)时,此追赶查询将用最新值更新聚合值。此时,新聚合值将被插入到派生表中。同样,如果下一个实例在 2021-12-01 12:00:00 触发,该实例将更新 2021-11-30 22:00:00 到 2021-12-01 10:00:00 范围内的聚合。
{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
上述示例假设延迟到达时间不超过 12 小时,且对于超出实时窗口的数据,可以每 12 小时更新一次衍生表。您可以调整此模式,使派生表每小时更新一次,从而让派生表更快地反映延迟到达的数据。同样,可将时间范围调整为超过 12 小时,例如一天甚至一周或更长时间,以处理可预测的延迟到达数据。
手动执行不可预测的延迟到达数据
在某些情况下,可能会出现不可预测的延迟到达,或者您对源数据进行了更改并在事后更新了一些值。所有这些情况下,您都可以手动触发计划查询以更新派生表。以下是如何实现这一目标的示例。
假设使用案例是将计算写入派生表 dp_per_timeseries_per_hr。您在 devops 表中的基础数据已在 2021-11-30 23:00:00 到 2021-12-01 00:00:00 的时间范围内进行更新。有两种不同的计划查询可用于更新此派生表:MultiPT30mPerHrPerTimeseriesDPCount 和 MultiPT12HPerHrPerTimeseriesDPCountCatchUp。您在适用于 LiveAnalytics 的 Timestream 中创建的每个计划计算都有唯一的 ARN,这可在创建计算时或执行列表操作时获取。您可以使用计算的 ARN 以及查询中 @scheduled_runtime 参数的值来执行此操作。
假设 MultiPT30mPerHrPerTimeseriesDPCount 计算的 ARN 为 arn_1,而且您希望使用该计算来更新派生表。由于前面的计划计算会更新 @scheduled_runtime 值前 1 小时和后 1 小时的聚合数据,因此您可以使用 2021-12-01 00:00:00 的 @scheduled_runtime 参数值,覆盖更新的时间范围(2021-11-30 23:00:00 到 2021-12-01 00:00:00) 您可以使用 ExecuteScheduledQuery API,传递此计算的 ARN 及以 epoch 秒为单位的时间参数值(以 UTC 表示),从而实现此功能。以下是使用 Amazon CLI 的示例,您也可以使用适用于 LiveAnalytics 的 Timestream 支持的任何 SDK 遵循相同模式。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1
在前面的示例中,配置文件是具有执行此 API 调用所需权限的 Amazon 配置文件,1638316800 对应于 2021-12-01 00:00:00 的 epoch 秒。假设系统在所需时间段触发了此调用,则此手动触发器的工作方式几乎与自动触发器相同。
如果更新周期较长,例如基础数据更新时间为2021-11-30 23:00:00 到 2021-12-01 11:00:00,则可多次触发前置查询以覆盖整个时间范围。例如,您可以执行以下六种不同的操作。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1
前六个命令对应于在 2021-12-01 00:00:00、2021-12-01 02:00:00、2021-12-01 04:0:00、2021-12-01 06:00:00、2021-12-01 08:00:00、2021-12-01 10:00 调用的计划计算:
或者,您可以使用在 2021-12-01 13:00:00 触发的计算 MultiPT12HPerHrPerTimeseriesDPCountCatchUp 执行一次,以此更新整个 12 小时时间范围内的聚合。例如,如果该计算的 ARN 为 arn_2,则可通过 CLI 执行以下命令。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1
值得注意的是,对于手动触发器,您可以为调用时间参数使用时间戳,该时间戳无需与自动触发器的时间戳保持一致。例如,在前面的示例中,您在时间戳 2021-12-01 13:00:00 触发了计算,尽管自动计划仅在 2021-12-01 10:00:00、2021-12-01 12:00:00 以及 2021-12-02 00:00:00 这三个时间戳触发。适用于 LiveAnalytics 的 Timestream 让您能够灵活地根据手动操作需求,使用合适的值触发该功能。
以下是使用 ExecuteScheduledQuery API 时的一些重要注意事项。
-
如果您要触发多个此类调用,则需确保这些调用不会在重叠的时间范围内生成结果。例如,在前面的示例中,共有六次调用。每次调用都覆盖 2 小时的时间范围,因此调用时间戳间隔设置为每两小时一次,以避免更新内容产生任何重叠。这可确保派生表中的数据最终状态与源表中的聚合相匹配。如果无法确保时间范围不重叠,请确保这些执行按顺序依次触发。如果您同时触发多个执行且其时间范围存在重叠,则可能观察到触发器失败,在这些执行的错误报告中,您可能会看到版本冲突的提示。计划查询调用生成的结果会根据调用触发的时间分配版本号。因此,由较新调用生成的行具有更高的版本号。较高版本的记录可覆盖较低版本的记录。对于自动触发的计划查询,适用于 LiveAnalytics 的 Timestream 会自动管理计划,因此即使后续调用的时间范围存在重叠,您也不会遇到这些问题。
-
如前所述,您可以使用 @scheduled_runtime 的任何时间戳值触发调用。因此,您有责任正确设置这些值,以便派生表中与源表中数据更新范围相对应的时间范围得到相应更新。
-
您也可以使用这些手动触发器以处理处于 DISABLED 状态的计划查询。这使您能够定义特殊查询,这些查询不会在自动化计划中执行,因为其处于 DISABLED 状态。相反,您可以使用这些查询的手动触发器,以管理数据更正或数据延迟到达的使用案例。