JOIN 子句 - Amazon Kinesis Data Analytics
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

JOIN 子句

SELECT 语句中的 JOIN 子句合并了来自一个或多个流或引用表中的列。

流-流联接

Amazon Kinesis Data Analytics 支持使用 SQL 将应用程序内流与另一个应用程序内流相联接,从而将这一重要的传统数据库功能带入流上下文中。

本节介绍 Kinesis Data Analytics 支持的联接类型,包括基于时间和基于行的窗口联接,以及有关流式联接的详细信息。

联接类型

有五种联接类型:

INNER JOIN (或仅 JOIN)

返回来自左侧和来自右侧并且联接条件的计算结果为 TRUE 的所有行对。

LEFT OUTER JOIN (或仅 LEFT JOIN)

作为 INNER JOIN,但即使左侧的行不与右侧的任何行匹配,也会保留左侧的行。在右侧生成 NULL 值。

RIGHT OUTER JOIN (或仅 RIGHT JOIN)

作为 INNER JOIN,但即使右侧的行不与左侧的任何行匹配,也会保留右侧的行。在左侧为这些行生成 NULL 值。

FULL OUTER JOIN (或仅 FULL JOIN)

作为 INNER JOIN,但即使两侧的行不与任一侧的任何行匹配,也会保留两侧的行。在另一侧为这些行生成 NULL 值。

CROSS JOIN

返回输入的笛卡尔积:左侧的每个行均与右侧的每个行配对。

基于时间的窗口与基于行的窗口联接

将左侧流的整个历史记录与右侧流的整个历史记录相联接是不切实际的。因此,您必须使用 OVER 子句将至少一个流限制到一个时间窗口。OVER 子句将定义一个在给定时间进行联接时要考虑的行窗口。

该窗口可以基于时间,也可以基于行:

  • 基于时间的窗口使用 RANGE 关键字。它将窗口定义为其 ROWTIME 列落在查询的当前时间的特定时间间隔内的一组行。

    例如,以下子句指定窗口包含其 ROWTIME 在流当前时间之前的那个小时内的所有行:

    OVER (RANGE INTERVAL '1' HOUR PRECEDING)
  • 基于行的窗口使用 ROWS 关键字。它将窗口定义为具有当前时间戳的行之前或之后的给定行计数。

    例如,以下子句指定窗口中仅包含最新的 10 行:

    OVER (ROWS 10 PRECEDING)
注意

如果在联接的一侧没有指定时间窗口或基于行的窗口,则只有该侧的当前行参与联接计算。

流到流联接的示例

以下示例演示应用程序内流到流联接的工作原理、何时返回联接的结果以及联接结果的行时间。

示例数据集

本节中的示例基于以下数据集和流定义:

Orders 数据示例
{ "orderid":"101", "orders":"1" }
Shipments 数据示例
{ "orderid":"101", "shipments":"2" }
创建 ORDERS_STREAM 应用程序内流
CREATE OR REPLACE STREAM "ORDERS_STREAM" ("orderid" int, "orderrowtime" timestamp); CREATE OR REPLACE PUMP "ORDERS_STREAM_PUMP" AS INSERT INTO "ORDERS_STREAM" SELECT STREAM "orderid", "ROWTIME" FROM "SOURCE_SQL_STREAM_001" WHERE "orders" = 1;
创建 SHIPMENTS_STREAM 应用程序内流
CREATE OR REPLACE STREAM "SHIPMENTS_STREAM" ("orderid" int, "shipmentrowtime" timestamp); CREATE OR REPLACE PUMP "SHIPMENTS_STREAM_PUMP" AS INSERT INTO "SHIPMENTS_STREAM" SELECT STREAM "orderid", "ROWTIME" FROM "SOURCE_SQL_STREAM_001" WHERE "shipments" = 2;

示例 1: JOIN (INNER JOIN) 一侧的时间窗口

此示例演示了一个查询,该查询返回在最后一分钟执行发货的所有订单。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.rowtime as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid shipmenttime OrderTime
10:00:00 101 10:00:00 100
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:45 10:00:40
10:00:50 105

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回在最后一分钟执行发货的所有订单。


              最后一分钟发生的所有订单(orders_stream) 与发货 (shipments_stream) 之间的联接图。
触发结果

下面介绍了触发查询结果的事件。

  • 由于未在 Shipments 流上指定时间或行窗口,因此只有Shipments 流的当前行参与联接。

  • 由于对 Orders 流的查询指定了前一分钟的窗口,因此 Orders 流中最后一分钟带有 ROWTIME 的行将参与联接。

  • 当 Shipments 流中的记录对于 orderid 104 在 10:00:45到达时,将触发 JOIN 结果,因为前一分钟在 Orders 流中对于 orderid 存在匹配项。

  • Orders 流中 Orderid 为 100 的记录到达较迟,因此 Shipments 流中的对应记录不是最新记录。由于未在 Shipments 流上指定任何窗口,因此只有Shipments 流的当前行参与联接。因此,JOIN 语句不会针对 orderid 100 返回任何记录。有关在 JOIN 语句中包含较迟行的信息,请参阅 示例 2

  • 因为在 Shipments 流中对于 Orderid 105 没有匹配的记录,所以不会发出任何结果,并且该记录将被忽略。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

示例 2: JOIN (INNER JOIN) 两侧的时间窗口

此示例演示了一个查询,该查询返回最后一分钟执行的所有订单以及最后一分钟执行的发货。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.rowtime as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid shipmenttime OrderTime
10:00:00 101 10:00:00 100
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:45 10:00:40
10:00:45 100 10:00:00 10:00:45
10:00:50 105

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回最后一分钟执行的所有订单以及最后一分钟执行的发货。


              最后一分钟发生的所有订单 (orders_stream) 与最后一分钟发生的发货 (shipments_stream) 之间的联接图。
触发结果

下面介绍了触发查询结果的事件。

  • 在联接的两侧指定窗口。因此,Orders 流和 Shipments 流的当前行之前的一分钟内的所有行都参与联接。

  • 当 Shipments 流中与 orderid 104 对应的记录到达时,Orders 流中的对应记录在一分钟窗口内。因此,一条记录返回到 Output 流。

  • 即使 orderid 100 的订单事件在 Orders 流中到达较晚,也返回了联接结果。这是因为 Shipments 流中的窗口包括过去一分钟的订单,其中包括对应的记录。

  • 在联接的两侧设置一个窗口有助于在联接的两侧包含迟到的记录;例如,如果订单或发货记录延迟收到或出现问题。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

示例 3:RIGHT JOIN (RIGHT OUTER JOIN) 一侧的时间窗口

此示例演示了一个查询,该查询返回最后一分钟执行的所有发货,无论最后一分钟是否存在对应的订单。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o RIGHT JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid OrderTime
10:00:00 101 10:00:00 100
10:00:00 100 null
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:40
10:00:50 105
10:00:50 105 null

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回最后一分钟执行的所有装运,无论是否在最后一分钟内存在对应的订单。


              查询的图表,该查询返回最后一分钟发生的所有发货 (shipments_stream),无论最后一分钟是否存在对应的订单 (orders_stream)
触发结果

下面介绍了触发查询结果的事件。

  • 当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。

  • 只要 Shipments 流中与 orderid 105 对应的记录到达,就在 Output 流中发出一条记录。但是,Orders 流中没有匹配的记录,因此 OrderTime 值为 null。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

  • 因为联接(Shipments 流)的右侧没有窗口,所以具有不匹配联接的结果的 ROWTIME 是不匹配行的 ROWTIME。

示例 4:RIGHT JOIN (RIGHT OUTER JOIN) 两侧的时间窗口

此示例演示一个查询,该查询返回最后一分钟执行的所有发货,无论它们是否具有对应的订单。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.ROWTIME as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o RIGHT JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid shipmenttime OrderTime
10:00:00 101 10:00:00 100
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:40 10:00:45
10:00:45 100 10:00:45 10:00:00
10:00:50 105
 
10:01:50 105 10:00:50 null

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回最后一分钟执行的所有发货,无论它们是否具有对应的订单。


              查询的图表,该查询返回最后一分钟的所有发货 (shipments_stream),无论是否具有对应的订单 (orders_stream)。
触发结果

下面介绍了触发查询结果的事件。

  • 当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。

  • 即使 orderid 100 的订单事件在 Orders 流中到达较晚,也会返回联接结果。这是因为 Shipments 流中的窗口包括过去一分钟的订单,其中包括对应的记录。

  • 对于未找到其订单的发货(对于 orderid 105),直到 Shipments 流上的一分钟窗口结束时,结果才会发送到 Output 流。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

  • 对于没有匹配的订单记录的发货记录,结果的 ROWTIME 是窗口末尾的 ROWTIME。这是因为联接的右侧(来自 Shipments 流)现在是事件的一分钟窗口,并且服务正在等待窗口结束以确定是否有任何匹配的记录到达。当窗口结束且没有找到匹配的记录时,将以与窗口结束对应的 ROWTIME 发出结果。

示例 5:LEFT JOIN (LEFT OUTER JOIN) 一侧的时间窗口

此示例演示了一个查询,该查询返回最后一分钟执行的所有订单,无论最后一分钟是否存在对应的发货。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", o."orderid", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o LEFT JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid OrderTime
10:00:00 101 10:00:00 100
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:40
10:00:50 105
 
10:01:00 101 10:00:00
10:01:20 102 10:00:20
10:01:30 103 10:00:30
10:01:40 104 10:00:40
10:01:45 100 10:00:45

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回最后一分钟执行的所有订单,无论最后一分钟是否存在对应的发货。


              查询的图表,该查询返回最后一分钟执行的所有订单 (orders_stream),无论最后一分钟是否存在对应的发货 (shipments_stream)。
触发结果

下面介绍了触发查询结果的事件。

  • 当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。

  • 对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,直到一分钟窗口结束时才会将记录发出到 Output 流。这是因为服务正在等待直到窗口结束以获取匹配的记录。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

  • 对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,结果的 ROWTIME 是当前窗口结束的 ROWTIME。

示例 6:LEFT JOIN (LEFT OUTER JOIN) 两侧的时间窗口

此示例演示一个查询,该查询返回最后一分钟执行的所有订单,无论它们是否具有对应的发货。

联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.ROWTIME as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o LEFT JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM SHIPMENTS_STREAM OUTPUT_STREAM
ROWTIME orderid ROWTIME orderid resultrowtime orderid shipmenttime OrderTime
10:00:00 101 10:00:00 100
10:00:20 102
10:00:30 103
10:00:40 104
10:00:45 104
10:00:45 100* 10:00:45 104 10:00:40 10:00:45
10:00:50 105 10:00:45 100 10:00:00 10:00:45
 
10:01:00 101 null 10:00:00
10:01:20 102 null 10:00:20
10:01:30 103 null 10:00:30
10:01:40 104 null 10:00:40
10:01:45 100 null 10:00:45

* - orderid = 100 的记录是 Orders 流中的较迟事件。

联接的可视化表示

下图表示一个查询,该查询返回最后一分钟执行的所有订单,无论它们是否具有对应的发货。


              查询的图表,该查询返回最后一分钟执行的所有订单 (orders_stream),无论它们是否具有对应的发货 (shipments_stream)。
触发结果

下面介绍了触发查询结果的事件。

  • 当 Shipments 流中与 orderid 104 和 100 对应的记录到达时,将在 Output 流中发出结果。即使 Orders 流中与 orderid 100 对应的记录较迟到达,也会发生这种情况。

  • 对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,将在一分钟窗口结束时在 Output 流中发出。这是因为该服务一直等到窗口结束,以获取 Shipments 流中的对应记录。

结果的 ROWTIME
  • 输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。

  • 对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,订单的 ROWTIME 是与窗口结束相对应的 ROWTIME。

摘要

  • Kinesis Data Analytics 始终以 ROWTIME 的升序从联接返回行。

  • 对于内部联接,输出行的 ROWTIME 是两个输入行的 ROWTIME 的较迟者。这也适用于在其中发现了匹配项的输入行的外部联接。

  • 对于未发现其匹配项的外部联接,输出行的 ROWTIME 是以下两个时间的较迟者:

    • 未发现其匹配项的输入行的 ROWTIME。

    • 在已发现任何可能的匹配项的时间点,另一个输入流的窗口的较迟边界。

流-表联接

如果其中一个关系为流,其他关系为有限关系,则称之为流-表联接。对于流中的每个行,查询将查找表中与联接条件匹配的行。

例如,Orders 是流,PriceList 是表。联接的效果是将价目表信息添加到订单。

有关创建参考数据源并将流加入参考表的信息,请参阅 《Amazon Kinesis Data Analytics 开发人员指南》 中的示例:添加参考数据源