时间戳和 ROWTIME 列 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

时间戳和 ROWTIME 列

应用程序内部流包含名为 ROWTIME 的特殊列。该列存储 Amazon Kinesis Data Analytics 在第一个应用程序内部流中插入行的时间戳。ROWTIME 反映了 Amazon Kinesis Data Analytics 从流式传输源中读取后将记录插入到第一个应用程序内部流的时间戳。之后,该 ROWTIME 值在您的整个应用程序中进行维护。

注意

在将一个应用程序内部流中的记录泵取到另一个应用程序内部流时,您不需要明确复制 ROWTIME 列,Amazon Kinesis Data Analytics 将会为您复制该列。

Amazon Kinesis Data Analytics 会确保 ROWTIME 值是单调递增的。您可以在基于时间的窗口式查询中使用此时间戳。有关更多信息,请参阅窗口式查询

您可以访问 SELECT 语句中的 ROWTIME 列,就像应用程序内部流中的任何其他列一样。例如:

SELECT STREAM ROWTIME, some_col_1, some_col_2 FROM SOURCE_SQL_STREAM_001

了解流式分析中的各种时间

除了 ROWTIME 之外,在实时流式应用程序中还存在其他类型的时间。这些是:

  • 事件时间 - 发生事件时的时间戳。它有时也称为客户端时间。经常需要在分析中使用此时间,因为它是事件发生时的时间。但是,许多事件源(例如手机和 Web 客户端)没有可靠的时钟,这可能会导致时间不准确。此外,连接问题可能会导致记录没有按照事件发生顺序出现在流中。

     

  • 接收时间 - 记录添加到流式传输源时的时间戳。Amazon Kinesis Data Streams 在提供此时间戳的每条记录中都提供了一个名为 APPROXIMATE_ARRIVAL_TIME 的字段。有时这也称为服务器端时间。此接收时间通常非常接近事件时间。如果记录接收到流时存在任何种类的延迟,会导致不准确,这种情况通常很少见。此外,接收时间很少出现顺序问题,但由于流数据的分布特点,它也会出现。因此,接收时间通常准确地反映按顺序排列的事件时间。

     

  • 处理时间 – Amazon Kinesis Data Analytics 在第一个应用程序内部流中插入一行的时间戳。Amazon Kinesis Data Analytics 在每个应用程序内部流的 ROWTIME 列中提供此时间戳。处理时间始终是单调递增的。但如果应用程序滞后,则处理时间不准确。(如果应用程序滞后,则处理时间无法准确反映事件时间)。此 ROWTIME 相对于时钟来说很准确,但可能不是事件实际发生的时间。

在基于时间的窗口式查询中使用这些时间有优点也有缺点。我们建议您选择这些时间中的一个或多个,并根据您的使用案例场景选择一种策略来处理相关缺点。

注意

如果您使用的是基于行的窗口,则时间不是问题,您可以忽略本部分。

我们建议采用双窗口策略,这两个窗口基于不同的时间,即 ROWTIME 和其他时间(接收时间或事件时间)中的一个。

  • 使用 ROWTIME 作为第一个窗口,控制查询发送结果的频率,如以下示例所示。它不用作逻辑时间。

  • 使用其他时间中您希望与分析关联的逻辑时间。该时间表示事件的发生时间。在以下示例中,分析目标是按股票行情机对记录分组并返回计数。

此策略的优势是它可以使用事件发生时间。它可以轻松处理您的应用程序落后或事件无序到达的情况。当将记录放入应用程序内部流时,如果应用程序落后,记录仍然按第二个窗口中的逻辑时间分组。查询使用 ROWTIME 确保处理顺序。落后的任何记录(与 ROWTIME 值相比,接收时间戳显示的值较早)也会成功处理。

针对入门练习中使用的演示流,考虑以下查询。查询使用 GROUP BY 子句,并在一分钟滚动窗口中发送股票行情机计数。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("ingest_time" timestamp, "APPROXIMATE_ARRIVAL_TIME" timestamp, "ticker_symbol" VARCHAR(12), "symbol_count" integer); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "ingest_time", STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND) AS "APPROXIMATE_ARRIVAL_TIME", "TICKER_SYMBOL", COUNT(*) AS "symbol_count" FROM "SOURCE_SQL_STREAM_001" GROUP BY "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND);

GROUP BY 中,您首先在一分钟窗口中基于 ROWTIME 对记录分组,然后基于 APPROXIMATE_ARRIVAL_TIME 分组。

结果中的时间戳值已向下舍入为最接近的 60 秒间隔。查询发送的第一组结果显示第一分钟的记录。发送的第二组结果基于 ROWTIME 显示后续分钟内的记录。最后一条记录指示应用程序在应用程序内部流中放入记录是最晚的(与接收时间戳相比,它显示最晚的 ROWTIME 值)。

ROWTIME INGEST_TIME TICKER_SYMBOL SYMBOL_COUNT --First one minute window. 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 ABC 10 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 DEF 15 2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 XYZ 6 –-Second one minute window. 2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 ABC 11 2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 DEF 11 2016-07-19 17:06:00.0 2016-07-19 17:05:00.0 XYZ 1 *** ***late-arriving record, instead of appearing in the result of the first 1-minute windows (based on ingest_time, it is in the result of the second 1-minute window.

您可以将结果推送到下游数据库,以汇总结果来得到每分钟的最终准确计数。例如,您可以将应用程序输出配置为将结果保存到可以写入 Amazon Redshift 表的 Firehose 传输流中。在结果位于 Amazon Redshift 表后,您可以查询该表以计算按 Ticker_Symbol 分组的总数。对于 XYZ,总数是精确的 (6+1),即使记录延迟到达也是如此。