You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

21 KiB

05 | 调度系统:如何把握分布式计算的精髓?

你好,我是吴磊。

在上一讲我们通过“包工头与施工工人”的例子初步认识了Spark进程模型中的Driver和Executors、以及它们之间的交互关系。Driver负责解析用户代码、构建计算流图然后将计算流图转化为分布式任务并把任务分发给集群中的Executors交付运行。

不过你可能会好奇“对于给定的用户代码和相应的计算流图Driver是怎么把计算图拆解为分布式任务又是按照什么规则分发给Executors的呢还有Executors具体又是如何执行分布式任务的呢

我们之前一再强调,分布式计算的精髓,在于如何把抽象的计算图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。深入理解分布式计算,是我们做好大数据开发的关键和前提,它能有效避免我们掉入“单机思维”的陷阱,同时也能为性能导向的开发奠定坚实基础。

而上面的这一系列问题恰恰是我们吃透分布式计算的关键所在。因此今天这一讲我们就顺着这些问题一起去深入探究Spark调度系统进而弄清楚分布式计算的来龙去脉。

角色划分与斯巴克建筑集团

在上一讲我们通过“包工头与施工工人”的类比、以及Word Count的示例其实已经大致厘清了Spark分布式任务调度的核心环节与关键步骤。今天这一讲的核心任务就是带你去深入其中的每一个环节做到“既见森林、也见树木”。这里咱们不妨先把这些环节和涉及的组件梳理出来从而让你在整体上有一个清晰的把握。

图片

不难发现表中的步骤与组件众多要是照本宣科地去讲调度系统先别说你可能看着看着就开始犯困了就连我自己也可能写着写着就睡着了。因此咱们不妨把这些环节与组件融入到一个故事中去让你像读小说一样在捧腹之余弄懂Spark调度系统。

话说很久以前美国有一家名扬海内外的建筑集团名为“斯巴克Spark”。这家建筑集团规模庞大设有一个总公司Driver和多个分公司Executors。斯巴克公司的主要服务对象是建筑设计师开发者建筑设计师负责提供设计图纸用户代码、计算图而斯巴克公司的主营业务是将图纸落地、建造起一栋栋高楼大厦。

要完成主营业务集团公司需要招聘能够看懂图纸、并将其转化为建筑项目的架构师因此斯巴克公司挖角了行业知名架构师“戴格”DAGScheduler。集团公司给戴格安排的职位是总公司的一把手同时要求两位创始元老“塔斯克”和“拜肯德”全力配合戴格的工作。

听到这里,你肯定会问“塔斯克”和“拜肯德”是谁呢?

塔斯克TaskScheduler一毕业就加入了斯巴克公司现任总公司施工经理成功指挥完成了多个大大小小的工程项目业绩非常突出深得公司赏识。拜肯德SchedulerBackend和塔斯克在上大学的时候就是上下铺关系好得穿一条裤子现任总公司人力资源总监负责与分公司协调、安排人力资源。从公司的安排来看三位主管的分工还是比较明确的。

图片

之所以说塔斯克TaskScheduler和拜肯德SchedulerBackend是公司元老原因在于在SparkContext / SparkSession的初始化中TaskScheduler和SchedulerBackend是最早、且同时被创建的调度系统组件。这二者的关系非常微妙SchedulerBackend在构造方法中引用TaskScheduler而TaskScheduler在初始化时会引用SchedulerBackend。

值得一提的是SchedulerBackend组件的实例化取决于开发者指定的Spark MasterURL也就是我们使用spark-shell或是spark-submit时指定的master 参数如“master spark://ip:host”就代表Standalone 部署模式master yarn”就代表YARN 模式等等。

不难发现SchedulerBackend 与资源管理器Standalone、YARN、Mesos等强绑定是资源管理器在 Spark 中的代理。其实硬件资源与人力资源一样都是“干活儿的”。所以如果我们用集团公司的人力资源来类比Spark集群的硬件资源那么“拜肯德”就是名副其实的人力资源总监。

从全局视角来看DAGScheduler是任务调度的发起者DAGScheduler以TaskSet为粒度向TaskScheduler提交任务调度请求。TaskScheduler在初始化的过程中会创建任务调度队列任务调度队列用于缓存 DAGScheduler提交的TaskSets。TaskScheduler结合SchedulerBackend提供的 WorkerOffer按照预先设置的调度策略依次对队列中的任务进行调度。

图片

简而言之DAGScheduler手里有“活儿”SchedulerBackend手里有“人力”TaskScheduler的核心职能就是把合适的“活儿”派发到合适的“人”的手里。由此可见TaskScheduler承担的是承上启下、上通下达的关键角色这也正是我们将“塔斯克”视为斯巴克建筑公司元老之一的重要原因。

那么,了解了这三个主管的角色职责,我们接下来就来详细说说,他们是怎么各自完成自己的工作的。

总架戴格DAGScheduler

回到我们的故事里,戴格在两位元老的协助下,工作开展得还算顺利,然而,冰层之下,暗流涌动,作为一名空降的领导,戴老板还需亲自“露两手”,才能赢得平级的认可与信任。

作为集团公司的“总架”总架构师戴格的核心职责是把计算图DAG拆分为执行阶段StagesStages指的是不同的运行阶段同时还要负责把Stages转化为任务集合TaskSets也就是把“建筑图纸”转化成可执行、可操作的“建筑项目”。

用一句话来概括从 DAG 到 Stages 的拆分过程,那就是:以 Actions 算子为起点,从后向前回溯 DAG以 Shuffle 操作为边界去划分 Stages。

第2讲介绍编程模型的时候我们以Word Count为例提到Spark作业的运行分为两个环节第一个是以惰性的方式构建计算图第二个则是通过Actions算子触发作业的从头计算

图片

对于图中的第二个环节Spark在实际运行的过程中会把它再细化为两个步骤。第一个步骤就是以Shuffle为边界从后向前以递归的方式把逻辑上的计算图DAG转化成一个又一个Stages。

图片

我们还是以Word Count为例Spark以take算子为起点依次把DAG中的RDD划入到第一个Stage直到遇到reduceByKey算子。由于reduceByKey算子会引入Shuffle因此第一个Stage创建完毕且只包含wordCounts这一个RDD。接下来Spark继续向前回溯由于未曾碰到会引入Shuffle的算子因此它把“沿途”所有的RDD都划入了第二个Stage。

在Stages创建完毕之后就到了触发计算的第二个步骤Spark从后向前以递归的方式依次提请执行所有的Stages

图片

具体来说在Word Count的例子中DAGScheduler最先提请执行的是Stage1。在提交的时候DAGScheduler发现Stage1依赖的父Stage也就是Stage0还没有执行过那么这个时候它会把Stage1的提交动作压栈转而去提请执行Stage0。当Stage0执行完毕的时候DAGScheduler通过出栈的动作再次提请执行Stage 1。

对于提请执行的每一个StageDAGScheduler根据Stage内RDD的partitions属性创建分布式任务集合TaskSet。TaskSet包含一个又一个分布式任务TaskRDD有多少数据分区TaskSet就包含多少个Task。换句话说Task与RDD的分区是一一对应的。

你可能会问“Task代表的是分布式任务不过它具体是什么呢”要更好地认识Task我们不妨来看看它的关键属性。

图片

在上表中stageId、stageAttemptId标记了Task与执行阶段Stage的所属关系taskBinary则封装了隶属于这个执行阶段的用户代码partition就是我们刚刚说的RDD数据分区locs属性以字符串的形式记录了该任务倾向的计算节点或是Executor ID。

不难发现taskBinary、partition和locs这三个属性一起描述了这样一件事情Task应该在哪里locs为谁partition执行什么任务taskBinary

到这里我们讲完了戴格的职责让我们来一起简单汇总一下戴格指代的是DAGSchedulerDAGScheduler的主要职责有三个

  • 根据用户代码构建DAG
  • 以Shuffle为边界切割Stages
  • 基于Stages创建TaskSets并将TaskSets提交给TaskScheduler请求调度。

现在,戴格不辱使命,完成了“建筑图纸”到“建筑项目”的转化,接下来,他需要把这些“活儿”下派给塔斯克,由塔斯克进一步完成任务的委派。

图片

不过,对于塔斯克来说,要想把这些“活儿”委派出去,他得先摸清楚集团内有多少“适龄劳动力”才行。要做到这一点,他必须仰仗死党:拜肯德的帮忙。

拜肯德SchedulerBackend

作为集团公司的人力资源总监拜肯德的核心职责就是实时汇总并掌握全公司的人力资源状况。前面我们讲了全公司的人力资源对应的就是Spark的计算资源。对于集群中可用的计算资源SchedulerBackend用一个叫做ExecutorDataMap的数据结构来记录每一个计算节点中Executors的资源状态。

这里的ExecutorDataMap是一种HashMap它的Key是标记 Executor 的字符串Value是一种叫做ExecutorData的数据结构。ExecutorData用于封装Executor的资源状态如RPC地址、主机地址、可用CPU核数和满配CPU核数等等它相当于是对Executor做的“资源画像”。

图片

有了ExecutorDataMap这本“人力资源小册子”对内SchedulerBackend可以就Executor做“资源画像”对外SchedulerBackend以WorkerOffer为粒度提供计算资源。其中WorkerOffer封装了Executor ID、主机地址和CPU核数它用来表示一份可用于调度任务的空闲资源。

显然基于Executor资源画像SchedulerBackend可以同时提供多个WorkerOffer用于分布式任务调度。WorkerOffer这个名字起得很传神Offer的字面意思是公司给你提供的工作机会到了Spark调度系统的上下文它就变成了使用硬件资源的机会。

图片

你可能会好奇坐镇总公司的拜肯德对于整个集团的人力资源他是怎么做到足不出户就如数家珍的一个篱笆三个桩一个好汉三个帮。仅凭拜肯德一己之力自然是力不从心幕后功臣实际上是驻扎在分公司的一众小弟们ExecutorBackend。

SchedulerBackend与集群内所有Executors中的ExecutorBackend保持周期性通信双方通过LaunchedExecutor、RemoveExecutor、StatusUpdate等消息来互通有无、变更可用计算资源。拜肯德正是通过这些小弟发送的“信件”来不停地更新自己手中的那本小册子从而对集团人力资源了如指掌。

图片

塔斯克TaskScheduler

一把手戴格有“活儿”三把手拜肯德出“人力”接下来终于轮到牵线搭桥的塔斯克出马了。作为施工经理塔斯克的核心职责是给定拜肯德提供的“人力”遴选出最合适的“活儿”并派发出去。而这个遴选的过程就是任务调度的核心所在如下图步骤3所示

图片

那么问题来了对于SchedulerBackend提供的一个个WorkerOfferTaskScheduler是依据什么规则来挑选Tasks的呢

用一句话来回答对于给定的WorkerOffer**TaskScheduler是按照任务的本地倾向性来遴选出TaskSet中适合调度的Tasks。**这是什么意思呢听上去比较抽象我们还是从DAGScheduler在Stage内创建任务集TaskSet说起。

我们刚刚说过Task与RDD的partitions是一一对应的在创建Task的过程中DAGScheduler会根据数据分区的物理地址来为Task设置locs属性。locs属性记录了数据分区所在的计算节点、甚至是Executor进程ID。

举例来说当我们调用textFile API从HDFS文件系统中读取源文件时Spark会根据HDFS NameNode当中记录的元数据获取数据分区的存储地址例如node0:/rootPath/partition0-replica0node1:/rootPath/partition0-replica1和node2:/rootPath/partition0-replica2。

那么DAGScheduler在为该数据分区创建Task0的时候会把这些地址中的计算节点记录到Task0的locs属性。

如此一来当TaskScheduler需要调度Task0这个分布式任务的时候根据Task0的locs属性它就知道“Task0所需处理的数据分区在节点node0、node1、node2上存有副本因此如果WorkOffer是来自这3个节点的计算资源那对Task0来说就是投其所好”。

从这个例子我们就能更好地理解,每个任务都是自带本地倾向性的,换句话说,每个任务都有自己的“调度意愿”。

回到斯巴克建筑集团的类比就好比是某个“活儿”并不是所有人都能干而是只倾向于让某些人来做因为他们更专业。比如砌墙这件事更倾向于给工龄3年以上的瓦工来做而吊顶则更倾向于给经验超过5年的木工来做诸如此类。

像上面这种定向到计算节点粒度的本地性倾向Spark中的术语叫做NODE_LOCAL。除了定向到节点Task还可以定向到进程Executor、机架、任意地址它们对应的术语分别是PROCESS_LOCAL、RACK_LOCAL和ANY。

对于倾向PROCESS_LOCAL的Task来说它要求对应的数据分区在某个进程Executor中存有副本而对于倾向RACK_LOCAL的Task来说它仅要求相应的数据分区存在于同一机架即可。ANY则等同于无定向也就是Task对于分发的目的地没有倾向性被调度到哪里都可以。

下图展示的是TaskScheduler依据本地性倾向依次进行任务调度的运行逻辑

图片

不难发现从PROCESS_LOCAL、NODE_LOCAL、到RACK_LOCAL、再到ANYTask的本地性倾向逐渐从严苛变得宽松。TaskScheduler接收到WorkerOffer之后也正是按照这个顺序来遍历TaskSet中的Tasks优先调度本地性倾向为PROCESS_LOCAL的Task而NODE_LOCAL次之RACK_LOCAL为再次最后是ANY。

你可能会问“Spark区分对待不同的本地倾向性它的初衷和意图是什么呢”实际上不同的本地性倾向本质上是用来区分计算代码与数据之间的关系。

**Spark调度系统的核心思想是“数据不动、代码动”。**也就是说在任务调度的过程中为了完成分布式计算Spark倾向于让数据待在原地、保持不动而把计算任务代码调度、分发到数据所在的地方从而消除数据分发引入的性能隐患。毕竟相比分发数据分发代码要轻量得多。

本地性倾向则意味着代码和数据应该在哪里“相会”PROCESS_LOCAL是在JVM进程中NODE_LOCAL是在节点内RACK_LOCAL是不超出物理机架的范围而ANY则代表“无所谓、不重要”。

图片

好啦到此为止结合WorkerOffer与任务的本地性倾向塔斯克TaskScheduler挑选出了适合调度的“活儿”Tasks。接下来TaskScheduler就把这些Tasks通过LaunchTask消息发送给好基友SchedulerBackend。人力资源总监SchedulerBackend拿到这些活儿之后同样使用LaunchTask消息把活儿进一步下发给分公司的小弟ExecutorBackend。

那么小弟ExecutorBackend拿到活之后是怎么工作的呢我们接着往下看吧

付诸执行ExecutorBackend

作为分公司的人力资源主管ExecutorBackend拿到“活儿”之后随即把活儿派发给分公司的建筑工人。这些工人就是Executors线程池中一个又一个的CPU线程每个线程负责处理一个Task。

每当Task处理完毕这些线程便会通过ExecutorBackend向Driver端的SchedulerBackend发送StatusUpdate事件告知Task执行状态。接下来TaskScheduler与SchedulerBackend通过接力的方式最终把状态汇报给DAGScheduler如图中步骤7、8、9所示

图片

对于同一个TaskSet当中的Tasks来说当它们分别完成了任务调度与任务执行这两个环节时也就是上图中步骤1到步骤9的计算过程Spark调度系统就完成了DAG中某一个Stage的任务调度。

不过故事到这里并未结束。我们知道一个DAG会包含多个Stages一个Stage的结束即宣告下一个Stage的开始而这也是戴格起初将DAG划分为Stages的意义所在。只有当所有的Stages全部调度、执行完毕才表示一个完整的Spark作业宣告结束。

路遥知马力,在一起合作了一个又一个建筑项目之后,空降老大戴格终于赢得了元老塔斯克和拜肯德的信任与认可,坐稳了斯巴克建筑集团的头把交椅。来日可期,戴格的前景一片光明。

重点回顾

今天这一讲我们用斯巴克建筑集团的故事介绍了Spark调度系统的工作原理。对于调度系统的工作流程你需要掌握表格中的5个关键环节

图片

具体说来任务调度分为如下5个步骤

1.DAGScheduler以Shuffle为边界将开发者设计的计算图DAG拆分为多个执行阶段Stages然后为每个Stage创建任务集TaskSet。
2.SchedulerBackend通过与Executors中的ExecutorBackend的交互来实时地获取集群中可用的计算资源并将这些信息记录到ExecutorDataMap数据结构。
3.与此同时SchedulerBackend根据ExecutorDataMap中可用资源创建WorkerOffer以WorkerOffer为粒度提供计算资源。
4.对于给定WorkerOfferTaskScheduler结合TaskSet中任务的本地性倾向按照PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL和ANY的顺序依次对TaskSet中的任务进行遍历优先调度本地性倾向要求苛刻的Task。
5.被选中的Task由TaskScheduler传递给SchedulerBackend再由SchedulerBackend分发到Executors中的ExecutorBackend。Executors接收到Task之后即调用本地线程池来执行分布式任务。

今天的内容就是这些调度系统是分布式计算系统的核心掌握了Spark任务调度的来龙去脉你也就把握住了Spark分布式计算引擎的精髓这会为你开发出高性能的Spark分布式应用打下坚实基础。

每课一练

课程的最后我来给你留一道练习题。请你想一想DAGScheduler如何得知一个Stage中所有的Tasks都已调度、执行完毕然后才决定开始调度DAG中的下一个Stage

欢迎你在评论区回答这个问题。如果你觉得这一讲对你有所帮助,也欢迎你把它分享给更多的朋友和同事。我在评论区等你,咱们下一讲见!