处理迟到的数据 - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

从2025年6月20日起,亚马逊Timestream版 LiveAnalytics 将不再向新客户开放。如果您想使用亚马逊 Timestream LiveAnalytics,请在该日期之前注册。现有客户可以继续照常使用该服务。有关更多信息,请参阅 Amazon Timestream 以了解 LiveAnalytics 可用性变更。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

处理迟到的数据

在某些情况下,您的数据可能会延迟很长时间到达,例如,与摄取的行关联的时间戳相比,将数据提取到 Tim LiveAnalytics estream 的时间会明显延迟。在前面的示例中,您已经了解了如何使用 @scheduled_runtime 参数定义的时间范围来解释一些延迟到达的数据。但是,如果您的用例中数据可能会延迟数小时或数天,则可能需要不同的模式来确保派生表中的预计算得到适当更新,以反映此类迟到的数据。有关延迟到达的数据的一般信息,请参阅写入数据(插入和向上移动)

在下文中,您将看到两种不同的方法来处理延迟到达的数据。

  • 如果您的数据到达延迟是可预测的,则可以使用另一个 “追赶” 计划计算来更新聚合,以适应延迟到达的数据。

  • 如果您有不可预测的延迟或偶尔会有延迟到达,则可以使用手动执行来更新派生表。

本讨论涵盖数据延迟到达的情况。但是,同样的原则也适用于数据校正,即您修改了源表中的数据,并且想要更新派生表中的聚合。

定时追赶查询

查询及时到达的聚合数据

以下是一个模式,您将看到在数据到达延迟可预测的情况下,如何使用自动方式更新聚合。以下是前面对实时数据进行定时计算的示例。此计划计算每 30 分钟刷新一次派生表,并且已经考虑了延迟长达一个小时的数据。

{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }

Catch-up 查询为迟到的数据更新聚合

现在,如果您考虑一下您的数据可能会延迟大约 12 个小时的情况。以下是同一查询的变体。但是,不同之处在于,与触发计划计算时相比,它会根据延迟最多 12 小时的数据计算聚合。例如,您将在下面的示例中看到查询,此查询的目标时间范围在触发查询之前的 2 小时到 14 小时之间。此外,如果你注意到调度表达式 cron (0 0,12 * *? *),它将在世界标准时间 00:00 和世界标准时间 12:00 触发计算。因此,当查询在 2021-12-01 00:00:00:00 触发时,查询会在 2021-11-30 10:00:00:00 到 2021-11-30 22:00:00:00 之间更新聚合。定时查询使用类似于 Timestream 写入 LiveAnalytics的 upsert 语义,如果窗口中有延迟到达的数据或者发现了较新的聚合(例如,此聚合中出现了一个新的分组,而该分组在触发原始计划计算时并不存在),则此追赶查询将使用较新的值更新聚合值,然后将新的聚合插入到派生表中。同样,当下一个实例在 2021-12-01 12:00:00:00 触发时,该实例将更新 2021-11-30 22:00:00 到 2021-12-01 10:00:00:00 范围内的聚合。

{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }

前面的示例假设您的延迟到达时间限制为 12 小时,并且对于晚于实时窗口的数据,可以每 12 小时更新一次派生表。您可以调整此模式,每小时更新一次派生表,这样您的派生表就可以更快地反映延迟到达的数据。同样,您可以将时间范围调整为超过 12 小时(例如,一天甚至一周或更长时间),以处理可预测的延迟数据。

手动执行不可预测的延迟到达数据

在某些情况下,您可能会有不可预测的延迟到达,或者您对源数据进行了更改并在事后更新了一些值。在所有这些情况下,您都可以手动触发计划查询以更新派生表。以下是如何实现这一目标的示例。

假设你的用例是将计算写入派生表 dp_per_timeseries_per_per_hr。你在 devops 表中的基础数据已在 2021-11-30 23:00:00-2021-12-01 00:00:00:00 的时间范围内更新。有两种不同的计划查询可用于更新此派生表:Multi PT3 0 mPerHr PerTimeseries DPCount 和 Multi PT12 HPer HrPerTimeseries DPCount CatchUp。您在 Timestream 中为其创建的每个计划计算都 LiveAnalytics 有一个唯一的 ARN,您可以在创建计算或执行列表操作时获得该值。您可以使用 ARN 进行计算,并使用查询获取的参数 @scheduled_runtime 的值来执行此操作。

假设多 PT3 0 的计算mPerHrPerTimeseriesDPCount 具有 ARN arn_1,并且您想使用此计算来更新派生表。由于前面的计划计算会在 @scheduled_runtime 值之前 1 小时和之后 1 小时更新聚合,因此您可以使用 2021-12-01 00:00:00:00 作为 @scheduled_runtime 参数来覆盖更新的时间范围(2021-11-30 23:00:00-2021-12-01 00:00:00)。您可以使用 ExecuteScheduledQuery API 传递此计算的 ARN 和以纪元秒(UTC)为单位的时间参数值,以实现此目的。以下是使用 Amazon CLI 的示例,你可以使用 Timestream SDKs 支持的任何一种来遵循相同的模式。 LiveAnalytics

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1

在前面的示例中,配置文件是具有相应权限进行此 API 调用的 Amazon 配置文件,1638316800 对应于 2021-12-01 00:00:00:00 的时代秒。假设系统在所需时间段触发了此次调用,则此手动触发器的行为几乎类似于自动触发器。

如果你在更长的时间段内进行了更新,比如说 2021-11-30 23:00:00-2021-12-01 11:00:00:00 的基础数据已更新,那么你可以多次触发前面的查询以覆盖整个时间范围。例如,你可以执行六种不同的执行方式,如下所示。

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1

前六个命令对应于 2021-12-01 00:00:00、2021-12-01 02:00:00:00、2021-12-01 04:0:00、2021-12-01 06:00:00、2021-12-01 06:00:00、2021-12-01 08:00:00 和 2021-12-01 10:00 调用的预定计算:

或者,您可以使用在 2021-12-01 13:00:00 PT12 HPer HrPerTimeseries DPCount CatchUp 触发的计算单次执行来更新整个 12 小时时间范围内的聚合。例如,如果 arn_2 是该计算的 ARN,则可以从 CLI 中执行以下命令。

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1

值得注意的是,对于手动触发器,您可以为调用时间参数使用无需与该自动触发器时间戳对齐的时间戳。例如,在前面的示例中,尽管自动计划仅在时间戳2021-12-01 10:00:00、2021-12-01 12:00:00:00 和 2021-12-01 12:00:00:00 和 2021-12-02 00:00:00 触发计算。Timestream for 使您可以 LiveAnalytics 灵活地根据手动操作的需要使用适当的值来触发它。

以下是使用 ExecuteScheduledQuery API 时的一些重要注意事项。

  • 如果您要触发多个这样的调用,则需要确保这些调用不会在重叠的时间范围内生成结果。例如,在前面的示例中,有六次调用。每次调用都覆盖 2 小时的时间范围,因此,为了避免更新中出现任何重叠,每个调用时间戳都分散了两个小时。这样可以确保派生表中的数据最终处于与源表中的聚合相匹配的状态。如果您无法确保时间范围不重叠,请确保这些执行依次触发。如果您同时触发多个执行的时间范围重叠,则可以看到触发失败,您可能会在这些执行的错误报告中看到版本冲突。将根据调用的触发时间为定时查询调用生成的结果分配一个版本。因此,较新的调用生成的行具有更高的版本。较高版本的记录可以覆盖较低版本的记录。对于自动触发的计划查询,Timestream fo LiveAnalytics r 会自动管理计划,这样即使后续调用的时间范围重叠,您也不会看到这些问题。

  • 前面已经提到,你可以使用 @scheduled_runtime 的任何时间戳值来触发调用。因此,您有责任适当地设置值,以便在派生表中更新相应的时间范围,与源表中数据的更新范围相对应。

  • 您也可以将这些手动触发器用于处于 DISABLED 状态的计划查询。这允许您定义不按自动计划执行的特殊查询,因为它们处于 DISABLED 状态。相反,您可以使用其上的手动触发器来管理数据更正或延迟到达的用例。