You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

216 lines
21 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 05 | 调度系统:“数据不动代码动”到底是什么意思?
你好,我是吴磊。
在日常的开发与调优工作中为了充分利用硬件资源我们往往需要手工调节任务并行度来提升CPU利用率控制任务并行度的参数是Spark的配置项spark.default.parallelism。增加并行度确实能够充分利用闲置的CPU线程但是parallelism数值也不宜过大过大反而会引入过多的调度开销得不偿失。
这个调优技巧可以说是老生常谈了网上到处都可以搜得到。那你知道为什么parallelism数值过大调度开销会呈指数级增长吗调度开销具体又是指什么呢以及如果不想一个数值一个数值的尝试parallelism数值究竟该怎么设置才能以最少的时间获得最好的效果如果你还没有答案或者说还没有把握答对接下来你就要好好听我讲。
这一讲,我会通过一个机器学习案例,来和你一起聊聊调度系统是什么,它是怎么工作的,从而帮助你摆脱调优总是停留在知其然、不知其所以然的尴尬境地。
## 案例对用户兴趣特征做Label Encoding
在机器学习应用中特征工程几乎占据了算法同学80%的时间和精力毕竟一份质量优良的训练样本限定了模型效果的上限和天花板我们要讲的案例就来自特征工程中一个典型的处理场景Label Encoding标签编码
什么是Label encoding呢模型特征按照是否连续可以分为两类连续性数值特征和离散型特征离散型特征往往以字符串的形式存在比如用户兴趣特征就包括体育、政治、军事和娱乐等。对于很多机器学习算法来说字符串类型的数据是不能直接消费的需要转换为数值才行例如把体育、政治、军事、娱乐映射为0、1、2、3这个过程在机器学习领域有个术语就叫Label encoding。
我们这一讲的案例就是要对用户兴趣特征做Label encoding简单来说就是以固定的模板把字符串转换为数值然后将千亿条样本中的用户兴趣转换为对应的索引值。固定模板是离线模型训练与线上模型服务之间的文件接口内容仅包含用户兴趣这一列字符串已按事先约定好的规则进行排序。我们需要注意的是用户兴趣包含4个层级因此这个模板文件较大记录数达到万级别。
```
//模板文件
//用户兴趣
体育-篮球-NBA-湖人
军事-武器-步枪-AK47
```
那具体怎么转换呢?例如,我们可以将用户兴趣“体育-篮球-NBA-湖人”映射为0将兴趣“军事-武器-步枪-AK47”映射为1以此类推。应该说需求还是相当明确的我身边的同学们拿到需求之后奔儿都没打以迅雷不及掩耳之势就实现了如下的处理函数。
```
/**
实现方式1
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
//函数定义
def findIndex(templatePath: String, interest: String): Int = {
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
searchMap.getOrElse(interest, -1)
}
//Dataset中的函数调用
findIndex(filePath, "体育-篮球-NBA-湖人")
```
我们可以看到这个函数有两个形参一个是模板文件路径另一个是训练样本中的用户兴趣。处理函数首先读取模板文件然后根据文件中排序的字符串构建一个从兴趣到索引的Map映射最后在这个Map中查找第二个形参传入的用户兴趣如果能找到则返回对应的索引找不到的话则返回-1。
这段代码看上去似乎没什么问题同学们基于上面的函数对千亿样本做Label encoding在20台机型为C5.4xlarge AWS EC2的分布式集群中花费了5个小时。坦白说这样的执行性能我是不能接受的。你可能会说“需求就是这个样子还能有什么别的办法呢”我们不妨来看另外一种实现方式。
```
/**
实现方式2
输入参数:模板文件路径,用户兴趣字符串
返回值:用户兴趣字符串对应的索引值
*/
//函数定义
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)
//Dataset中的函数调用
partFunc("体育-篮球-NBA-湖人")
```
同学们基于第二种方式对相同的数据集做Label encoding之后在10台同样机型的分布式集群中花了不到20分钟就把任务跑完了。可以说执行性能的提升是显而易见的。那么两份代码有什么区别呢
我们可以看到相比于第一份代码第二份代码的函数体内没有任何变化还是先读取模板文件、构建Map映射、查找用户兴趣最后返回索引。最大的区别就是第二份代码对高阶函数的使用具体来说有2点
1. 处理函数定义为高阶函数,形参是模板文件路径,返回结果是从用户兴趣到索引的函数;
2. 封装千亿样本的Dataset所调用的函数不是第一份代码中的findIndex而是用模板文件调用findIndex得到的partFuncpartFunc是形参为兴趣、结果为索引的普通标量函数。
那么,高阶函数真有这么神奇吗?其实,性能的提升并不是高阶函数的功劳,而是调度系统在起作用。
## Spark的调度系统是如何工作的
Spark调度系统的核心职责是**先将用户构建的DAG转化为分布式任务结合分布式集群资源的可用性基于调度规则依序把分布式任务分发到执行器**。这个过程听上去就够复杂的了,为了方便你理解,我们还是先来讲一个小故事。
### 土豆工坊流水线升级
在学完了内存计算的第二层含义之后,土豆工坊的老板决定对土豆加工流水线做升级,来提高工坊的生产效率和灵活性。
这里,我们先对内存计算的第二层含义做个简单地回顾,它指的是**同一Stage中的所有操作会被捏合为一个函数这个函数一次性会被地应用到输入数据上并且一次性地产生计算结果**。
升级之前的土豆加工流程DAG被切分为3个执行阶段Stage它们分别是Stage 0、Stage 1、Stage 2。其中Stage 0产出即食薯片Stage 1分发调味品Stage 2则产出不同尺寸、不同风味的薯片。我们重点关注Stage 0Stage 0有3个加工环节分别是清洗、切片和烘焙。这3个环节需要3种不同的设备即清洗机、切片机和烤箱。
![](https://static001.geekbang.org/resource/image/3f/5f/3fcb3e400db91198a7499c016ccfb45f.jpg "土豆工坊加工流程的3个执行阶段")
工坊有3条流水线每种设备都需要3套在成本方面要花不少钱呢因此工坊老板一直绞尽脑汁想把设备方面的成本降下来。
此时,工头儿建议:“老板,我听说市场上有一种可编程的土豆加工设备,它是个黑盒子并且只有输入口和输出口,从外面看不见里面的操作流程。不过黑盒子受程序控制,给定输入口的食材,我们可以编写程序控制黑盒子的输出。有了这个可编程设备,咱们不但省了钱,将来还可以灵活地扩充产品线。比方想生产各种风味的薯条或是土豆泥,只需要更换一份程序加载到黑盒子里就行啦!”
老板听后大喜,决定花钱购入可编程土豆加工设备,替换并淘汰现有的清洗机、切片机和烤箱。
于是工坊的加工流水线就变成了如下的样子。工人们的工作也从按照DAG流程图的关键步骤在流水线上安装相应的设备变成了把关键步骤编写相应的程序加载到黑盒内。这样一来这家工坊的生产力也从作坊式的生产方式升级到了现代化流水线的作业模式。
![](https://static001.geekbang.org/resource/image/dc/46/dc4f5f39a166ca93080c5a7c0ea0d446.jpg "演进的土豆加工流水线")
那么这个故事跟我们今天要讲的调度系统有什么关系呢事实上Spark调度系统的工作流程包含如下5个步骤
**1\. 将DAG拆分为不同的运行阶段Stages**
**2\. 创建分布式任务Tasks和任务组TaskSet**
**3\. 获取集群内可用****的****硬件资源情况;**
**4\. 按照调度规则决定优先调度哪些任务/组;**
**5\. 依序将分布式任务分发到执行器Executor。**
除了第4步以外其他几步和土豆工坊流水线上的关键步骤都是一一对应的它们的对应关系如下
![](https://static001.geekbang.org/resource/image/63/83/635462108ee2fc09991708f0856bcb83.jpg)
现在,你可能会觉得用故事来记这几个步骤好像多此一举,但当我们学完了所有的原理之后,再回过头来把故事的主线串联起来,你就会惊喜地发现,所有的原理你都能轻松地记住和理解,这可比死记硬背的效率要高得多。
### 调度系统中的核心组件有哪些?
接下来我们深入到流程中的每一步去探究Spark调度系统是如何工作的。不过在此之前我们得先弄清楚调度系统都包含哪些关键组件不同组件之间如何交互它们分别担任了什么角色才能更好地理解流程中的每一步。
Spark调度系统包含3个核心组件分别是DAGScheduler、TaskScheduler和SchedulerBackend。这3个组件都运行在Driver进程中它们通力合作将用户构建的DAG转化为分布式任务再把这些任务分发给集群中的Executors去执行。不过它们的名字都包含Scheduler光看名字还真是丈二和尚摸不着头脑所以我把它们和调度系统流程中5个步骤的对应关系总结在了下表中你可以看一看。
![](https://static001.geekbang.org/resource/image/46/52/46bb66fed5d52b09407d66881cf0df52.jpeg)
#### 1\. DAGScheduler
DAGScheduler的主要职责有二一是把用户DAG拆分为Stages如果你不记得这个过程可以回顾一下[上一讲](https://time.geekbang.org/column/article/353808)的内容二是在Stage内创建计算任务Tasks这些任务囊括了用户通过组合不同算子实现的数据转换逻辑。然后执行器Executors接收到Tasks会将其中封装的计算函数应用于分布式数据分片去执行分布式的计算过程。
不过如果我们给集群中处于繁忙或者是饱和状态的Executors分发了任务执行效果会大打折扣。因此**在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去**。那么,调度系统是怎么判断节点是否空闲的呢?
#### 2\. SchedulerBackend
SchedulerBackend就是用来干这个事的它是对于资源调度器的封装与抽象为了支持多样的资源调度模式如Standalone、YARN和MesosSchedulerBackend提供了对应的实现类。在运行时Spark根据用户提供的MasterURL来决定实例化哪种实现类的对象。MasterURL就是你通过各种方式指定的资源管理器如--master spark://ip:hostStandalone 模式)、--master yarnYARN 模式)。
对于集群中可用的计算资源SchedulerBackend会用一个叫做ExecutorDataMap的数据结构来记录每一个计算节点中Executors的资源状态。ExecutorDataMap是一种HashMap它的Key是标记Executor的字符串Value是一种叫做ExecutorData的数据结构ExecutorData用于封装Executor的资源状态如RPC地址、主机地址、可用CPU核数和满配CPU核数等等它相当于是对Executor做的“资源画像”。
![](https://static001.geekbang.org/resource/image/a7/a9/a7f8d49bbf1f8b0a125ffca87f079aa9.jpg "ExecutorDataMap映射表")
总的来说对内SchedulerBackend用ExecutorData对Executor进行资源画像对外SchedulerBackend以WorkerOffer为粒度提供计算资源WorkerOffer封装了Executor ID、主机地址和CPU核数用来表示一份可用于调度任务的空闲资源。显然基于Executor资源画像SchedulerBackend可以同时提供多个WorkerOffer用于分布式任务调度。WorkerOffer这个名字起得蛮有意思Offer的字面意思是公司给你提供的工作机会结合Spark调度系统的上下文就变成了使用硬件资源的机会。
好了到此为止要调度的计算任务有了就是DAGScheduler通过Stages创建的Tasks可用于调度任务的计算资源也有了即SchedulerBackend提供的一个又一个WorkerOffer。如果从供需的角度看待任务调度DAGScheduler就是需求端SchedulerBackend就是供给端。
#### 3\. TaskScheduler
左边有需求右边有供给如果把Spark调度系统看作是一个交易市场的话那么中间还需要有个中介来帮它们对接意愿、撮合交易从而最大限度地提升资源配置的效率。在Spark调度系统中这个中介就是TaskScheduler。**TaskScheduler的职责是基于既定的规则与策略达成供需双方的匹配与撮合**。
![](https://static001.geekbang.org/resource/image/82/yy/82e86e1b3af101100015bcfd81f0f7yy.jpg "Spark分布式任务调度流程")
显然TaskScheduler的核心是任务调度的规则和策略**TaskScheduler的调度策略分为两个层次一个是不同Stages之间的调度优先级一个是Stages内不同任务之间的调度优先级**。
首先对于两个或多个Stages如果它们彼此之间不存在依赖关系、互相独立在面对同一份可用计算资源的时候它们之间就会存在竞争关系。这个时候先调度谁、或者说谁优先享受这份计算资源大家就得基于既定的规则和协议照章办事了。
**对于这种Stages之间的任务调度TaskScheduler提供了2种调度模式分别是FIFO先到先得和FAIR公平调度。** FIFO非常好理解在这种模式下Stages按照被创建的时间顺序来依次消费可用计算资源。这就好比在二手房交易市场中两个人同时看中一套房子不管两个人各自愿意出多少钱谁最先交定金中介就优先给谁和卖家撮合交易。
你可能会说“这不合常理啊如果第二个人愿意出更多的钱卖家自然更乐意和他成交。”没错考虑到开发者的意愿度TaskScheduler提供了FAIR公平调度模式。在这种模式下哪个Stages优先被调度取决于用户在配置文件fairscheduler.xml中的定义。
在配置文件中Spark允许用户定义不同的调度池每个调度池可以指定不同的调度优先级用户在开发过程中可以关联不同作业与调度池的对应关系这样不同Stages的调度就直接和开发者的意愿挂钩也就能享受不同的优先级待遇。对应到二手房交易的例子中如果第二个人乐意付30%的高溢价,中介自然乐意优先撮合他与卖家的交易。
说完了不同Stages之间的调度优先级我们再来说说同一个Stages内部不同任务之间的调度优先级Stages内部的任务调度相对来说简单得多。**当TaskScheduler接收到来自SchedulerBackend的WorkerOffer后TaskScheduler会优先挑选那些满足本地性级别要求的任务进行分发**。众所周知本地性级别有4种Process local < Node local < Rack local < Any。从左到右分别是进程本地性、节点本地性、机架本地性和跨机架本地性。从左到右,计算任务访问所需数据的效率越来越差。
进程本地性表示计算任务所需的输入数据就在某一个Executor进程内,因此把这样的计算任务调度到目标进程内最划算。同理,如果数据源还未加载到Executor进程,而是存储在某一计算节点的磁盘中,那么把任务调度到目标节点上去,也是一个不错的选择。再次,如果我们无法确定输入源在哪台机器,但可以肯定它一定在某个机架上,本地性级别就会退化到Rack local
DAGScheduler划分Stages、创建分布式任务的过程中,会为每一个任务指定本地性级别,本地性级别中会记录该任务有意向的计算节点地址,甚至是Executor进程ID。换句话说,**任务自带调度意愿,它通过本地性级别告诉TaskScheduler自己更乐意被调度到哪里去**。
既然计算任务的个人意愿这么强烈,TaskScheduler作为中间商,肯定要优先满足人家的意愿。这就像一名码农想要租西二旗的房子,但是房产中介App推送的结果都是东三环国贸的房子,那么这个中介的匹配算法肯定有问题。
由此可见,**Spark调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销**。毕竟,分发代码的开销要比分发数据的代价低太多,这也正是“数据不动代码动”这个说法的由来。
总的来说,TaskScheduler根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化。然后,交给SchedulerBackendSchedulerBackend根据ExecutorData中记录的RPC地址和主机地址,再将序列化的任务通过网络分发到目的主机的Executor中去。最后,Executor接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算。
## 性能调优案例回顾
知道了调度系统是如何工作的,我们就可以回过头来说说开头Label encoding的开发案例中,2种实现方式的差别到底在哪儿了。我们先来回顾案例中处理函数的主要计算步骤:
1. 读取并遍历模板文件内容,建立从字符串到数值的字典;
2. 根据样本中的用户兴趣,查找字典并返回兴趣字符串对应的数值索引。
**2种实现方式的本质区别在于函数中2个计算步骤的分布式计算过程不同。在第1种实现方式中函数是一个接收两个形参的普通标量函数Dataset调用这个函数在千亿级样本上做Label encoding。**
Spark任务调度流程中,该函数在Driver端交由DAGScheduler打包为Tasks,经过TaskScheduler调度给SchedulerBackend,最后由SchedulerBackend分发到集群中的Executors中去执行。这意味着集群中的每一个Executors都需要执行函数中封装的两个计算步骤,要知道,第一个步骤中遍历文件内容并建立字典的计算开销还是相当大的。
反观第2种实现方式,2个计算步骤被封装到一个高阶函数中。用户代码先在Driver端用模板文件调用这个高阶函数,完成第一步计算建立字典的过程,同时输出一个只带一个形参的标量函数,这个标量函数内携带了刚刚建好的映射字典。最后,Dataset将这个标量函数作用于千亿样本之上做Label encoding
发现区别了吗?在第2种实现中,函数的第一步计算只在Driver端计算一次,分发给集群中所有Executors的任务中封装的是携带了字典的标量函数。然后在Executors端,Executors在各自的数据分片上调用该函数,省去了扫描模板文件、建立字典的开销。最后,我们只需要把样本中的用户兴趣传递进去,函数就能以O(1)的查询效率返回数值结果。
对于一个有着成百上千Executors的分布式集群来说,这2种不同的实现方式带来的性能差异还是相当可观的。因此,如果你能把Spark调度系统的工作原理牢记于心,我相信在代码开发或是review的过程中,你都能够意识到第一个计算步骤会带来的性能问题。**这种开发过程中的反思,其实就是在潜移默化地建立以性能为导向的开发习惯**。
## 小结
今天这一讲,我们先通过一个机器学的案例对比了2种实现方式的性能差异,知道了对于调度系统一知半解,很有可能在开发过程中引入潜在的性能隐患。为此,我梳理了调度系统工作流程的5个主要步骤:
1. DAG拆分为不同的运行阶段Stages
2. 创建分布式任务Tasks和任务组TaskSet
3. 获取集群内可用硬件资源情况;
4. 按照调度规则决定优先调度哪些任务/组;
5. 依序将分布式任务分发到执行器Executor
结合这5个步骤,我们深入分析了Spark调度系统的工作原理,我们可以从核心职责和核心原则这两方面来归纳:
1. Spark调度系统的核心职责是,先将用户构建的DAG转化为分布式任务,结合分布式集群资源的可用性,基于调度规则依序把分布式任务分发到执行器Executors
2. Spark调度系统的核心原则是,尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方(Executors或计算节点),从而最大限度地降低分布式系统中的网络开销。
## 每日一练
1. DAGScheduler在创建Tasks的过程中,是如何设置每一个任务的本地性级别?
2. 在计算与存储分离的云计算环境中,Node local本地性级别成立吗?你认为哪些情况下成立?哪些情况下不成立?
期待在留言区看到你的思考和答案,如果你的朋友也正急需搞清楚调度系统的工作原理,也欢迎你把这一讲转发给他,我们下一讲见!