You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
18 KiB
Markdown

2 years ago
# 14 | 台前幕后DataFrame与Spark SQL的由来
你好,我是吴磊。
在上一讲结合“小汽车倍率分析”的例子我们学习了在Spark SQL子框架下做应用开发的一般模式。我们先是使用SparkSession的read API来创建DataFrame然后以DataFrame为入口通过调用各式各样的算子来完成不同DataFrame之间的转换从而进行数据分析。
尽管我们说过你可以把DataFrame看作是一种特殊的RDD但你可能仍然困惑DataFrame到底跟RDD有什么本质区别。Spark已经有了RDD这个开发入口为什么还要重复造轮子整出个DataFrame来呢
相信学完了上一讲这些问题一定萦绕在你的脑海里挥之不去。别着急今天我们就来高屋建瓴地梳理一下DataFrame的来龙去脉然后再追本溯源看看帮助DataFrame崭露头角的幕后大佬Spark SQL又是怎么回事儿。
## RDD之殇优化空间受限
在RDD算子那一讲[第3讲](https://time.geekbang.org/column/article/418079)我们曾经留过一道思考题像map、mapPartitions、filter、flatMap这些算子它们之间都有哪些共性
今天我们从一个全新的视角来重新审视这个问题。先说结论它们都是高阶函数Higher-order Functions
所谓高阶函数,它指的是形参为函数的函数,或是返回类型为函数的函数。换句话说,高阶函数,首先本质上也是函数,特殊的地方在于它的形参和返回类型,这两者之中只要有一个是函数类型,那么原函数就属于高阶函数。
上面提到的这些算子如map、filter它们都需要一个辅助函数f来作为形参通过调用map(f)、filter(f)才能完成计算。以map为例我们需要函数f来明确对哪些字段做映射以什么规则映射。filter也一样我们需要函数f来指明以什么条件在哪些字段上过滤。
但是这样一来Spark只知道开发者要做map、filter但并不知道开发者打算怎么做map和filter。换句话说对于Spark来说辅助函数f是透明的。**在RDD的开发框架下Spark Core只知道开发者要“做什么”而不知道“怎么做”。**这让Spark Core两眼一抹黑除了把函数f以闭包的形式打发到Executors以外实在是没有什么额外的优化空间。而这就是RDD之殇。
## DataFrame横空出世
针对RDD优化空间受限的问题Spark社区在1.3版本发布了DataFrame。那么相比RDDDataFrame到底有何不同呢我们不妨从两个方面来对比它们的不同一个是数据的表示形式Data Representation另一个是开发算子。
DataFrame与RDD一样都是用来封装分布式数据集的。但在数据表示方面就不一样了DataFrame是携带数据模式Data Schema的结构化数据而RDD是不携带Schema的分布式数据集。恰恰是因为有了Schema提供明确的类型信息Spark才能耳聪目明有针对性地设计出更紧凑的数据结构从而大幅度提升数据存储与访问效率。
在开发API方面RDD算子多采用高阶函数高阶函数的优势在于表达能力强它允许开发者灵活地设计并实现业务逻辑。而DataFrame的表达能力却很弱它定义了一套DSL算子Domain Specific Language如我们上一节课用到的select、filter、agg、groupBy等等它们都属于DSL算子。
DSL语言往往是为了解决某一类特定任务而设计非图灵完备因此在表达能力方面非常有限。DataFrame的算子大多数都是标量函数Scalar Functions它们的形参往往是结构化二维表的数据列Columns
尽管DataFrame算子在表达能力方面更弱但是DataFrame每一个算子的计算逻辑都是确定的比如select用于提取某些字段groupBy用于对数据做分组等等。这些计算逻辑对Spark来说不再是透明的因此Spark可以基于启发式的规则或策略甚至是动态的运行时信息去优化DataFrame的计算过程。
总结下来相比RDDDataFrame通过携带明确类型信息的Schema、以及计算逻辑明确的转换算子为Spark引擎的内核优化打开了全新的空间。
## 幕后英雄Spark SQL
那么问题来了优化空间打开之后真正负责优化引擎内核Spark Core的那个幕后英雄是谁相信不用我说你也能猜到它就是Spark SQL。
想要吃透Spark SQL我们先得弄清楚它跟Spark Core的关系。随着学习进程的推进我们接触的新概念、知识点会越来越多厘清Spark SQL与Spark Core的关系有利于你构建系统化的知识体系和全局视角从而让你在学习的过程中“既见树木、也见森林”。
首先Spark Core特指Spark底层执行引擎Execution Engine它包括了我们在基础知识篇讲过的调度系统、存储系统、内存管理、Shuffle管理等核心功能模块。而Spark SQL则凌驾于Spark Core之上是一层独立的优化引擎Optimization Engine。换句话说Spark Core负责执行而Spark SQL负责优化Spark SQL优化过后的代码依然要交付Spark Core来做执行。
![图片](https://static001.geekbang.org/resource/image/3e/1d/3e410fb54d3b69358ca72ffc321dcd1d.jpg?wh=1920x587 "Spark SQL与Spark Core的关系")
再者从开发入口来说在RDD框架下开发的应用程序会直接交付Spark Core运行。而使用DataFrame API开发的应用则会先过一遍Spark SQL由Spark SQL优化过后再交由Spark Core去做执行。
弄清二者的关系与定位之后接下来的问题是“基于DataFrameSpark SQL是如何进行优化的呢”要回答这个问题我们必须要从Spark SQL的两个核心组件说起Catalyst优化器和Tungsten。
先说Catalyst优化器它的职责在于创建并优化执行计划它包含3个功能模块分别是创建语法树并生成执行计划、逻辑阶段优化和物理阶段优化。Tungsten用于衔接Catalyst执行计划与底层的Spark Core执行引擎它主要负责优化数据结果与可执行代码。
![图片](https://static001.geekbang.org/resource/image/cb/fa/cbfdebe214a4d0f89ff1f4704e5913fa.jpg?wh=1920x849 "Catalyst优化器与Tungsten")
接下来我们结合上一讲“倍率分析”的例子来说一说那段代码在Spark SQL这一层是如何被优化的。我把“倍率分析”完整的代码实现贴在了这里你不妨先简单回顾一下。
```scala
import org.apache.spark.sql.DataFrame
 
val rootPath: String = _
// 申请者数据
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark是spark-shell中默认的SparkSession实例
// 通过read API读取源文件
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
 
// 中签者数据
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// 通过read API读取源文件
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
 
// 过滤2016年以后的中签数据且仅抽取中签号码carNum字段
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
 
// 摇号数据与中签数据做内关联Join Key为中签号码carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")
 
// 以batchNum、carNum做分组统计倍率系数
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))
 
// 以carNum做分组保留最大的倍率系数
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))
 
// 以multiplier倍率做分组统计人数
val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")
 
result.collect
```
### Catalyst优化器
首先我们先来说说Catalyst的优化过程。基于代码中DataFrame之间确切的转换逻辑Catalyst会先使用第三方的SQL解析器ANTLR生成抽象语法树ASTAbstract Syntax Tree。AST由节点和边这两个基本元素构成其中节点就是各式各样的操作算子如select、filter、agg等而边则记录了数据表的Schema信息如字段名、字段类型等等。
以下图“倍率分析”的语法树为例它实际上描述了从源数据到最终计算结果之间的转换过程。因此在Spark SQL的范畴内AST语法树又叫作“执行计划”Execution Plan
![图片](https://static001.geekbang.org/resource/image/73/cb/73b8688bbd3564f30e856d9df46a8ccb.jpg?wh=1920x1598 "“倍率分析”的AST语法树 / 执行计划")
可以看到由算子构成的语法树、或者说执行计划给出了明确的执行步骤。即使不经过任何优化Spark Core也能把这个“原始的”执行计划按部就班地运行起来。
不过,从执行效率的角度出发,这么做并不是最优的选择。为什么这么说呢?我们以图中绿色的节点为例,**Scan用于全量扫描并读取中签者数据Filter则用来过滤出摇号批次大于等于“201601”的数据Select节点的作用则是抽取数据中的“carNum”字段**。
还记得吗我们的源文件是以Parquet格式进行存储的而Parquet格式在文件层面**支持“谓词下推”Predicates Pushdown和“列剪枝”Columns Pruning这两项特性**。
谓词下推指的是利用像“batchNum >= 201601”这样的过滤条件在扫描文件的过程中只读取那些满足条件的数据文件。又因为Parquet格式属于列存Columns Store数据结构因此Spark只需读取字段名为“carNum”的数据文件而“剪掉”读取其他数据文件的过程。
![图片](https://static001.geekbang.org/resource/image/72/e4/72781191ddf37608602cdb0690c0e9e4.jpg?wh=1920x455 "谓词下推与列剪枝示意图")
以中签数据为例在谓词下推和列剪枝的帮助下Spark Core只需要扫描图中绿色的文件部分。显然这两项优化都可以有效帮助Spark Core大幅削减数据扫描量、降低磁盘I/O消耗从而显著提升数据的读取效率。
因此如果能把3个绿色节点的执行顺序从“Scan > Filter > Select”调整为“Filter > Select > Scan”那么相比原始的执行计划调整后的执行计划能给Spark Core带来更好的执行性能。
像谓词下推、列剪枝这样的特性都被称为启发式的规则或策略。而Catalyst优化器的核心职责之一就是在逻辑优化阶段基于启发式的规则和策略调整、优化执行计划为物理优化阶段提升性能奠定基础。经过逻辑阶段的优化之后原始的执行计划调整为下图所示的样子请注意绿色节点的顺序变化。
![图片](https://static001.geekbang.org/resource/image/57/dd/57029cabc2155c72ddbffb6c8ab440dd.jpg?wh=1920x1598 "经过逻辑优化的执行计划")
经过逻辑阶段优化的执行计划依然可以直接交付Spark Core去运行不过在性能优化方面Catalyst并未止步于此。
除了逻辑阶段的优化Catalyst在物理优化阶段还会进一步优化执行计划。与逻辑阶段主要依赖先验的启发式经验不同物理阶段的优化主要依赖各式各样的统计信息如数据表尺寸、是否启用数据缓存、Shuffle中间文件等等。换句话说**逻辑优化更多的是一种“经验主义”,而物理优化则是“用数据说话”**。
以图中蓝色的Join节点为例执行计划仅交代了applyNumbersDF与filteredLuckyDogs这两张数据表需要做内关联但是它并没有交代清楚这两张表具体采用哪种机制来做关联。按照实现机制来分类数据关联有3种实现方式分别是嵌套循环连接NLJNested Loop Join、排序归并连接Sort Merge Join和哈希连接Hash Join
而按照数据分发方式来分类数据关联又可以分为Shuffle Join和Broadcast Join这两大类。因此在分布式计算环境中至少有6种Join策略供Spark SQL来选择。对于这6种Join策略我们以后再详细展开这里你只需要了解不同策略在执行效率上有着天壤之别即可。
回到蓝色Join节点的例子在物理优化阶段Catalyst优化器需要结合applyNumbersDF与filteredLuckyDogs这两张表的存储大小来决定是采用运行稳定但性能略差的Shuffle Sort Merge Join还是采用执行性能更佳的Broadcast Hash Join。
不论Catalyst决定采用哪种Join策略优化过后的执行计划都可以丢给Spark Core去做执行。不过Spark SQL优化引擎并没有就此打住当Catalyst优化器完成它的“历史使命”之后Tungsten会接过接力棒在Catalyst输出的执行计划之上继续打磨、精益求精力求把最优的执行代码交付给底层的SparkCore执行引擎。
![图片](https://static001.geekbang.org/resource/image/cb/fa/cbfdebe214a4d0f89ff1f4704e5913fa.jpg?wh=1920x849 "Catalyst优化器与Tungsten")
### Tungsten
站在Catalyst这个巨人的肩膀上Tungsten主要是在数据结构和执行代码这两个方面做进一步的优化。数据结构优化指的是Unsafe Row的设计与实现执行代码优化则指的是全阶段代码生成WSCGWhole Stage Code Generation
我们先来看看为什么要有Unsafe Row。对于DataFrame中的每一条数据记录Spark SQL默认采用org.apache.spark.sql.Row对象来进行封装和存储。我们知道使用Java Object来存储数据会引入大量额外的存储开销。
为此Tungsten设计并实现了一种叫做Unsafe Row的二进制数据结构。**Unsafe Row本质上是字节数组它以极其紧凑的格式来存储DataFrame的每一条数据记录大幅削减存储开销从而提升数据的存储与访问效率**。
以下表的Data Schema为例对于包含如下4个字段的每一条数据记录来说如果采用默认的Row对象进行存储的话那么每条记录需要消耗至少60个字节。
![图片](https://static001.geekbang.org/resource/image/24/25/24675c8d5e31c51e7yyd6336acf3f525.jpg?wh=1920x833 "数据表的Data Schema")
但如果用Tungsten Unsafe Row数据结构进行存储的话每条数据记录仅需消耗十几个字节如下图所示。
![图片](https://static001.geekbang.org/resource/image/6e/c7/6eb33b3yy4b4cd658e9739b8a75321c7.jpg?wh=1920x701 "使用Unsafe Row来存储数据记录")
说完了Unsafe Row的数据结构优化接下来我们再来说说WSCG全阶段代码生成。所谓全阶段其实就是我们在调度系统中学过的Stage。以图中的执行计划为例标记为绿色的3个节点在任务调度的时候会被划分到同一个Stage。
![图片](https://static001.geekbang.org/resource/image/57/dd/57029cabc2155c72ddbffb6c8ab440dd.jpg?wh=1920x1598 "绿色节点同属于一个Stage")
而代码生成指的是Tungsten在运行时把算子之间的“链式调用”捏合为一份代码。以上图3个绿色的节点为例在默认情况下Spark Core会对每一条数据记录都依次执行Filter、Select和Scan这3个操作。
经过了Tungsten的WSCG优化之后Filter、Select和Scan这3个算子会被“捏合”为一个函数f。这样一来Spark Core只需要使用函数f来一次性地处理每一条数据就能消除不同算子之间数据通信的开销一气呵成地完成计算。
好啦到此为止分别完成Catalyst和Tungsten这两个优化环节之后Spark SQL终于“心满意足”地把优化过的执行计划、以及生成的执行代码交付给老大哥Spark Core。Spark Core拿到计划和代码在运行时利用Tungsten Unsafe Row的数据结构完成分布式任务计算。到此我们这一讲的内容也就讲完了。
## 重点回顾
今天这一讲涉及的内容很多,我们一起做个总结。
首先在RDD开发框架下Spark Core的优化空间受限。绝大多数RDD高阶算子所封装的封装的计算逻辑形参函数f对于Spark Core是透明的Spark Core除了用闭包的方式把函数f分发到Executors以外没什么优化余地。
而DataFrame的出现带来了新思路它携带的Schema提供了丰富的类型信息而且DataFrame算子大多为处理数据列的标量函数。DataFrame的这两个特点为引擎内核的优化打开了全新的空间。在DataFrame的开发框架下负责具体优化过程的正是Spark SQL。
**Spark SQL则是凌驾于Spark Core之上的一层优化引擎它的主要职责是在用户代码交付Spark Core之前对用户代码进行优化。**
![图片](https://static001.geekbang.org/resource/image/cb/fa/cbfdebe214a4d0f89ff1f4704e5913fa.jpg?wh=1920x849)
Spark SQL由两个核心组件构成分别是Catalyst优化器和Tungsten其优化过程也分为Catalyst和Tungsten两个环节。
在Catalyst优化环节Spark SQL首先把用户代码转换为AST语法树又叫执行计划然后分别通过逻辑优化和物理优化来调整执行计划。逻辑阶段的优化主要通过先验的启发式经验如谓词下推、列剪枝对执行计划做优化调整。而物理阶段的优化更多是利用统计信息选择最佳的执行机制、或添加必要的计算节点。
Tungsten主要从数据结构和执行代码两个方面进一步优化。与默认的Java Object相比二进制的Unsafe Row以更加紧凑的方式来存储数据记录大幅提升了数据的存储与访问效率。全阶段代码生成消除了同一Stage内部不同算子之间的数据传递把多个算子融合为一个统一的函数并将这个函数一次性地作用Apply到数据之上相比不同算子的“链式调用”这会显著提升计算效率。
## **每课一练**
学完这一讲之后我们知道只有DataFrame才能“享受”到Spark SQL的优化过程而RDD只能直接交付Spark Core执行。那么这是不是意味着RDD开发框架会退出历史舞台而我们之前学过的与RDD有关的知识点如RDD概念、RDD属性、RDD算子都白学了呢
![图片](https://static001.geekbang.org/resource/image/3e/1d/3e410fb54d3b69358ca72ffc321dcd1d.jpg?wh=1920x587 "Spark SQL与Spark Core的关系
")
欢迎你在留言区和我交流讨论,也推荐你把这一讲的内容分享给更多朋友。