You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
17 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 26 | Join Hints指南不同场景下如何选择Join策略
你好,我是吴磊。
在数据分析领域数据关联可以说是最常见的计算场景了。因为使用的频率很高所以Spark为我们准备了非常丰富的关联形式包括Inner Join、Left Join、Right Join、Anti Join、Semi Join等等。
搞懂不同关联形式的区别与作用可以让我们快速地实现业务逻辑。不过这只是基础要想提高数据关联场景下Spark应用的执行性能更为关键的是我们要能够深入理解Join的实现原理。
所以今天这一讲我们先来说说单机环境中Join都有哪几种实现方式它们的优劣势分别是什么。理解了这些实现方式我们再结合它们一起探讨分布式计算环境中Spark都支持哪些Join策略。对于不同的Join策略Spark是怎么做取舍的。
## Join的实现方式详解
到目前为止数据关联总共有3种Join实现方式。按照出现的时间顺序分别是嵌套循环连接NLJNested Loop Join 、排序归并连接SMJShuffle Sort Merge Join和哈希连接HJHash Join。接下来我们就借助一个数据关联的场景来分别说一说这3种Join实现方式的工作原理。
假设现在有事实表orders和维度表users。其中users表存储用户属性信息orders记录着用户的每一笔交易。两张表的Schema如下
```
// 订单表orders关键字段
userId, Int
itemId, Int
price, Float
quantity, Int
// 用户表users关键字段
id, Int
name, String
type, String //枚举值,分为头部用户和长尾用户
```
我们的任务是要基于这两张表做内关联Inner Join同时把用户名、单价、交易额等字段投影出来。具体的SQL查询语句如下表
```
//SQL查询语句
select orders.quantity, orders.price, orders.userId, users.id, users.name
from orders inner join users on orders.userId = users.id
```
那么对于这样一个关联查询在3种不同的Join实现方式下它是如何完成计算的呢
### NLJ的工作原理
对于参与关联的两张数据表我们通常会根据它们扮演的角色来做区分。其中体量较大、主动扫描数据的表我们把它称作外表或是驱动表体量较小、被动参与数据扫描的表我们管它叫做内表或是基表。那么NLJ是如何关联这两张数据表的呢
**NLJ是采用“嵌套循环”的方式来实现关联的。**也就是说NLJ会使用内、外两个嵌套的for循环依次扫描外表和内表中的数据记录判断关联条件是否满足比如例子中的orders.userId = users.id如果满足就把两边的记录拼接在一起然后对外输出。
![](https://static001.geekbang.org/resource/image/be/13/be0774ffca24f9c20caa2ef6bd88d013.jpg "Nested Loop Join示意图")
在这个过程中外层的for循环负责遍历外表中的每一条数据如图中的步骤1所示。而对于外表中的每一条数据记录内层的for循环会逐条扫描内表的所有记录依次判断记录的Join Key是否满足关联条件如步骤2所示。假设外表有M行数据内表有N行数据那么**NLJ算法的计算复杂度是O(M \* N)**。不得不说尽管NLJ实现方式简单而又直接但它的执行效率实在让人不敢恭维。
### SMJ的工作原理
正是因为NLJ极低的执行效率所以在它推出之后没多久之后就有人用排序、归并的算法代替NLJ实现了数据关联这种算法就是SMJ。**SMJ的思路是先排序、再归并。**具体来说就是参与Join的两张表先分别按照Join Key做升序排序。然后SMJ会使用两个独立的游标对排好序的两张表完成归并关联。
![](https://static001.geekbang.org/resource/image/e2/b2/e2a8f8d1b2572ff456fa83a3f25ccbb2.jpg "Sort Merge Join示意图")
SMJ刚开始工作的时候内外表的游标都会先锚定在两张表的第一条记录上然后再对比游标所在记录的Join Key。对比结果以及后续操作主要分为3种情况
1. 外表Join Key等于内表Join Key满足关联条件把两边的数据记录拼接并输出然后把外表的游标滑动到下一条记录
2. 外表Join Key小于内表Join Key不满足关联条件把外表的游标滑动到下一条记录
3. 外表Join Key大于内表Join Key不满足关联条件把内表的游标滑动到下一条记录
SMJ正是基于这3种情况不停地向下滑动游标直到某张表的游标滑到头即宣告关联结束。对于SMJ中外表的每一条记录由于内表按Join Key升序排序且扫描的起始位置为游标所在位置因此**SMJ算法的计算复杂度为O(M + N)**。
不过SMJ计算复杂度的降低仰仗的是两张表已经事先排好序。要知道排序本身就是一项非常耗时的操作更何况为了完成归并关联参与Join的两张表都需要排序。因此SMJ的计算过程我们可以用“先苦后甜”来形容。苦的是要先花费时间给两张表做排序甜的是有序表的归并关联能够享受到线性的计算复杂度。
### HJ的工作原理
考虑到SMJ对排序的要求比较苛刻所以后来又有人提出了效率更高的关联算法HJ。HJ的设计初衷非常明确**把内表扫描的计算复杂度降低至O(1)**。把一个数据集合的访问效率提升至O(1)也只有Hash Map能做到了。也正因为Join的关联过程引入了Hash计算所以它叫HJ。
![](https://static001.geekbang.org/resource/image/5c/e4/5c81d814591eba9d08e6a3174ffe22e4.jpg)
HJ的计算分为两个阶段分别是Build阶段和Probe阶段。在Build阶段基于内表算法使用既定的哈希函数构建哈希表如上图的步骤1所示。哈希表中的Key是Join Key应用Apply哈希函数之后的哈希值表中的Value同时包含了原始的Join Key和Payload。
在Probe阶段算法遍历每一条数据记录先是使用同样的哈希函数以动态的方式On The Fly计算Join Key的哈希值。然后用计算得到的哈希值去查询刚刚在Build阶段创建好的哈希表。如果查询失败说明该条记录与维度表中的数据不存在关联关系如果查询成功则继续对比两边的Join Key。如果Join Key一致就把两边的记录进行拼接并输出从而完成数据关联。
## 分布式环境下的Join
掌握了这3种最主要的数据关联实现方式的工作原理之后在单机环境中无论是面对常见的Inner Join、Left Join、Right Join还是不常露面的Anti Join、Semi Join你都能对数据关联的性能调优做到游刃有余了。
不过你也可能会说“Spark毕竟是个分布式系统光学单机实现有什么用呀
所谓万变不离其宗,实际上,**相比单机环境分布式环境中的数据关联在计算环节依然遵循着NLJ、SMJ和HJ这3种实现方式只不过是增加了网络分发这一变数**。在Spark的分布式计算环境中数据在网络中的分发主要有两种方式分别是Shuffle和广播。那么不同的网络分发方式对于数据关联的计算又都有哪些影响呢
如果采用Shuffle的分发方式来完成数据关联那么外表和内表都需要按照Join Key在集群中做全量的数据分发。因为只有这样两个数据表中Join Key相同的数据记录才能分配到同一个Executor进程从而完成关联计算如下图所示。
![](https://static001.geekbang.org/resource/image/b1/28/b1b2a574eb7ef33e2315f547ecdc0328.jpg)
如果采用广播机制的话情况会大有不同。在这种情况下Spark只需要把内表基表封装到广播变量然后在全网进行分发。由于广播变量中包含了内表的**全量数据**,因此体量较大的外表只要“待在原地、保持不动”,就能轻松地完成关联计算,如下图所示。
![](https://static001.geekbang.org/resource/image/b3/2a/b3c5ab392c2303bf7923488623b4022a.jpg)
不难发现结合Shuffle、广播这两种网络分发方式和NLJ、SMJ、HJ这3种计算方式对于分布式环境下的数据关联我们就能组合出6种Join策略如下图所示。
![](https://static001.geekbang.org/resource/image/e9/48/e9bf1720ac13289a9e49e0f33a334548.jpg)
这6种Join策略对应图中6个青色圆角矩形从上到下颜色依次变浅它们分别是Cartesian Product Join、Shuffle Sort Merge Join和Shuffle Hash Join。也就是采用Shuffle机制实现的NLJ、SMJ和HJ以及Broadcast Nested Loop Join、Broadcast Sort Merge Join和Broadcast Hash Join。
**从执行性能来说6种策略从上到下由弱变强。**相比之下CPJ的执行效率是所有实现方式当中最差的网络开销、计算开销都很大因而在图中的颜色也是最深的。BHJ是最好的分布式数据关联机制网络开销和计算开销都是最小的因而颜色也最浅。此外你可能也注意到了Broadcast Sort Merge Join被标记成了灰色这是因为Spark并没有选择支持Broadcast + Sort Merge Join这种组合方式。
那么问题来了明明是6种组合策略为什么Spark偏偏没有支持这一种呢要回答这个问题我们就要回过头来对比SMJ与HJ实现方式的差异与优劣势。
相比SMJHJ并不要求参与Join的两张表有序也不需要维护两个游标来判断当前的记录位置只要基表在Build阶段构建的哈希表可以放进内存HJ算法就可以在Probe阶段遍历外表依次与哈希表进行关联。
当数据能以广播的形式在网络中进行分发时说明被分发的数据也就是基表的数据足够小完全可以放到内存中去。这个时候相比NLJ、SMJHJ的执行效率是最高的。因此在可以采用HJ的情况下Spark自然就没有必要再去用SMJ这种前置开销比较大的方式去完成数据关联。
## Spark如何选择Join策略
那么在不同的数据关联场景中对于这5种Join策略来说也就是CPJ、BNLJ、SHJ、SMJ以及BHJSpark会基于什么逻辑取舍呢我们来分两种情况进行讨论分别是等值Join和不等值Join。
### 等值Join下Spark如何选择Join策略
等值Join是指两张表的Join Key是通过等值条件连接在一起的。在日常的开发中这种Join形式是最常见的如t1 inner join t2 on **t1.id = t2.id**
**在等值数据关联中Spark会尝试按照BHJ > SMJ > SHJ的顺序依次选择Join策略。**在这三种策略中执行效率最高的是BHJ其次是SHJ再次是SMJ。其中SMJ和SHJ策略支持所有连接类型如全连接、Anti Join等等。BHJ尽管效率最高但是有两个前提条件一是连接类型不能是全连接Full Outer Join二是基表要足够小可以放到广播变量里面去。
那为什么SHJ比SMJ执行效率高排名却不如SMJ靠前呢这是个非常好的问题。我们先来说结论相比SHJSpark优先选择SMJ的原因在于SMJ的实现方式更加稳定更不容易OOM。
回顾HJ的实现机制在Build阶段算法根据内表创建哈希表。在Probe阶段为了让外表能够成功“探测”Probe到每一个Hash Key哈希表要全部放进内存才行。坦白说这个前提还是蛮苛刻的仅这一点要求就足以让Spark对其望而却步。要知道在不同的计算场景中数据分布的多样性很难保证内表一定能全部放进内存。
而且在Spark中SHJ策略要想被选中必须要满足两个先决条件这两个条件都是对数据尺寸的要求。**首先外表大小至少是内表的3倍。其次内表数据分片的平均大小要小于广播变量阈值。**第一个条件的动机很好理解只有当内外表的尺寸悬殊到一定程度时HJ的优势才会比SMJ更显著。第二个限制的目的是确保内表的每一个数据分片都能全部放进内存。
和SHJ相比SMJ没有这么多的附加条件无论是单表排序还是两表做归并关联都可以借助磁盘来完成。内存中放不下的数据可以临时溢出到磁盘。单表排序的过程我们可以参考Shuffle Map阶段生成中间文件的过程。在做归并关联的时候算法可以把磁盘中的有序数据用合理的粒度依次加载进内存完成计算。这个粒度可大可小大到以数据分片为单位小到逐条扫描。
正是考虑到这些因素相比SHJSpark SQL会优先选择SMJ。事实上在配置项spark.sql.join.preferSortMergeJoin默认为True的情况下Spark SQL会用SMJ策略来兜底确保作业执行的稳定性压根就不会打算去尝试SHJ。开发者如果想通过配置项来调整Join策略需要把这个参数改为False这样Spark SQL才有可能去尝试SHJ。
### 不等值Join下Spark如何选择Join策略
接下来我们再来说说不等值Join它指的是两张表的Join Key是通过不等值条件连接在一起的。不等值Join其实我们在以前的例子中也见过比如像查询语句t1 inner join t2 on **t1.date > t2.beginDate and t1.date <= t2.endDate**,其中的关联关系是依靠不等式连接在一起的。
**由于不等值Join只能使用NLJ来实现因此Spark SQL可选的Join策略只剩下BNLJ和CPJ。**在同一种计算模式下相比Shuffle广播的网络开销更小。显然在两种策略的选择上Spark SQL一定会按照BNLJ > CPJ的顺序进行尝试。当然BNLJ生效的前提自然是内表小到可以放进广播变量。如果这个条件不成立那么Spark SQL只好委曲求全使用笨重的CPJ策略去完成关联计算。
## 开发者能做些什么?
最后我们再来聊聊面对上述的5种Join策略开发者还能做些什么呢通过上面的分析我们不难发现Spark SQL对于这些策略的取舍也基于一些既定的规则。所谓计划赶不上变化预置的规则自然很难覆盖多样且变化无常的计算场景。因此当我们掌握了不同Join策略的工作原理结合我们对于业务和数据的深刻理解完全可以自行决定应该选择哪种Join策略。
![](https://static001.geekbang.org/resource/image/94/b8/9436f0f9352ffa381b238be57d2ecdb8.jpeg)
在最新发布的3.0版本中Spark为开发者提供了多样化的Join Hints允许你把专家经验凌驾于Spark SQL的选择逻辑之上。**在满足前提条件的情况下如等值条件、连接类型、表大小等等Spark会优先尊重开发者的意愿去选取开发者通过Join Hints指定的Join策略。**关于Spark 3.0支持的Join Hints关键字以及对应的适用场景我把它们总结到了如上的表格中你可以直接拿来参考。
简单来说你可以使用两种方式来指定Join Hints一种是通过SQL结构化查询语句另一种是使用DataFrame的DSL语言都很方便。至于更全面的讲解你可以去[第13讲](https://time.geekbang.org/column/article/360837)看看,这里我就不多说了。
## 小结
这一讲我们从数据关联的实现原理到Spark SQL不同Join策略的适用场景掌握这些关键知识点对于数据关联场景中的性能调优至关重要。
首先你需要掌握3种Join实现机制的工作原理。为了方便你对比我把它们总结在了下面的表格里。
![](https://static001.geekbang.org/resource/image/86/bc/86bb13a5b7b96da4f5128df8b54b96bc.jpeg)
掌握了3种关联机制的实现原理你就能更好地理解Spark SQL的Join策略。结合数据的网络分发方式Shuffle和广播Spark SQL支持5种Join策略按照执行效率排序就是BHJ > SHJ > SMJ > BNLJ > CPJ。同样为了方便对比你也可以直接看下面的表格。
![](https://static001.geekbang.org/resource/image/7b/9e/7be7f01b383463f804a2db74a68d5e9e.jpeg)
最后当你掌握了不同Join策略的工作原理结合对于业务和数据的深刻理解实际上你可以自行决定应该选择哪种Join策略不必完全依赖Spark SQL的判断。
Spark为开发者提供了多样化的Join Hints允许你把专家经验凌驾于Spark SQL的选择逻辑之上。比如当你确信外表比内表大得多而且内表数据分布均匀使用SHJ远比默认的SMJ效率高得多的时候你就可以通过指定Join Hints来强制Spark SQL按照你的意愿去选择Join策略。
## 每日一练
1. 如果关联的场景是事实表Join事实表你觉得我们今天讲的Sort Merge Join实现方式还适用吗如果让你来设计算法的实现步骤你会怎么做
2. 你觉得不等值Join可以强行用Sort Merge Join和Hash Join两种机制来实现吗为什么
期待在留言区看到你的思考和答案,我们下一讲见!