You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
14 KiB
Markdown

2 years ago
# 18 | 数据关联优化都有哪些Join策略开发者该如何取舍
你好,我是吴磊。
在上一讲,我们分别从关联形式与实现机制这两个方面,对数据分析进行了讲解和介绍。对于不同关联形式的用法和实现机制的原理,想必你已经了然于胸。不过,在大数据的应用场景中,数据的处理往往是在分布式的环境下进行的,在这种情况下,数据关联的计算还要考虑网络分发这个环节。
我们知道在分布式环境中Spark支持两类数据分发模式。一类是我们在[第7讲](https://time.geekbang.org/column/article/421566)学过的ShuffleShuffle通过中间文件来完成Map阶段与Reduce阶段的数据交换因此它会引入大量的磁盘与网络开销。另一类是我们在[第10讲](https://time.geekbang.org/column/article/423878)介绍的广播变量Broadcast Variables广播变量在Driver端创建并由Driver分发到各个Executors。
因此从数据分发模式的角度出发数据关联又可以分为Shuffle Join和Broadcast Join这两大类。将两种分发模式与Join本身的3种实现机制相结合就会衍生出分布式环境下的6种Join策略。
那么对于这6种Join策略Spark SQL是如何支持的呢它们的优劣势与适用场景都有哪些开发者能否针对这些策略有的放矢地进行取舍今天这一讲咱们就来聊聊这些话题。
## Join实现机制的优势对比
首先我们先来说一说不同Join实现机制本身的一些特性与适用场景从而为后续的讨论打好基础。需要说明的是咱们这里说的Join实现机制指的是算法层面的工作原理不同的算法有着不同的适用场景与复杂度我们需要对它们有足够认识并有所区分。
我们知道Join支持3种实现机制它们分别是Hash Join、Sort Merge Join和Nested Loop Join。三者之中Hash Join的执行效率最高这主要得益于哈希表O(1)的查找效率。不过在Probe阶段享受哈希表的“性能红利”之前Build阶段得先在内存中构建出哈希表才行。因此Hash Join这种算法对于内存的要求比较高适用于内存能够容纳基表数据的计算场景。
相比之下Sort Merge Join就没有内存方面的限制。不论是排序、还是合并SMJ都可以利用磁盘来完成计算。所以在稳定性这方面SMJ更胜一筹。
而且与Hash Join相比SMJ的执行效率也没有差太多前者是O(M)后者是O(M + N)可以说是不分伯仲。当然O(M + N)的复杂度得益于SMJ的排序阶段。因此如果准备参与Join的两张表是有序表那么这个时候采用SMJ算法来实现关联简直是再好不过了。
与前两者相比Nested Loop Join看上去有些多余嵌套的双层for循环带来的计算复杂度最高O(M \* N)。不过尺有所短寸有所长执行高效的HJ和SMJ只能用于等值关联也就是说关联条件必须是等式像salaries(“id”) < employees(“id”)这样的关联条件,HJSMJ是无能为力的。相反,NLJ既可以处理等值关联(Equi Join),也可以应付不等值关联(Non Equi Join),可以说是数据关联在实现机制上的最后一道防线。
## Shuffle Join与Broadcast Join
分析完不同Join机制的优缺点之后接下来我们再来说说分布式环境下的Join策略。与单机环境不同在分布式环境中两张表的数据各自散落在不同的计算节点与Executors进程。因此要想完成数据关联Spark SQL就必须先要把Join Keys相同的数据分发到同一个Executors中去才行。
我们还是用上一讲的员工信息和薪资表来举例如果我们打算对salaries和employees两张表按照id列做关联那么对于id字段值相同的薪资数据与员工数据**我们必须要保证它们坐落在同样的Executors进程里**Spark SQL才能利用刚刚说的HJ、SMJ、以及NLJ以Executors进程为粒度并行地完成数据关联。
换句话说以Join Keys为基准两张表的数据分布保持一致是Spark SQL执行分布式数据关联的前提。**而能满足这个前提的途径只有两个Shuffle与广播**。这里我额外提醒一下Shuffle和广播变量我们在前面的课程有过详细的介绍如果你记不太清了不妨翻回去看一看。
回到正题开篇咱们说到如果按照分发模式来划分数据关联可以分为Shuffle Join和Broadcast Join两大类。通常来说在执行性能方面相比Shuffle JoinBroadcast Join往往会更胜一筹。为什么这么说呢
接下来我们就一起来分析分析这两大类Join在分布式环境下的执行过程~~然后再来回答这个问题。~~理解了执行过程,你自然就能解答这个问题了。
### Shuffle Join
在没有开发者干预的情况下Spark SQL默认采用Shuffle Join来完成分布式环境下的数据关联。对于参与Join的两张数据表Spark SQL先是按照如下规则来决定不同数据记录应当分发到哪个Executors中去
* 根据Join Keys计算哈希值
* 将哈希值对并行度Parallelism取模
由于左表与右表在并行度分区数上是一致的因此按照同样的规则分发数据之后一定能够保证id字段值相同的薪资数据与员工数据坐落在同样的Executors中。
![图片](https://static001.geekbang.org/resource/image/5a/6a/5a5531dd1dea1b2d5c710f026c5aae6a.jpg?wh=1920x726 "Shuffle Join工作原理")
如上图所示颜色相同的矩形代表Join Keys相同的数据记录可以看到在Map阶段数据分散在不同的Executors当中。经过Shuffle过后Join Keys相同的记录被分发到了同样的Executors中去。接下来在Reduce阶段Reduce Task就可以使用HJ、SMJ、或是NLJ算法在Executors内部完成数据关联的计算。
Spark SQL之所以在默认情况下一律采用Shuffle Join原因在于Shuffle Join的“万金油”属性。也就是说**在任何情况下不论数据的体量是大是小、不管内存是否足够Shuffle Join在功能上都能够“不辱使命”成功地完成数据关联的计算**。然而,有得必有失,功能上的完备性,往往伴随着的是性能上的损耗。
学习过 [Shuffle的原理](https://time.geekbang.org/column/article/420399)第6讲之后不用我多说Shuffle的弊端想必你早已烂熟于心。我们知道从CPU到内存从磁盘到网络Shuffle的计算几乎需要消耗所有类型的硬件资源。尤其是磁盘和网络开销这两座大山往往是应用执行的性能瓶颈。
那么问题来了除了Shuffle Join这种“万金油”式的Join策略开发者还有没有其他效率更高的选择呢答案当然是肯定的Broadcast Join就是用来克制Shuffle的“杀手锏”。
### Broadcast Join
在广播变量那一讲第10讲我们讲过把用户数据结构封装为广播变量的过程。实际上Spark不仅可以在普通变量上创建广播变量在分布式数据集如RDD、DataFrame之上也可以创建广播变量。这样一来对于参与Join的两张表我们可以把其中较小的一个封装为广播变量然后再让它们进行关联。
光说思路你可能体会不深,我们还是结合例子理解。以薪资表和员工表为例,只要对代码稍加改动,我们就能充分利用广播变量的优势。
更改后的代码如下所示。
```scala
import org.apache.spark.sql.functions.broadcast
 
// 创建员工表的广播变量
val bcEmployees = broadcast(employees)
 
// 内关联PS将原来的employees替换为bcEmployees
val jointDF: DataFrame = salaries.join(bcEmployees, salaries("id") === employees("id"), "inner")
```
在Broadcast Join的执行过程中Spark SQL首先从各个Executors收集employees表所有的数据分片然后在Driver端构建广播变量bcEmployees构建的过程如下图实线部分所示。
![图片](https://static001.geekbang.org/resource/image/48/4d/4814b57f9d9e4a98217773f161d9de4d.jpg?wh=1920x697 "Broadcast Join工作原理")
可以看到散落在不同Executors内花花绿绿的矩形代表的正是employees表的数据分片。这些数据分片聚集到一起就构成了广播变量。接下来如图中虚线部分所示携带着employees表全量数据的广播变量bcEmployees被分发到了全网所有的Executors当中去。
在这种情况下体量较大的薪资表数据只要“待在原地、保持不动”就可以轻松关联到跟它保持之一致的员工表数据了。通过这种方式Spark SQL成功地避开了Shuffle这种“劳师动众”的数据分发过程转而用广播变量的分发取而代之。
尽管广播变量的创建与分发同样需要消耗网络带宽但相比Shuffle Join中两张表的全网分发因为仅仅**通过分发体量较小的数据表来完成数据关联Spark SQL的执行性能显然要高效得多**。这种小投入、大产出,用极小的成本去博取高额的性能收益,可以说是“四两拨千斤”!
## Spark SQL支持的Join策略
不论是Shuffle Join还是Broadcast Join一旦数据分发完毕理论上可以采用HJ、SMJ和NLJ这3种实现机制中的任意一种完成Executors内部的数据关联。因此两种分发模式与三种实现机制它们组合起来总共有6种分布式Join策略如下图所示。
![图片](https://static001.geekbang.org/resource/image/ec/74/ece8f8d14fcb5d089d61ac76376f9874.jpg?wh=1920x754 "6种分布式Join策略")
虽然组合起来选择多样,但你也不必死记硬背,抓住里面的规律才是关键,我们一起来分析看看。
在这6种Join策略中Spark SQL支持其中的5种来应对不用的关联场景也即图中蓝色的5个矩形。对于等值关联Equi JoinSpark SQL优先考虑采用Broadcast HJ策略其次是Shuffle SMJ最次是Shuffle HJ。对于不等值关联Non Equi JoinSpark SQL优先考虑Broadcast NLJ其次是Shuffle NLJ。
![图片](https://static001.geekbang.org/resource/image/f6/a2/f661e892f7d1179aff2532d27caa9da2.jpg?wh=1905x504 "Spark SQL对不同Join策略的选择倾向")
不难发现,**不论是等值关联、还是不等值关联只要Broadcast Join的前提条件成立Spark SQL一定会优先选择Broadcast Join相关的策略**。那么问题来了Broadcast Join的前提条件是什么呢
回顾Broadcast Join的工作原理图我们不难发现**Broadcast Join得以实施的基础是被广播数据表图中的表2的全量数据能够完全放入Driver的内存、以及各个Executors的内存**,如下图所示。
![图片](https://static001.geekbang.org/resource/image/19/aa/19429ae459a3fdeeac0b16a76435f1aa.jpeg?wh=1378x394 "Broadcast Join工作原理")
另外为了避免因广播表尺寸过大而引入新的性能隐患Spark SQL要求被广播表的内存大小不能超过8GB。
这里我们简单总结一下。只要被广播表满足上述两个条件我们就可以利用SQL Functions中的broadcast函数来创建广播变量进而利用Broadcast Join策略来提升执行性能。
当然在Broadcast Join前提条件不成立的情况下Spark SQL就会退化到Shuffle Join的策略。在不等值的数据关联中Spark SQL只有Shuffle NLJ这一种选择因此咱们无需赘述。
但在等值关联的场景中Spark SQL有Shuffle SMJ和Shuffle HJ这两种选择。尽管如此Shuffle SMJ与Shuffle HJ的关系就像是关羽和周仓的关系。周仓虽说武艺也不错但他向来只是站在关公后面提刀。大战在即刘备仰仗的自然是站在前面的关羽而很少启用后面的周仓。在Shuffle SMJ与Shuffle HJ的取舍上Spark SQL也是如此。
学习过Shuffle之后我们知道Shuffle在Map阶段往往会对数据做排序而这恰恰正中SMJ机制的下怀。对于已经排好序的两张表SMJ的复杂度是O(M + N)这样的执行效率与HJ的O(M)可以说是不相上下。再者SMJ在执行稳定性方面远胜于HJ在内存受限的情况下SMJ可以充分利用磁盘来顺利地完成关联计算。因此考虑到Shuffle SMJ的诸多优势Shuffle HJ就像是关公后面的周仓Spark SQL向来对之视而不见所以对于HJ你大概知道它的作用就行。
## 重点回顾
好啦到此为止今天的课程就全部讲完了我们一起来做个总结。首先我们一起分析、对比了单机环境中不同Join机制的优劣势我把它们整理到了下面的表格中供你随时查看。
![图片](https://static001.geekbang.org/resource/image/d0/93/d08bb6866c98c5e308ee5f415d1f7d93.jpg?wh=1920x453)
在分布式环境中要想利用上述机制完成数据关联Spark SQL首先需要把两张表中Join Keys一致的数据分发到相同的Executors中。
因此数据分发是分布式数据关联的基础和前提。Spark SQL支持Shuffle和广播两种数据分发模式相应地Join也被分为Shuffle Join和Broadcast Join其中Shuffle Join是默认的关联策略。关于两种策略的优劣势对比我也整理到了如下的表格中供你参考。
![](https://static001.geekbang.org/resource/image/2b/04/2b8d95fe525d6f197f869cfd381c2d04.jpg?wh=2053x702)
结合三种实现机制和两种数据分发模式Spark SQL支持5种分布式Join策略。对于这些不同的Join策略Spark SQL有着自己的选择偏好我把它整理到了如下的表格中供你随时查看。
**其中Broadcast Join的生效前提是基表能够放进内存且存储尺寸小于8GB。只要前提条件成立Spark SQL就会优先选择Broadcast Join。**
![图片](https://static001.geekbang.org/resource/image/f6/a2/f661e892f7d1179aff2532d27caa9da2.jpg?wh=1905x504 "Spark SQL对不同Join策略的选择倾向")
## 每课一练
在6种分布式Join策略中Spark SQL唯独没有支持Broadcast SMJ你能想一想为什么Spark SQL没有选择支持这种Join策略吗提示一下你可以从SMJ与HJ的执行效率入手做分析。
欢迎你在留言区跟我交流互动,也推荐你把这一讲分享给更多同事、朋友。