You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

20 KiB

17 | 数据关联:不同的关联形式与实现机制该怎么选?

你好,我是吴磊。

在上一讲我们学习了Spark SQL支持的诸多算子。其中数据关联Join是数据分析场景中最常见、最重要的操作。毫不夸张地说几乎在所有的数据应用中你都能看到数据关联的“身影”。因此今天这一讲咱们继续详细说一说Spark SQL对于Join的支持。

众所周知Join的种类非常丰富。如果按照**关联形式Join Types**来划分,数据关联分为内关联、外关联、左关联、右关联,等等。对于参与关联计算的两张表,关联形式决定了结果集的数据来源。因此,在开发过程中选择哪种关联形式,是由我们的业务逻辑决定的。

而从实现机制的角度Join又可以分为NLJNested Loop Join、SMJSort Merge Join和HJHash Join。也就是说同样是内关联我们既可以采用NLJ来实现也可以采用SMJ或是HJ来实现。区别在于在不同的计算场景下这些不同的实现机制在执行效率上有着天壤之别。因此了解并熟悉这些机制对咱们开发者来说至关重要。

今天我们就分别从这两个角度来说一说Spark SQL当中数据关联的来龙去脉。

数据准备

为了让你更好地掌握新知识我会通过一个个例子为你说明Spark SQL数据关联的具体用法。在去介绍数据关联之前咱们先把示例中会用到的数据准备好。

import spark.implicits._
import org.apache.spark.sql.DataFrame
 
// 创建员工信息表
val seq = Seq((1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male"), (5, "Dave", 36, "Male"))
val employees: DataFrame = seq.toDF("id", "name", "age", "gender")
 
// 创建薪资表
val seq2 = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
val salaries:DataFrame = seq2.toDF("id", "salary")

如上表所示我们创建了两个DataFrame一个用于存储员工基本信息我们称之为员工表另一个存储员工薪水我们称之为薪资表。

数据准备好之后我们有必要先弄清楚一些数据关联的基本概念。所谓数据关联它指的是这样一个计算过程给定关联条件Join Conditions将两张数据表以不同关联形式拼接在一起的过程。关联条件包含两层含义一层是两张表中各自关联字段Join Key的选择另一层是关联字段之间的逻辑关系。

上一讲我们说到Spark SQL同时支持DataFrame算子与SQL查询因此咱们不妨结合刚刚准备好的数据分别以这两者为例来说明数据关联中的基本概念。

图片

首先约定俗成地我们把主动参与Join的数据表如上图中的salaries表称作“左表”而把被动参与关联的数据表如employees表称作是“右表”。

然后我们来关注图中蓝色的部分。可以看到两张表都选择id列作为关联字段而两者的逻辑关系是“相等”。这样的一个等式就构成了我们刚刚说的关联条件。接下来我们再来看图中绿色的部分inner指代的就是内关联的关联形式。

关联形式是我们今天要学习的重点内容之一。接下来我们还是一如既往地绕过SQL查询这种开发方式以DataFrame算子这种开发模式为例说一说Spark SQL都支持哪些关联形式以及不同关联形式的效果是怎样的。

关联形式Join Types

在关联形式这方面Spark SQL的支持比较全面为了让你一上来就建立一个整体的认知我把Spark SQL支持的Joint Types都整理到了如下的表格中你不妨先粗略地过一遍。

图片

结合已经准备好的数据,我们分别来说一说每一种关联形式的用法,以及它们各自的作用与效果。我们先从最简单、最基础、也是最常见的内关联说起。

内关联Inner Join

对于登记在册的员工,如果我们想获得他们每个人的薪资情况,就可以使用内关联来实现,如下所示。

// 内关联
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")
 
jointDF.show
 
/** 结果打印
+---+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+---+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+---+-------+---+------+
*/
 
// 左表
salaries.show
 
/** 结果打印
+---+------+
| id|salary|
+---+------+
| 1| 26000|
| 2| 30000|
| 4| 25000|
| 3| 20000|
+---+------+
*/
 
// 右表
employees.show
 
/** 结果打印
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| Mike| 28| Male|
| 2| Lily| 30|Female|
| 3|Raymond| 26| Male|
| 5| Dave| 36| Male|
+---+-------+---+------+
*/

可以看到基于join算子的一般用法我们只要在第3个参数中指定“inner”这种关联形式就可以使用内关联的方式来达成两表之间的数据拼接。不过如果仔细观察上面打印的关联结果集以及原始的薪资表与员工表你会发现左表和右表的原始数据并没有都出现在结果集当中。

例如在原始的薪资表中有一条id为4的薪资记录而在员工表中有一条id为5、name为“Dave”的数据记录。这两条数据记录都没有出现在内关联的结果集中而这正是“内关联”这种关联形式的作用所在。

内关联的效果,是仅仅保留左右表中满足关联条件的那些数据记录。以上表为例关联条件是salaries(“id”) === employees(“id”)而在员工表与薪资表中只有1、2、3这三个值同时存在于他们各自的id字段中。相应地结果集中就只有id分别等于1、2、3的这三条数据记录。

理解了内关联的含义与效果之后,你再去学习其他的关联形式,比如说外关联,就会变得轻松许多。

外关联Outer Join

外关联还可以细分为3种形式分别是左外关联、右外关联、以及全外关联。这里的左、右对应的实际上就是左表、右表。

由简入难我们先来说左外关联。要把salaries与employees做左外关联我们只需要把“inner”关键字替换为“left”、“leftouter”或是“left_outer”即可如下所示。

val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "left")
 
jointDF.show
 
/** 结果打印
+---+------+----+-------+----+------+
| id|salary| id| name| age|gender|
+---+------+----+-------+----+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 4| 25000|null| null|null| null|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+----+-------+----+------+
*/

不难发现左外关联的结果集实际上就是内关联结果集再加上左表salaries中那些不满足关联条件的剩余数据也即id为4的数据记录。值得注意的是由于右表employees中并不存在id为4的记录因此结果集中employees对应的所有字段值均为空值null。

没有对比就没有鉴别为了更好地理解前面学的内关联、左外关联我们再来看看右外关联的执行结果。为了计算右外关联在下面的代码中我们把“left”关键字替换为“right”、“rightouter”或是“right_outer”。

val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "right")
 
jointDF.show
 
/** 结果打印
+----+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+----+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
|null| null| 5| Dave| 36| Male|
+----+------+---+-------+---+------+
*/

仔细观察你会发现与左外关联相反右外关联的结果集恰恰是内关联的结果集再加上右表employees中的剩余数据也即id为5、name为“Dave”的数据记录。同样的由于左表salaries并不存在id等于5的数据记录因此结果集中salaries相应的字段置空以null值进行填充。

理解了左外关联与右外关联全外关联的功用就显而易见了。全外关联的结果集就是内关联的结果再加上那些不满足关联条件的左右表剩余数据。要进行全外关联的计算关键字可以取“full”、“outer”、“fullouter”、或是“full_outer”如下表所示。

val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "full")
 
jointDF.show
 
/** 结果打印
+----+------+----+-------+----+------+
| id|salary| id| name| age|gender|
+----+------+----+-------+----+------+
| 1| 26000| 1| Mike| 28| Male|
| 3| 20000| 3|Raymond| 26| Male|
|null| null| 5| Dave| 36| Male|
| 4| 25000|null| null|null| null|
| 2| 30000| 2| Lily| 30|Female|
+----+------+----+-------+----+------+
*/

到这里,内、外关联的作用我们就讲完了。聪明的你可能早已发现,这里的“内”,它指的是,在关联结果中,仅包含满足关联条件的那些数据记录;而“外”,它的含义是,在关联计算的结果集中,还包含不满足关联条件的数据记录。而外关联中的“左”、“右”、“全”,恰恰是在表明,那些不满足关联条件的记录,来自于哪里。

弄清楚“内”、“外”、“左”、“右”这些说法的含义能够有效地帮我们避免迷失在种类繁多、却又彼此相关的关联形式中。其实除了内关联和外关联Spark SQL还支持左半关联和左逆关联这两个关联又是用来做什么的呢

左半/逆关联Left Semi Join / Left Anti Join

尽管名字听上去拗口但它们的含义却很简单。我们先来说左半关联它的关键字有“leftsemi”和“left_semi”。左半关联的结果集实际上是内关联结果集的子集它仅保留左表中满足关联条件的那些数据记录如下表所示。

// 内关联
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")
 
jointDF.show
 
/** 结果打印
+---+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+---+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+---+-------+---+------+
*/
 
// 左半关联
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "leftsemi")
 
jointDF.show
 
/** 结果打印
+---+------+
| id|salary|
+---+------+
| 1| 26000|
| 2| 30000|
| 3| 20000|
+---+------+
*/

为了方便你进行对比我分别打印出了内关联与左半关联的计算结果。这里你需要把握左半关联的两大特点首先左半关联是内关联的一个子集其次它只保留左表salaries中的数据。这两个特点叠加在一起很好地诠释了“左、半”这两个字。

有了左半关联的基础左逆关联会更好理解一些。左逆关联同样只保留左表的数据它的关键字有“leftanti”和“left_anti”。但与左半关联不同的是它保留的是那些不满足关联条件的数据记录如下所示。

// 左逆关联
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "leftanti")
 
jointDF.show
 
/** 结果打印
+---+------+
| id|salary|
+---+------+
| 4| 25000|
+---+------+
*/

通过与上面左半关联的结果集做对比我们一眼就能看出左逆关联和它的区别所在。显然id为4的薪资记录是不满足关联条件salaries(“id”) === employees(“id”)的,而左逆关联留下的,恰恰是这些“不达标”的数据记录。

好啦关于Spark SQL支持的关联形式到这里我们就全部说完了。根据这些不同关联形式的特点与作用再结合实际场景中的业务逻辑相信你可以在日常的开发中做到灵活取舍。

关联机制Join Mechanisms

不过从功能的角度出发使用不同的关联形式来实现业务逻辑可以说是程序员的一项必备技能。要在众多的开发者中脱颖而出咱们还要熟悉、了解不同的关联机制。哪怕同样是内关联不同的Join实现机制在执行效率方面差异巨大。因此掌握不同关联机制的原理与特性有利于我们逐渐培养出以性能为导向的开发习惯。

在这一讲的开头我们提到Join有3种实现机制分别是NLJNested Loop Join、SMJSort Merge Join和HJHash Join。接下来我们以内关联为例结合salaries和employees这两张表来说说它们各自的实现原理与特性。

// 内关联
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")
 
jointDF.show
 
/** 结果打印
+---+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+---+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+---+-------+---+------+
*/

NLJNested Loop Join

对于参与关联的两张表如salaries和employees按照它们在代码中出现的顺序我们约定俗成地把salaries称作“左表”而把employees称作“右表”。在探讨关联机制的时候我们又常常把左表称作是“驱动表”而把右表称为“基表”。

一般来说,驱动表的体量往往较大,在实现关联的过程中,驱动表是主动扫描数据的那一方。而基表相对来说体量较小,它是被动参与数据扫描的那一方

在NLJ的实现机制下算法会使用外、内两个嵌套的for循环来依次扫描驱动表与基表中的数据记录。在扫描的同时还会判定关联条件是否成立如内关联例子中的salaries(“id”) === employees(“id”)。如果关联条件成立,就把两张表的记录拼接在一起,然后对外进行输出。

图片

在实现的过程中,外层的 for 循环负责遍历驱动表的每一条数据,如图中的步骤 1 所示。对于驱动表中的每一条数据记录,内层的 for 循环会逐条扫描基表的所有记录依次判断记录的id字段值是否满足关联条件如步骤 2 所示。

不难发现,假设驱动表有 M 行数据,而基表有 N 行数据,那么 NLJ 算法的计算复杂度是 O(M * N)。尽管NLJ的实现方式简单、直观、易懂但它的执行效率显然很差。

SMJSort Merge Join

鉴于NLJ低效的计算效率SMJ应运而生。Sort Merge Join顾名思义SMJ的实现思路是先排序、再归并。给定参与关联的两张表SMJ先把他们各自排序然后再使用独立的游标对排好序的两张表做归并关联。

图片

具体计算过程是这样的起初驱动表与基表的游标都会先锚定在各自的第一条记录上然后通过对比游标所在记录的id字段值来决定下一步的走向。对比结果以及后续操作主要分为 3 种情况:

  • 满足关联条件两边的id值相等那么此时把两边的数据记录拼接并输出然后把驱动表的游标滑动到下一条记录
  • 不满足关联条件驱动表id值小于基表的id值此时把驱动表的游标滑动到下一条记录
  • 不满足关联条件驱动表id值大于基表的id值此时把基表的游标滑动到下一条记录。

基于这 3 种情况SMJ不停地向下滑动游标直到某张表的游标滑到尽头即宣告关联结束。对于驱动表的每一条记录由于基表已按id字段排序且扫描的起始位置为游标所在位置因此SMJ算法的计算复杂度为 O(M + N)。

然而,计算复杂度的降低,仰仗的其实是两张表已经事先排好了序。但是我们知道,排序本身就是一项很耗时的操作,更何况,为了完成归并关联,参与 Join 的两张表都需要排序。

因此SMJ的计算过程我们可以用“先苦后甜”来形容。苦指的是要先花费时间给两张表做排序而甜指的则是有序表的归并关联能够享受到线性的计算复杂度。

HJHash Join

考虑到SMJ对于排序的苛刻要求后来又有人推出了HJ算法。HJ的设计初衷是以空间换时间力图将基表扫描的计算复杂度降低至O(1)。

图片

具体来说HJ的计算分为两个阶段分别是Build阶段和Probe阶段。在Build阶段在基表之上算法使用既定的哈希函数构建哈希表如上图的步骤 1 所示。哈希表中的Key是id字段应用Apply哈希函数之后的哈希值而哈希表的 Value 同时包含了原始的Join Keyid字段和Payload。

在Probe阶段算法依次遍历驱动表的每一条数据记录。首先使用同样的哈希函数以动态的方式计算Join Key的哈希值。然后算法再用哈希值去查询刚刚在Build阶段创建好的哈希表。如果查询失败则说明该条记录与基表中的数据不存在关联关系相反如果查询成功则继续对比两边的Join Key。如果Join Key一致就把两边的记录进行拼接并输出从而完成数据关联。

好啦到此为止对于Join的3种实现机制我们暂时说到这里。对于它们各自的实现原理想必你已经有了充分的把握。至于这3种机制都适合哪些计算场景以及Spark SQL如何利用这些机制在分布式环境下做数据关联我们留到下一讲再去展开。

重点回顾

今天这一讲我们重点介绍了数据关联中的关联形式Join Types与实现机制Join Mechanisms。掌握了不同的关联形式我们才能游刃有余地满足不断变化的业务需求。而熟悉并理解不同实现机制的工作原理则有利于培养我们以性能为导向的开发习惯。

Spark SQL支持的关联形式多种多样为了方便你查找我把它们的含义与效果统一整理到了如下的表格中。在日后的开发工作中当你需要区分并确认不同的关联形式时只要回顾这张表格就能迅速得到结论。

图片

在此之后我们又介绍了Join的3种实现机制它们分别是Nested Loop Join、Sort Merge Join和Hash Join。这3种实现机制的工作原理我也整理成了表格方便你随时查看。

图片

每课一练

对于Join的3种实现机制也即Nested Loop Join、Sort Merge Join和Hash Join结合其实现原理你能猜一猜它们可能的适用场景都有哪些吗或者换句话说在什么样的情况下更适合使用哪种实现机制来进行数据关联

欢迎你在留言区跟我交流互动,也推荐你把这一讲分享给身边的同事、朋友。