You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
20 KiB
Markdown

2 years ago
# 09 | 调优一筹莫展,配置项速查手册让你事半功倍!(上)
你好,我是吴磊。
对于Spark性能调优来说应用开发和配置项设置是两个最主要也最常用的入口。但在日常的调优工作中每当我们需要从配置项入手寻找调优思路的时候一打开Spark官网的Configuration页面映入眼帘的就是上百个配置项。它们有的需要设置True或False有的需要给定明确的数值才能使用。这难免让我们蒙头转向、无所适从。
所以我经常在想如果能有一份Spark配置项手册上面分门别类地记录着与性能调优息息相关的配置项就好了肯定能省去不少麻烦。
那么,接下来的两讲,我们就来一起汇总这份手册。这份手册可以让你在寻找调优思路的时候,迅速地定位可能会用到的配置项,不仅有章可循,还能不丢不漏,真正做到事半功倍!
## 配置项的分类
事实上能够显著影响执行性能的配置项屈指可数更何况在Spark分布式计算环境中计算负载主要由Executors承担Driver主要负责分布式调度调优空间有限因此对Driver端的配置项我们不作考虑**我们要汇总的配置项都围绕Executors展开**。那么结合过往的实践经验以及对官网全量配置项的梳理我把它们划分为3类分别是硬件资源类、Shuffle类和Spark SQL大类。
为什么这么划分呢?我们一一来说。
**首先硬件资源类包含的是与CPU、内存、磁盘有关的配置项。**我们说过,调优的切入点是瓶颈,定位瓶颈的有效方法之一,就是从硬件的角度出发,观察某一类硬件资源的负载与消耗,是否远超其他类型的硬件,而且调优的过程收敛于所有硬件资源平衡、无瓶颈的状态,所以掌握资源类配置项就至关重要了。这类配置项设置得是否得当,决定了应用能否打破瓶颈,来平衡不同硬件的资源利用率。
**其次Shuffle类是专门针对Shuffle操作的。**在绝大多数场景下Shuffle都是性能瓶颈。因此我们需要专门汇总这些会影响Shuffle计算过程的配置项。同时Shuffle的调优难度也最高汇总Shuffle配置项能帮我们在调优的过程中锁定搜索范围充分节省时间。
**最后Spark SQL早已演化为新一代的底层优化引擎。**无论是在Streaming、Mllib、Graph等子框架中还是在PySpark中只要你使用DataFrame APISpark在运行时都会使用Spark SQL做统一优化。因此我们需要梳理出一类配置项去充分利用Spark SQL的先天性能优势。
我们一再强调硬件资源的平衡才是性能调优的关键所以今天这一讲我们就先从硬件资源类入手去汇总应该设置的配置项。在这个过程中我会带你搞清楚这些配置项的定义与作用是什么以及它们的设置能解决哪些问题让你为资源平衡打下基础。下一讲我们再来讲Shuffle类和Spark SQL大类。
## 哪些配置项与CPU设置有关
首先我们先来说说与CPU有关的配置项**主要包括spark.cores.max、spark.executor.cores和spark.task.cpus这三个参数**。它们分别从集群、Executor和计算任务这三个不同的粒度指定了用于计算的CPU个数。开发者通过它们就可以明确有多少CPU资源被划拨给Spark用于分布式计算。
为了充分利用划拨给Spark集群的每一颗CPU准确地说是每一个CPU核CPU Core你需要设置与之匹配的并行度并行度用spark.default.parallelism和spark.sql.shuffle.partitions这两个参数设置。对于没有明确分区规则的RDD来说我们用spark.default.parallelism定义其并行度spark.sql.shuffle.partitions则用于明确指定数据关联或聚合操作中Reduce端的分区数量。
说到并行度Parallelism就不得不提并行计算任务Paralleled Tasks这两个概念关联紧密但含义大相径庭有不少同学经常把它们弄混。
并行度指的是分布式数据集被划分为多少份,从而用于分布式计算。换句话说,**并行度的出发点是数据,它明确了数据划分的粒度**。并行度越高数据的粒度越细数据分片越多数据越分散。由此可见像分区数量、分片数量、Partitions这些概念都是并行度的同义词。
并行计算任务则不同,它指的是在任一时刻整个集群能够同时计算的任务数量。换句话说,**它的出发点是计算任务、是CPU由与CPU有关的三个参数共同决定**。具体说来Executor中并行计算任务数的上限是spark.executor.cores与spark.task.cpus的商暂且记为#Executor-tasks整个集群的并行计算任务数自然就是#Executor-tasks乘以集群内Executors的数量记为#Executors。因此最终的数值是#Executor-tasks \* #Executors
我们不难发现,**并行度决定了数据粒度,数据粒度决定了分区大小,分区大小则决定着每个计算任务的内存消耗**。在同一个Executor中多个同时运行的计算任务“基本上”是平均瓜分可用内存的每个计算任务能获取到的内存空间是有上限的因此并行计算任务数会反过来制约并行度的设置。你看这两个家伙还真是一对相爱相杀的冤家
至于到底该怎么平衡并行度与并行计算任务两者之间的关系我们留到后面的课程去展开。这里咱们只要记住和CPU设置有关配置项的含义、区别与作用就行了。
![](https://static001.geekbang.org/resource/image/23/ff/234e6b62ff32394f99055c9385988aff.jpeg "与CPU有关的配置项")
## 哪些配置项与内存设置有关?
说完CPU咱们接着说说与内存管理有关的配置项。我们知道在管理模式上Spark分为堆内内存与堆外内存。
堆外内存又分为两个区域Execution Memory和Storage Memory。要想要启用堆外内存我们得先把参数spark.memory.offHeap.enabled置为true然后用spark.memory.offHeap.size指定堆外内存大小。堆内内存也分了四个区域也就是Reserved Memory、User Memory、Execution Memory和Storage Memory。
内存的基础配置项主要有5个它们的含义如下表所示
![](https://static001.geekbang.org/resource/image/67/50/67528832632c392d7e2e4c89a872b350.jpeg "与内存有关的配置项")
简单来说,**这些配置项决定了我们刚才说的这些区域的大小**,这很好理解。工具有了,但很多同学在真正设置内存区域大小的时候还会有各种各样的疑惑,比如说:
* 内存空间是有限的,该把多少内存划分给堆内,又该把多少内存留给堆外呢?
* 在堆内内存里该怎么平衡User Memory和Spark用于计算的内存空间
* 在统一内存管理模式下该如何平衡Execution Memory和Storage Memory
别着急,接下来,咱们一个一个来解决。
### 堆外与堆内的平衡
相比JVM堆内内存off heap堆外内存有很多优势如更精确的内存占用统计和不需要垃圾回收机制以及不需要序列化与反序列化。你可能会说“既然堆外内存这么厉害那我们干脆把所有内存都划分给它不就得了”先别急着下结论我们先一起来看一个例子。
用户表1记录着用户数据每个数据条目包含4个字段整型的用户ID、String类型的姓名、整型的年龄和Char类型的性别。如果现在要求你用字节数组来存储每一条用户记录你该怎么办呢
![](https://static001.geekbang.org/resource/image/13/83/136d2f0618d374f2622e1985984ca783.jpeg "用户表1简单数据模式")
我们一起来做一下。首先除姓名外其它3个字段都是定长数据类型因此可以直接安插到字节数组中。对于变长数据类型如String由于我们事先并不知道每个用户的名字到底有多长因此为了把name字段也用字节数组的形式存储我们只能曲线救国先记录name字段的在整个字节数组内的偏移量再记录它的长度最后把完整的name字符串安插在字节数组的末尾如下图所示。
![](https://static001.geekbang.org/resource/image/51/2c/516c0e41e6757193533c8dfa33f9912c.jpg "用字节数组存储用户记录")
尽管存储String类型的name字段麻烦一些但我们总算成功地用字节数组容纳了每一条用户记录。OK大功告成
你可能会问“做这个小实验的目的是啥呢”事实上Spark开辟的堆外内存就是以这样的方式来存储应用数据的。**正是基于这种紧凑的二进制格式相比JVM堆内内存Spark通过Java Unsafe API在堆外内存中的管理才会有那么多的优势。**
不过成也萧何败也萧何字节数组自身的局限性也很难突破。比如说如果用户表1新增了兴趣列表字段类型为List\[String\]如用户表2所示。这个时候如果我们仍然采用字节数据的方式来存储每一条用户记录不仅越来越多的指针和偏移地址会让字段的访问效率大打折扣而且指针越多内存泄漏的风险越大数据访问的稳定性就值得担忧了。
![](https://static001.geekbang.org/resource/image/07/92/07d010f82edf0bec630fd9ed57ba8892.png "用户表2复杂数据模式")
因此当数据模式Data Schema开始变得复杂时Spark直接管理堆外内存的成本将会非常高。
那么针对有限的内存资源我们该如何平衡JVM堆内内存与off heap堆外内存的划分我想你心中也该有了答案。**对于需要处理的数据集如果数据模式比较扁平而且字段多是定长数据类型就更多地使用堆外内存。相反地如果数据模式很复杂嵌套结构或变长字段很多就更多采用JVM堆内内存会更加稳妥。**
### User Memory与Spark可用内存如何分配
接下来我们再来说说User Memory。我们都知道参数spark.memory.fraction的作用是明确Spark可支配内存占比换句话说就是在所有的堆内空间中有多大比例的内存可供Spark消耗。相应地1 - spark.memory.fraction就是User Memory在堆内空间的占比。
因此,**spark.memory.fraction参数决定着两者如何瓜分堆内内存它的系数越大Spark可支配的内存越多User Memory区域的占比自然越小**。spark.memory.fraction的默认值是0.6也就是JVM堆内空间的60%会划拨给Spark支配剩下的40%划拨给User Memory。
那么User Memory都用来存啥呀需要预留那么大的空间吗简单来说User Memory存储的主要是开发者自定义的数据结构这些数据结构往往用来协助分布式数据集的处理。
举个例子还记得调度系统那一讲Label Encoding的例子吗
```
/**
实现方式2
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")
```
在这个例子中我们先读取包含用户兴趣的模板文件然后根据模板内容构建兴趣到索引的映射字典。在对千亿样本做Lable Encoding的时候这个字典可以快速查找兴趣字符串并返回对应索引来辅助完成数据处理。像这样的映射字典就是所谓的自定义数据结构这部分数据都存储在User Memory内存区域。
因此,**当在JVM内平衡Spark可用内存和User Memory时你需要考虑你的应用中类似的自定义数据结构多不多、占比大不大然后再相应地调整两块内存区域的相对占比**。如果应用中自定义的数据结构很少不妨把spark.memory.fraction配置项调高让Spark可以享用更多的内存空间用于分布式计算和缓存分布式数据集。
## Execution Memory该如何与Storage Memory平衡
最后咱们再来说说Execution Memory与Storage Memory的平衡。在内存管理那一讲我给你讲了一个黄四郎地主招租的故事并用故事中的占地协议类比了执行内存与缓存内存之间的竞争关系。执行任务与RDD缓存共享Spark可支配内存但是,执行任务在抢占方面有更高的优先级。
因此通常来说在统一内存管理模式下spark.memory.storageFraction的设置就显得没那么紧要因为无论这个参数设置多大执行任务还是有机会抢占缓存内存而且一旦完成抢占就必须要等到任务执行结束才会释放。
不过,凡事都没有绝对,**如果你的应用类型是“缓存密集型”,如机器学习训练任务,就很有必要通过调节这个参数来保障数据的全量缓存**。这类计算任务往往需要反复遍历同一份分布式数据集数据缓存与否对任务的执行效率起着决定性作用。这个时候我们就可以把参数spark.memory.storageFraction调高然后有意识地在应用的最开始把缓存灌满再基于缓存数据去实现计算部分的业务逻辑。
但在这个过程中,**你要特别注意RDD缓存与执行效率之间的平衡**。为什么这么说呢?
首先RDD缓存占用的内存空间多了Spark用于执行分布式计算任务的内存空间自然就变少了而且数据分析场景中常见的关联、排序和聚合等操作都会消耗执行内存这部分内存空间变少自然会影响到这类计算的执行效率。
其次大量缓存引入的GCGarbage Collection垃圾回收负担对执行效率来说是个巨大的隐患。
你还记得黄四郎要招租的土地分为托管田和自管田吗托管田由黄四郎派人专门打理土地秋收后的翻土、整平等杂务为来年种下一茬庄稼做准备。堆内内存的垃圾回收也是一个道理JVM大体上把Heap堆内内存分为年轻代和老年代。年轻代存储生命周期较短、引用次数较低的对象老年代则存储生命周期较长、引用次数高的对象。因此像RDD cache这种一直缓存在内存里的数据一定会被JVM安排到老年代。
年轻代的垃圾回收工作称为Young GC老年代的垃圾回收称为Full GC。每当老年代可用内存不足时都会触发JVM执行Full GC。在Full GC阶段JVM会抢占应用程序执行线程强行征用计算节点中所有的CPU线程也就是“集中力量办大事”。当所有CPU线程都被拿去做垃圾回收工作的时候应用程序的执行只能暂时搁置。只有等Full GC完事之后把CPU线程释放出来应用程序才能继续执行。这种Full GC征用CPU线程导致应用暂停的现象叫做“Stop the world”。
因此Full GC对于应用程序的伤害远大于Young GC并且GC的效率与对象个数成反比对象个数越多GC效率越差。这个时候对于RDD这种缓存在老年代中的数据就很容易引入Full GC问题。
一般来说为了提升RDD cache访问效率很多同学都会采用以对象值的方式把数据缓存到内存因为对象值的存储方式避免了数据存取过程中序列化与反序列化的计算开销。我们在RDD/DataFrame/Dataset之上调用cache方法的时候默认采用的就是这种存储方式。
但是采用对象值的方式缓存数据不论是RDD还是DataFrame、Dataset每条数据样本都会构成一个对象要么是开发者自定义的Case class要么是Row对象。换句话说老年代存储的对象个数基本等于你的样本数。因此当你的样本数大到一定规模的时候你就需要考虑大量的RDD cache可能会引入的Full GC问题了。
基于上面的分析,我们不难发现,**在打算把大面积的内存空间用于RDD cache之前你需要衡量这么做可能会对执行效率产生的影响**。
你可能会说:“我的应用就是缓存密集型,确实需要把数据缓存起来,有什么办法来平衡执行效率吗?”办法还是有的。
**首先,你可以放弃对象值的缓存方式,改用序列化的缓存方式,序列化会把多个对象转换成一个字节数组。**这样,对象个数的问题就得到了初步缓解。
**其次我们可以调节spark.rdd.compress这个参数。**RDD缓存默认是不压缩的启用压缩之后缓存的存储效率会大幅提升有效节省缓存内存的占用从而把更多的内存空间留给分布式任务执行。
通过这两类调整开发者在享用RDD数据访问效率的同时还能够有效地兼顾应用的整体执行效率可谓是两全其美。不过有得必有失尽管这两类调整优化了内存的使用效率但都是以引入额外的计算开销、牺牲CPU为代价的。这也就是我们一直强调的性能调优的过程本质上就是不断地平衡不同硬件资源消耗的过程。
## 哪些配置项与磁盘设置有关?
在存储系统那一讲我们简单提到过spark.local.dir这个配置项这个参数允许开发者设置磁盘目录该目录用于存储RDD cache落盘数据块和Shuffle中间文件。
通常情况下spark.local.dir会配置到本地磁盘中容量比较宽裕的文件系统毕竟这个目录下会存储大量的临时文件我们需要足够的存储容量来保证分布式任务计算的稳定性。不过如果你的经费比较充裕有条件在计算节点中配备足量的SSD存储甚至是更多的内存资源完全可以把SSD上的文件系统目录或是内存文件系统添加到spark.local.dir配置项中去从而提供更好的I/O性能。
## 小结
掌握硬件资源类的配置项是我们打破性能瓶颈,以及平衡不同硬件资源利用率的必杀技。具体来说,我们可以分成两步走。
**第一步理清CPU、内存和磁盘这三个方面的性能配置项都有什么以及它们的含义。**因此,我把硬件资源类配置项的含义都汇总在了一个表格中,方便你随时查看。有了这份手册,在针对硬件资源进行配置项调优时,你就能够做到不重不漏。
![](https://static001.geekbang.org/resource/image/c9/0a/c9cfdb17e3ec6e8d022ff8e91e4a170a.jpg)
**第二步,重点理解这些配置项的作用,以及可以解决的问题。**
首先对于CPU类配置项我们要重点理解并行度与并行计算任务数的区别。并行度从数据的角度出发明确了数据划分的粒度并行度越高数据粒度越细数据越分散CPU资源利用越充分但同时要提防数据粒度过细导致的调度系统开销。
并行计算任务数则不同,它从计算的角度出发,强调了分布式集群在任一时刻并行处理的能力和容量。并行度与并行计算任务数之间互相影响、相互制约。
其次对于内存类配置项我们要知道怎么设置它们来平衡不同内存区域的方法。这里我们主要搞清楚3个问题就可以了
1. 在平衡堆外与堆内内存的时候,我们要重点考察数据模式。如果数据模式比较扁平,而且定长字段较多,应该更多地使用堆外内存。相反地,如果数据模式比较复杂,应该更多地利用堆内内存
2. 在平衡可支配内存和User memory的时候我们要重点考察应用中自定义的数据结构。如果数据结构较多应该保留足够的User memory空间。相反地如果数据结构较少应该让Spark享有更多的可用内存资源
3. 在平衡Execution memory与Storage memory的时候如果RDD缓存是刚需我们就把spark.memory.storageFraction调大并且在应用中优先把缓存灌满再把计算逻辑应用在缓存数据之上。除此之外我们还可以同时调整spark.rdd.compress和spark.memory.storageFraction来缓和Full GC的冲击
## 每日一练
1. 并行度设置过大会带来哪些弊端?
2. 在Shuffle的计算过程中有哪些Spark内置的数据结构可以充分利用堆外内存资源
3. 堆外与堆内的取舍,你还能想到其他的制约因素吗?
4. 如果内存资源足够丰富有哪些方式可以开辟内存文件系统用于配置spark.local.dir参数
期待在留言区看到你的思考和答案,也欢迎你把这份硬件资源配置项手册分享给更多的朋友,我们下一讲见!