# 15 | 内存视角(一):如何最大化内存的使用效率? 你好,我是吴磊。 上一讲我们说,想要提升CPU利用率,最重要的就是合理分配执行内存,但是,执行内存只是Spark内存分区的一部分。因此,想要合理分配执行内存,我们必须先从整体上合理划分好Spark所有的内存区域。 可在实际开发应用的时候,身边有不少同学向我抱怨:“Spark划分不同内存区域的原理我都知道,但我还是不知道不同内存区域的大小该怎么设置,纠结来、纠结去。最后,所有跟内存有关的配置项,我还是保留了默认值。” 这种不能把原理和实践结合起来的情况很常见,所以今天这一讲,我就从熟悉的Label Encoding实例出发,**一步步带你去分析不同情况下,不同内存区域的调整办法,**帮你归纳出最大化内存利用率的常规步骤。这样,你在调整内存的时候,就能结合应用的需要,做到有章可循、有的放矢。 ## 从一个实例开始 我们先来回顾一下[第5讲](https://time.geekbang.org/column/article/355028)中讲过的Label Encoding。在Label Encoding的业务场景中,我们需要对用户兴趣特征做Encoding。依据模板中兴趣字符串及其索引位置,我们的任务是把千亿条样本中的用户兴趣转换为对应的索引值。模板文件的内容示例如下所示。 ``` //模板文件 //用户兴趣 体育-篮球-NBA-湖人 军事-武器-步枪-AK47 ``` 实现的代码如下所示,注意啦,这里的代码是第5讲中优化后的版本。 ``` /** 输入参数:模板文件路径,用户兴趣字符串 返回值:用户兴趣字符串对应的索引值 */ //函数定义 val findIndex: (String) => (String) => Int = { (filePath) => val source = Source.fromFile(filePath, "UTF-8") val lines = source.getLines().toArray source.close() val searchMap = lines.zip(0 until lines.size).toMap (interest) => searchMap.getOrElse(interest, -1) } val partFunc = findIndex(filePath) //Dataset中的函数调用 partFunc("体育-篮球-NBA-湖人") ``` 下面,咱们先一起回顾一下代码实现思路,再来分析它目前存在的性能隐患,最后去探讨优化它的方法。 首先,findIndex函数的主体逻辑比较简单,就是读取模板文件和构建Map映射,以及查找用户兴趣并返回索引。不过,findIndex函数被定义成了高阶函数。这样一来,当以模板文件为实参调用这个高阶函数的时候,我们会得到一个内置了Map查找字典的标量函数partFunc,最后在千亿样本上调用partFunc完成数据转换。**利用高阶函数,我们就避免了让Executor中的每一个Task去读取模板文件,以及从头构建Map字典这种执行低效的做法。** 在运行时,这个函数在Driver端会被封装到一个又一个的Task中去,随后Driver把这些Task分发到Executor,Executor接收到任务之后,交由线程池去执行(调度系统的内容可以回顾[第5讲)](https://time.geekbang.org/column/article/355028)。这个时候,每个Task就像是一架架小飞机,携带着代码“乘客”和数据“行李”,从Driver飞往Executor。Task小飞机在Executor机场着陆之后,代码“乘客”乘坐出租车或是机场大巴,去往JVM stack;数据“行李”则由专人堆放在JVM Heap,也就是我们常说的堆内内存。 回顾Label encoding中的findIndex函数不难发现,其中大部分都是代码“乘客”,唯一的数据“行李”是名为searchMap的Map字典。像这样用户自定义的数据结构,消耗的内存区域就是堆内内存的User Memory(Spark对内存区域的划分内容可以回顾一下[第7讲](https://time.geekbang.org/column/article/355662))。 ### User Memory性能隐患 回顾到这里,你觉得findIndex函数有没有性能隐患呢?你可以先自己思考一下,有了答案之后再来看我下面的分析。 答案当然是“有”。首先,每架小飞机都携带这么一份数据“大件行李”,自然需要消耗更多的“燃油”,这里的“燃油”指的**是Task分发过程中带来的网络开销**。其次,因为每架小飞机着陆之后,都会在Executor的“旅客行李专区”User Memory寄存上这份同样的数据“行李”,所以,**User Memory需要确保有足够的空间可以寄存所有旅客的行李,也就是大量的重复数据**。 那么,User Memory到底需要准备出多大的内存空间才行呢?我们不妨来算一算。这样的计算并不难,只需要用飞机架次乘以行李大小就可以了。 用户自定义的数据结构往往是用于辅助函数完成计算任务的,所以函数执行完毕之后,它携带的数据结构的生命周期也就告一段落。**因此,在Task的数量统计上,我们不必在意一个Executor总共需要处理多少个Task,只需要关注它在同一时间可以并行处理的Task数量,也就是Executor的线程池大小即可**。 我们说过,Executor线程池大小由spark.executor.cores和spark.task.cpus这两个参数的商(spark.executor.cores/spark.task.cpus)决定,我们暂且把这个商记作#threads。 接下来是估算数据“行李”大小,由于searchMap并不是分布式数据集,因此我们不必采用先Cache,再提取Spark执行计划统计信息的方式。对于这样的Java数据结构,我们完全可以在REPL中,通过Java的常规方法估算数据存储大小,估算得到的searchMap大小记为#size。 好啦!现在,我们可以算出,User Memory至少需要提供#threads \* #size这么大的内存空间,才能支持分布式任务完成计算。但是,对于User Memory内存区域来说,使用#threads \* #size的空间去重复存储同样的数据,本身就是降低了内存的利用率。那么,我们该怎么省掉#threads \* #size的内存消耗呢? ## 性能调优 学习过广播变量之后,想必你头脑中已经有了思路。没错,咱们可以尝试使用广播变量,来对示例中的代码进行优化。 仔细观察findIndex函数,我们不难发现,函数的核心计算逻辑有两点。一是读取模板文件、创建Map映射字典;二是以给定字符串对字典进行查找,并返回查找结果。显然,千亿样本转换的核心需求是其中的第二个环节。既然如此,我们完全可以把创建好的Map字典封装成广播变量,然后分发到各个Executors中去。 有了广播变量的帮忙,凡是发往同一个Executor的Task小飞机,都无需亲自携带数据“行李”,这些大件行李会由“联邦广播快递公司”派货机专门发往各个Executors,Driver和每个Executors之间,都有一班这样的货运专线。思路说完了,优化后的代码如下所示。 ``` /** 广播变量实现方式 */ //定义广播变量 val source = Source.fromFile(filePath, "UTF-8") val lines = source.getLines().toArray source.close() val searchMap = lines.zip(0 until lines.size).toMap val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap) //在Dataset中访问广播变量 bcSearchMap.value.getOrElse("体育-篮球-NBA-湖人", -1) ``` 上面代码的实现思路很简单:第一步还是读取模板文件、创建Map字典;第二步,把Map字典封装为广播变量。这样一来,在对千亿样本进行转换时,我们直接通过bcSearchMap.value读取广播变量内容,然后,通过调用Map字典的getOrElse方法来获取用户兴趣对应的索引值。 相比最开始的第一种实现方式,第二种实现方式的代码改动还是比较小的,那这一版代码对内存的消耗情况有什么改进呢? 我们发现,Task小飞机的代码“乘客”换人了!**小飞机之前需要携带函数findIndex,现在则换成了一位“匿名的乘客”**:一个读取广播变量并调用其getOrElse方法的匿名函数。由于这位“匿名的乘客”将大件行李托运给了“联邦广播快递公司”的专用货机,因此,Task小飞机着陆后,没有任何“行李”需要寄存到User Memory。换句话说,优化后的版本不会对User Memory内存区域进行占用,所以第一种实现方式中#threads \* #size的内存消耗就可以省掉了。 ### Storage Memory规划 这样一来,原来的内存消耗转嫁到了广播变量身上。但是,广播变量也会消耗内存,这会不会带来新的性能隐患呢?那我们就来看看,广播变量消耗的具体是哪块内存区域。 [回顾存储系统那一讲](https://time.geekbang.org/column/article/355081),我们说过,Spark存储系统主要有3个服务对象,分别是Shuffle中间文件、RDD缓存和广播变量。它们都由Executor上的BlockManager进行管理,对于数据在内存和磁盘中的存储,BlockManager利用MemoryStore和DiskStore进行抽象和封装。 那么,广播变量所携带的数据内容会物化到MemoryStore中去,以Executor为粒度为所有Task提供唯一的一份数据拷贝。MemoryStore产生的内存占用会被记入到Storage Memory的账上。**因此,广播变量消耗的就是Storage Memory内存区域**。 接下来,我们再来盘算一下,第二种实现方式究竟需要耗费多少内存空间。由于广播变量的分发和存储以Executor为粒度,因此每个Executor消耗的内存空间,就是searchMap一份数据拷贝的大小。searchMap的大小我们刚刚计算过就是#size。 明确了Storage Memory内存区域的具体消耗之后,我们自然可以根据公式:(spark.executor.memory – 300MB)\* spark.memory.fraction \* spark.memory.storageFraction去有针对性地调节相关的内存配置项。 ## 内存规划两步走 现在,咱们在两份不同的代码实现下,分别定量分析了不同内存区域的消耗与占用。对于这些消耗做到心中有数,我们自然就能够相应地去调整相关的配置项参数。基于这样的思路,想要最大化内存利用率,我们需要遵循两个步骤: * **预估内存占用** * **调整内存配置项** 我们以堆内内存为例,来讲一讲内存规划的两步走具体该如何操作。我们都知道,堆内内存划分为Reserved Memory、User Memory、Storage Memory和Execution Memory这4个区域。预留内存固定为300MB,不用理会,其他3个区域需要你去规划。 ### 预估内存占用 首先,我们来说内存占用的预估,主要分为三步。 第一步,计算User Memory的内存消耗。我们先汇总应用中包含的自定义数据结构,并估算这些对象的总大小#size,然后**用#size乘以Executor的线程池大小,即可得到User Memory区域的内存消耗#User**。 第二步,计算Storage Memory的内存消耗。我们先汇总应用中涉及的广播变量和分布式数据集缓存,分别估算这两类对象的总大小,分别记为#bc、#cache。另外,我们把集群中的Executors总数记作#E。这样,**每个Executor中Storage Memory区域的内存消耗的公式就是:#Storage = #bc + #cache / #E**。 第三步,计算执行内存的消耗。学习上一讲,我们知道执行内存的消耗与多个因素有关。第一个因素是Executor线程池大小#threads,第二个因素是数据分片大小,而数据分片大小取决于数据集尺寸#dataset和并行度#N。因此,**每个Executor中执行内存的消耗的计算公式为:#Execution = #threads \* #dataset / #N**。 ### 调整内存配置项 得到这3个内存区域的预估大小#User、#Storage、#Execution之后,调整相关的内存配置项就是一件水到渠成的事情(由公式(spark.executor.memory – 300MB)\* spark.memory.fraction \* spark.memory.storageFraction)可知),这里我们也可以分为3步。 首先,根据定义,**spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到**。 同理,**spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)**。 最后,对于Executor堆内内存总大小spark.executor.memory的设置,我们自然要参考4个内存区域的总消耗,也就是**300MB + #User + #Storage + #Execution。不过,我们要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致**。 总的来说,在内存规划的两步走中,第一步预估不同区域的内存占比尤为关键,因为第二步中参数的调整完全取决于第一步的预估结果。如果你按照这两个步骤去设置相关的内存配置项,相信你的应用在运行时就能够充分利用不同的内存区域,避免出现因参数设置不当而导致的内存浪费现象,从而在整体上提升内存利用率。 ## 小结 合理划分Spark所有的内存区域,是同时提升CPU与内存利用率的基础。因此,掌握内存规划很重要,在今天这一讲,我们把内存规划归纳为两步走。 第一步是预估内存占用。 * 求出User Memory区域的内存消耗,公式为:#User=#size乘以Executor线程池的大小。 * 求出每个Executor中Storage Memory区域的内存消耗,公式为:#Storage = #bc + #cache / #E。 * 求出执行内存区域的内存消耗,公式为:#Execution = #threads \* #dataset / #N。 第二步是调整内存配置项:根据公式得到的3个内存区域的预估大小#User、#Storage、#Execution,去调整(spark.executor.memory – 300MB)\* spark.memory.fraction \* spark.memory.storageFraction公式中涉及的所有配置项。 * spark.memory.fraction可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到。 * spark.memory.storageFraction的数值应该参考(#Storage)/(#Storage + #Execution)。 * spark.executor.memory的设置,可以通过公式300MB + #User + #Storage + #Execution得到。 这里,我还想多说几句,**内存规划两步走终归只是手段,它最终要达到的效果和目的,是确保不同内存区域的占比与不同类型的数据消耗保持一致,从而实现内存利用率的最大化**。 ## 每日一练 1. 你知道估算Java对象存储大小的方法有哪些吗?不同的方法又有哪些优、劣势呢? 2. 对于内存规划的第一步来说,要精确地预估运行时每一个区域的内存消耗,很费时、费力,调优的成本很高。如果我们想省略掉第一步的精确计算,你知道有哪些方法能够粗略、快速地预估不同内存区域的消耗占比吗? 期待在留言区看到你的思考和答案,我们下一讲见!