You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
20 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 29 | 大表Join大表什么是负隅顽抗的调优思路
你好,我是吴磊。
在上一讲我们说了应对“大表Join大表”的第一种调优思路是分而治之也就是把一个庞大而又复杂的Shuffle Join转化为多个轻量的Broadcast Joins。这一讲我们接着来讲第二种调优思路负隅顽抗。
负隅顽抗指的是当内表没法做到均匀拆分或是外表压根就没有分区键不能利用DPP只能依赖Shuffle Join去完成大表与大表的情况下我们可以采用的调优方法和手段。这类方法比较庞杂适用的场景各不相同。从数据分布的角度出发我们可以把它们分两种常见的情况来讨论分别是数据分布均匀和数据倾斜。
我们先来说说在数据分布均匀的情况下如何应对“大表Join大表”的计算场景。
## 数据分布均匀
在第27讲的最后我们说过当参与关联的大表与小表满足如下条件的时候Shuffle Hash Join的执行效率往往比Spark SQL默认的Shuffle Sort Merge Join更好。
* 两张表数据分布均匀。
* 内表所有数据分片,能够完全放入内存。
实际上这个调优技巧同样适用于“大表Join大表”的场景原因其实很简单这两个条件与数据表本身的尺寸无关只与其是否分布均匀有关。不过为了确保Shuffle Hash Join计算的稳定性我们需要特别注意上面列出的第二个条件也就是内表所有的数据分片都能够放入内存。
那么问题来了我们怎么确保第二个条件得以成立呢其实只要处理好并行度、并发度与执行内存之间的关系我们就可以让内表的每一个数据分片都恰好放入执行内存中。简单来说就是先根据并发度与执行内存计算出可供每个Task消耗的内存上下限然后结合分布式数据集尺寸与上下限倒推出与之匹配的并行度。更详细的内容你可以去看看[第14讲](https://time.geekbang.org/column/article/362710)。
那我们该如何强制Spark SQL在运行时选择Shuffle Hash Join机制呢答案就是利用Join Hints。这个技巧我们讲过很多次了所以这里我直接以上一讲中的查询为例把它的使用方法写在了下面方便你复习。
```
//查询语句中使用Join hints
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = COMPLETE
and o.date between 2020-01-01 and 2020-03-31
group by o.orderId
```
## 数据倾斜
接下来我们再说说当参与Join的两张表存在数据倾斜问题的时候我们该如何应对“大表Join大表”的计算场景。对于“大表Join大表”的数据倾斜问题根据倾斜位置的不同我们可以分为3种情况来讨论。
![](https://static001.geekbang.org/resource/image/be/73/beb46de87f456924fc1414b93f8c0a73.jpeg "大表Join大表数据倾斜的3种情况")
其实,不管哪种表倾斜,它们的调优技巧都是类似的。因此,我们就以第一种情况为例,也就是外表倾斜、内表分布均匀的情况,去探讨数据倾斜的应对方法。
### 以Task为粒度解决数据倾斜
学过AQE之后要应对数据倾斜想必你很快就会想到AQE的特性自动倾斜处理。给定如下配置项参数Spark SQL在运行时可以将策略OptimizeSkewedJoin插入到物理计划中自动完成Join过程中对于数据倾斜的处理。
* spark.sql.adaptive.skewJoin.skewedPartitionFactor判定倾斜的膨胀系数。
* spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes判定倾斜的最低阈值。
* spark.sql.adaptive.advisoryPartitionSizeInBytes以字节为单位定义拆分粒度。
![](https://static001.geekbang.org/resource/image/33/a9/33a112480b1c1bf8b21d26412a7857a9.jpg "AQE中的自动倾斜处理")
Join过程中的自动倾斜处理如上图所示当AQE检测到外表存在倾斜分区之后它会以spark.sql.adaptive.advisoryPartitionSizeInBytes配置的数值为拆分粒度把倾斜分区拆分为多个数据分区。与此同时AQE还需要对内表中对应的数据分区进行复制来保护两表之间的关联关系。
有了AQE的自动倾斜处理特性在应对数据倾斜问题的时候我们确实能够大幅节省开发成本。不过天下没有免费的午餐AQE的倾斜处理是以Task为粒度的这意味着原本Executors之间的负载倾斜并没有得到根本改善。这到底是什么意思呢
![](https://static001.geekbang.org/resource/image/f4/72/f4fe3149112466174bdefcc0ee573d72.jpg "以Task为粒度的负载均衡")
我们来举个例子假设某张表在Shuffle过后有两个倾斜分区如上图它们又刚好都被Shuffle到了同一个执行器Executor 0。在AQE的自动倾斜处理机制下两个倾斜分区分别被拆分变成了4个尺寸适中的数据分区。如此一来Executor 0中所有Task的计算负载都得到了平衡。但是相比Executor 1Executor 0整体的计算负载还是那么多并没有因为AQE的自动处理而得到任何缓解。
### 以Executor为粒度解决数据倾斜
你也许会说“哪会那么凑巧倾斜的分区刚好全都落在同一个Executor上”确实刚才的例子主要是为了帮你解释清楚倾斜粒度这个概念如果实际应用中倾斜分区在集群中的分布比较平均的话AQE的自动倾斜处理机制确实就是开发者的“灵丹妙药”。
然而凡事总有个万一我们在探讨调优方案的时候还是要考虑周全如果你的场景就和咱们的例子一样倾斜分区刚好落在集群中少数的Executors上你该怎么办呢答案是“分而治之”和“两阶段Shuffle”。
这里的分而治之与上一讲的分而治之在思想上是一致的都是以任务分解的方式来解决复杂问题。区别在于我们今天要讲的是以Join Key是否倾斜为依据来拆解子任务。具体来说对于外表中所有的Join Keys我们先按照是否存在倾斜把它们分为两组。一组是存在倾斜问题的Join Keys另一组是分布均匀的Join Keys。因为给定两组不同的Join Keys相应地我们把内表的数据也分为两份。
![](https://static001.geekbang.org/resource/image/c2/e2/c22de99104b0a9cb0d5cfdffebd42ee2.jpg "分而治之按照Join Keys是否倾斜对数据进行分组")
那么分而治之的含义就是对于内外表中两组不同的数据我们分别采用不同的方法做关联计算然后通过Union操作再把两个关联计算的结果集做合并最终得到“大表Join大表”的计算结果整个过程如上图所示。
对于Join Keys分布均匀的数据部分我们可以沿用把Shuffle Sort Merge Join转化为Shuffle Hash Join的方法。对于Join Keys存在倾斜问题的数据部分我们就需要借助“两阶段Shuffle”的调优技巧来平衡Executors之间的工作负载。那么什么是“两阶段Shuffle”呢
#### 如何理解“两阶段Shuffle”
用一句话来概括“两阶段Shuffle”指的是通过“加盐、Shuffle、关联、聚合”与“去盐化、Shuffle、聚合”这两个阶段的计算过程在不破坏原有关联关系的前提下在集群范围内以Executors为粒度平衡计算负载 。
![](https://static001.geekbang.org/resource/image/34/21/348ddabcd5f9980de114ae9d5b96d321.jpg "两阶段Shuffle")
我们先来说说第一阶段也就是“加盐、Shuffle、关联、聚合”的计算过程。显然这个阶段的计算分为4个步骤其中最为关键的就是第一步的加盐。加盐来源于单词Salting听上去挺玄乎实际上就是给倾斜的Join Keys添加后缀。加盐的核心作用就是把原本集中倾斜的Join Keys打散在进行Shuffle操作的时候让原本应该分发到某一个Executor的倾斜数据均摊到集群中的多个Executors上从而以这种方式来消除倾斜、平衡Executors之间的计算负载。
对于加盐操作我们首先需要确定加盐的粒度来控制数据打散的程度粒度越高加盐后的数据越分散。由于加盐的初衷是以Executors为粒度平衡计算负载因此通常来说取Executors总数#N作为加盐粒度往往是一种不错的选择。其次为了保持内外表的关联关系不被破坏外表和内表需要同时做加盐处理但处理方法稍有不同。
外表的处理称作“随机加盐”具体的操作方法是对于任意一个倾斜的Join Key我们都给它加上1到#N之间的一个随机后缀。以Join Key = 黄小乙来举例假设N = 5那么外表加盐之后原先Join Key = 黄小乙的所有数据记录就都被打散成了Join Key为黄小乙\_1黄小乙\_2黄小乙\_3黄小乙\_4黄小乙\_5的数据记录。
![](https://static001.geekbang.org/resource/image/cd/4f/cd1858531a08371047481120b0c3544f.jpg "外表加盐")
内表的处理称为“复制加盐”具体的操作方法是对于任意一个倾斜的Join Key我们都把原数据复制#N 1从而得到#N份数据副本。对于每一份副本我们为其Join Key追加1到#N之间的固定后缀让它与打散后的外表数据保持一致。对于刚刚Join Key = 黄小乙的例子来说在内表中我们需要把黄小乙的数据复制4份然后依次为每份数据的Join Key追加1到5的固定后缀如下图所示。
![](https://static001.geekbang.org/resource/image/8d/22/8d843fb98d834df38080a68064522322.jpg "内表加盐")
内外表分别加盐之后数据倾斜问题就被消除了。这个时候我们就可以使用常规优化方法比如将Shuffle Sort Merge Join转化为Shuffle Hash Join去继续执行Shuffle、关联和聚合操作。到此为止“两阶段Shuffle” 的第一阶段执行完毕我们得到了初步的聚合结果这些结果是以打散的Join Keys为粒度进行计算得到的。
![](https://static001.geekbang.org/resource/image/8d/22/8d843fb98d834df38080a68064522322.jpg "一阶段Shuffle、关联、聚合")
我们刚刚说,第一阶段加盐的目的在于将数据打散、平衡计算负载。现在我们已经得到了数据打散之后初步的聚合结果,离最终的计算结果仅有一步之遥。不过,为了还原最初的计算逻辑,我们还需要把之前加上的“盐粒”再去掉。
![](https://static001.geekbang.org/resource/image/36/53/36d1829e2b550a3079707eee9712d253.jpg "二阶段Shuffle、聚合")
第二阶段的计算包含“去盐化、Shuffle、聚合”这3个步骤。首先我们把每一个Join Key的后缀去掉这一步叫做“去盐化”。然后我们按照原来的Join Key再做一遍Shuffle和聚合计算这一步计算得到的结果就是“分而治之”当中倾斜部分的计算结果。
经过“两阶段Shuffle”的计算优化我们终于得到了倾斜部分的关联结果。将这部分结果与“分而治之”当中均匀部分的计算结果合并我们就能完成存在倾斜问题的“大表Join大表”的计算场景。
#### 以Executors为粒度的调优实战
应该说以Executors为粒度平衡计算负载的优化过程是我们学习过的调优技巧中最复杂的。因此咱们有必要结合实际的应用案例来详细讲解具体的实现方法。为了方便你对不同的调优方法做对比我们不妨以上一讲跨境电商的场景为例来讲。
咱们先来回顾一下这家电商的业务需求给定orders和transactions两张体量都在TB级别的事实表每隔一段时间就计算一次上一个季度所有订单的交易额具体的业务代码如下所示。
```
//统计订单交易额的代码实现
val txFile: String = _
val orderFile: String = _
val transactions: DataFrame = spark.read.parquent(txFile)
val orders: DataFrame = spark.read.parquent(orderFile)
transactions.createOrReplaceTempView(“transactions”)
orders.createOrReplaceTempView(“orders”)
val query: String = “
select sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = COMPLETE
and o.date between 2020-01-01 and 2020-03-31
group by o.orderId
val outFile: String = _
spark.sql(query).save.parquet(outFile)
```
对于这样一个查询语句我们该如何实现刚刚说过的优化过程呢首先我们先遵循“分而治之”的思想把内外表的数据分为两个部分。第一部分包含所有存在倾斜问题的Join Keys及其对应的Payloads第二部分保留的是分布均匀的Join Keys和相应的Payloads。假设我们把所有倾斜的orderId也就是Join Key保存在数组skewOrderIds中而把分布均匀的orderId保持在数组evenOrderIds中我们就可以使用这两个数组把内外表各自拆分为两部分。
```
//根据Join Keys是否倾斜、将内外表分别拆分为两部分
import org.apache.spark.sql.functions.array_contains
//将Join Keys分为两组存在倾斜的、和分布均匀的
val skewOrderIds: Array[Int] = _
val evenOrderIds: Array[Int] = _
val skewTx: DataFrame = transactions.filter(array_contains(lit(skewOrderIds),$"orderId"))
val evenTx: DataFrame = transactions.filter(array_contains(lit(evenOrderIds),$"orderId"))
val skewOrders: DataFrame = orders.filter(array_contains(lit(skewOrderIds),$"orderId"))
val evenOrders: DataFrame = orders.filter(array_contains(lit(evenOrderIds),$"orderId"))
```
拆分完成之后我们就可以延续“分而治之”的思想分别对这两部分应用不同的调优技巧。对于分布均匀的部分我们把Shuffle Sort Merge Join转化为Shuffle Hash Join。
```
//将分布均匀的数据分别注册为临时表
evenTx.createOrReplaceTempView(“evenTx”)
evenOrders.createOrReplaceTempView(“evenOrders”)
val evenQuery: String = “
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as revenue, o.orderId
from evenTx as tx inner join evenOrders as o
on tx.orderId = o.orderId
where o.status = COMPLETE
and o.date between 2020-01-01 and 2020-03-31
group by o.orderId
val evenResults: DataFrame = spark.sql(evenQuery)
```
对于存在倾斜的部分我们要祭出“两阶段Shuffle”的杀手锏。首先在第一阶段我们需要给两张表分别加盐对外表交易表做“随机加盐”对内表订单表做“复制加盐”。
```
import org.apache.spark.sql.functions.udf
//定义获取随机盐粒的UDF
val numExecutors: Int = _
val rand = () => scala.util.Random.nextInt(numExecutors)
val randUdf = udf(rand)
//第一阶段的加盐操作。注意保留orderId字段用于后期第二阶段的去盐化
//外表随机加盐
val saltedSkewTx = skewTx.withColumn(“joinKey”, concat($“orderId”, lit(“_”), randUdf()))
//内表复制加盐
var saltedskewOrders = skewOrders.withColumn(“joinKey”, concat($“orderId”, lit(“_”), lit(1)))
for (i <- 2 to numExecutors) {
saltedskewOrders = saltedskewOrders union skewOrders.withColumn(“joinKey”, concat($“orderId”, lit(“_”), lit(i)))
}
```
两张表分别做完加盐处理之后我们就可以使用与之前类似的查询语句对它们执行后续的Shuffle、关联与聚合等操作。
```
//将加盐后的数据分别注册为临时表
saltedSkewTx.createOrReplaceTempView(“saltedSkewTx”)
saltedskewOrders.createOrReplaceTempView(“saltedskewOrders”)
val skewQuery: String = “
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as initialRevenue, o.orderId, o.joinKey
from saltedSkewTx as tx inner join saltedskewOrders as o
on tx.joinKey = o.joinKey
where o.status = COMPLETE
and o.date between 2020-01-01 and 2020-03-31
group by o.joinKey
//第一阶段加盐、Shuffle、关联、聚合后的初步结果
val skewInitialResults: DataFrame = spark.sql(skewQuery)
```
得到第一阶段的初步结果之后我们就可以开始执行第二阶段的计算了也就是“去盐化、Shuffle与聚合”这三个操作。去盐化的目的实际上就是把计算的粒度从加盐的joinKey恢复为原来的orderId。由于在最初加盐的时候我们对orderId字段进行了保留因此在第二阶段的计算中我们只要在orderId字段之上执行聚合操作就能达到我们想要的“去盐化”效果。
```
val skewResults: DataFrame = skewInitialResults.select(“initialRevenue”, “orderId”)
.groupBy(col(“orderId”)).agg(sum(col(“initialRevenue”)).alias(“revenue”))
```
在完成了第二阶段的计算之后我们拿到了“两阶段Shuffle”的计算结果。最终只需要把这份结果与先前均匀部分的关联结果进行合并我们就能实现以Executors为粒度平衡计算负载的优化过程。
```
evenResults union skewResults
```
#### 执行性能与开发成本的博弈
你可能会说“我的天呐为了优化这个场景的计算这得花多大的开发成本啊又是分而治之又是两阶段Shuffle的这么大的开发投入真的值得吗
这个问题非常好。我们要明确的是分而治之外加两阶段Shuffle的调优技巧的初衷是为了解决AQE无法以Executors为粒度平衡计算负载的问题。因此这项技巧取舍的关键就在于Executors之间的负载倾斜是否构成整个关联计算的性能瓶颈。如果这个问题的答案是肯定的我们的投入就是值得的。
## 小结
今天这一讲你需要掌握以Shuffle Join的方式去应对“大表Join大表”的计算场景。数据分布不同应对方法也不尽相同。
当参与Join的两张表数据分布比较均匀而且内表的数据分片能够完全放入内存Shuffle Hash Join的计算效率往往高于Shuffle Sort Merge Join后者是Spark SQL默认的关联机制。你可以使用关键字“shuffle\_hash”的Join Hints强制Spark SQL在运行时选择Shuffle Hash Join实现机制。对于内表数据分片不能放入内存的情况你可以结合“三足鼎立”的调优技巧调整并行度、并发度与执行内存这三类参数来满足这一前提条件。
当参与Join的两张表存在数据倾斜时如果倾斜的情况在集群内的Executors之间较为均衡那么最佳的处理方法就是利用AQE提供的自动倾斜处理机制。你只需要设置好以下三个参数剩下的事情交给AQE就好了。
* spark.sql.adaptive.skewJoin.skewedPartitionFactor判定倾斜的膨胀系数。
* spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes判定倾斜的最低阈值。
* spark.sql.adaptive.advisoryPartitionSizeInBytes以字节为单位定义拆分粒度。
但是如果倾斜问题仅集中在少数的几个Executors中而且这些负载过高的Executors已然成为性能瓶颈我们就需要采用“分而治之”外加“两阶段Shuffle”的调优技巧去应对。“分而治之”指的是根据Join Keys的倾斜与否将内外表的数据分为两部分分别处理。其中均匀的部分可以使用Shuffle Hash Join来完成计算倾斜的部分需要用“两阶段Shuffle”进行处理。
两阶段Shuffle的关键在于加盐和去盐化。加盐的目的是打散数据分布、平衡Executors之间的计算负载从而消除Executors单点瓶颈。去盐化的目的是还原原始的关联逻辑。尽管两阶段Shuffle的开发成本较高但只要获得的性能收益足够显著我们的投入就是值得的。
## 每日一练
1. 当尝试将Join Keys是否倾斜作为“分而治之”的划分依据时你觉得我们该依据什么标准把Join Keys划分为倾斜组和非倾斜组呢
2. 无论是AQE的自动倾斜处理还是开发者的“两阶段Shuffle”本质上都是通过“加盐”与“去盐化”的两步走在维持关联关系的同时平衡不同粒度下的计算负载。那么这种“加盐”与“去盐化”的优化技巧是否适用于所有的关联场景如果不是都有哪些场景没办法利用AQE的自动倾斜处理或是我们的“两阶段Shuffle”呢
期待在留言区看到你的思考和答案,我们下一讲见!