You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

14 KiB

20 | RDD和DataFrame既生瑜何生亮

你好,我是吴磊。

从今天开始我们进入Spark SQL性能调优篇的学习。在这一篇中我会先带你学习Spark SQL已有的优化机制如Catalyst、Tungsten这些核心组件以及AQE、DPP等新特性。深入理解这些内置的优化机制会让你在开发应用之初就有一个比较高的起点。然后针对数据分析中的典型场景如数据关联我们再去深入探讨性能调优的方法和技巧。

今天这一讲我们先来说说RDD和DataFrame的渊源。这也是面试的时候面试官经常会问的。比如说“Spark 3.0大版本发布Spark SQL的优化占比将近50%而像PySpark、Mllib和Streaming的优化占比都不超过10%Graph的占比几乎可以忽略不计。这是否意味着Spark社区逐渐放弃了其他计算领域只专注于数据分析

这个问题的标准答案是“Spark SQL取代Spark Core成为新一代的引擎内核所有其他子框架如Mllib、Streaming和Graph都可以共享Spark SQL的性能优化都能从Spark社区对于Spark SQL的投入中受益。”不过面试官可没有那么好对付一旦你这么说他/她可能会追问“为什么需要Spark SQL这个新一代引擎内核Spark Core有什么问题吗Spark SQL解决了Spark Core的哪些问题怎么解决的

面对这一连串“箭如雨发”的追问你还能回答出来吗接下来我就从RDD的痛点说起一步一步带你探讨DataFrame出现的必然性Spark Core的局限性以及它和Spark SQL的关系。

RDD之痛优化空间受限

自从Spark社区在1.3版本发布了DataFrame它就开始代替RDD逐渐成为开发者的首选。我们知道新抽象的诞生一定是为了解决老抽象不能搞定的问题。那么这些问题都是什么呢下面我们就一起来分析一下。

在RDD的开发框架下我们调用RDD算子进行适当的排列组合就可以很轻松地实现业务逻辑。我把这些使用频繁的RDD算子总结到了下面的表格里你可以看一看。

表格中高亮显示的就是RDD转换和聚合算子它们都是高阶函数。高阶函数指的是形参包含函数的函数或是返回结果包含函数的函数。为了叙述方便我们把那些本身是高阶函数的RDD算子简称“高阶算子”。

**对于这些高阶算子开发者需要以Lambda函数的形式自行提供具体的计算逻辑。**以map为例我们需要明确对哪些字段做映射以什么规则映射。再以filter为例我们需要指明以什么条件在哪些字段上过滤。

但这样一来Spark只知道开发者要做map、filter但并不知道开发者打算怎么做map和filter。也就是说**在RDD的开发模式下Spark Core只知道“做什么”而不知道“怎么做”。**这会让Spark Core两眼一抹黑除了把Lambda函数用闭包的形式打发到Executors以外实在是没有什么额外的优化空间。

**对于Spark Core来说优化空间受限最主要的影响莫过于让应用的执行性能变得低下。**一个典型的例子就是相比Java或者ScalaPySpark实现的应用在执行性能上相差悬殊。原因在于在RDD的开发模式下即便是同一个应用不同语言实现的版本在运行时也会有着天壤之别。

当我们使用Java或者Scala语言做开发时所有的计算都在JVM进程内完成如图中左侧的Spark计算节点所示。

而当我们在PySpark上做开发的时候只能把由RDD算子构成的计算代码一股脑地发送给Python进程。Python进程负责执行具体的脚本代码完成计算之后再把结果返回给Executor进程。由于每一个Task都需要一个Python进程如果RDD的并行度为#N那么整个集群就需要#N个这样的Python进程与Executors交互。不难发现其中的任务调度、数据计算和数据通信等开销正是PySpark性能低下的罪魁祸首。

DataFrame应运而生

针对优化空间受限这个核心问题Spark社区痛定思痛在2013年在1.3版本中发布了DataFrame。那么DataFrame的特点是什么它和RDD又有什么不同呢

首先用一句话来概括DataFrame就是携带数据模式Data Schema的结构化分布式数据集而RDD是不带Schema的分布式数据集。**因此从数据表示Data Representation的角度来看是否携带Schema是它们唯一的区别。**带Schema的数据表示形式决定了DataFrame只能封装结构化数据而RDD则没有这个限制所以除了结构化数据它还可以封装半结构化和非结构化数据。

其次从开发API上看RDD算子多是高阶函数这些算子允许开发者灵活地实现业务逻辑表达能力极强

DataFrame的表达能力却很弱。一来它定义了一套DSLDomain Specific Language算子如select、filter、agg、groupBy等等。由于DSL语言是为解决某一类任务而专门设计的计算机语言非图灵完备因此语言表达能力非常有限。二来DataFrame中的绝大多数算子都是标量函数Scalar Functions它们的形参往往是结构化的数据列Columns表达能力也很弱。

你可能会问“相比RDDDataFrame的表示和表达能力都变弱了那它是怎么解决RDD优化空间受限的核心痛点呢

当然仅凭DataFrame在API上的改动就想解决RDD的核心痛点比登天还难。DataFrame API最大的意义在于它为Spark引擎的内核优化打开了全新的空间。

首先DataFrame中Schema所携带的类型信息让Spark可以根据明确的字段类型设计定制化的数据结构从而大幅提升数据的存储和访问效率。其次DataFrame中标量算子确定的计算逻辑让Spark可以基于启发式的规则和策略甚至是动态的运行时信息去优化DataFrame的计算过程。

Spark SQL智能大脑

那么问题来了有了DataFrame API负责引擎内核优化的那个幕后英雄是谁为了支持DataFrame开发模式Spark从1.3版本开始推出Spark SQL。**Spark SQL的核心组件有二其一是Catalyst优化器其二是Tungsten。**关于Catalyst和Tungsten的特性和优化过程我们在后面的两讲再去展开今天这一讲咱们专注在它们和DataFrame的关系。

Catalyst执行过程优化

我们先来说说Catalyst的优化过程。当开发者通过Actions算子触发DataFrame的计算请求时Spark内部会发生一系列有趣的事情。

首先基于DataFrame确切的计算逻辑Spark会使用第三方的SQL解析器ANTLR来生成抽象语法树ASTAbstract Syntax Tree。既然是树就会有节点和边这两个基本的构成元素。节点记录的是标量算子如select、filter的处理逻辑边携带的是数据信息关系表和数据列如下图所示。这样的语法树描述了从源数据到DataFrame结果数据的转换过程。

在Spark中语法树还有个别名叫做“Unresolved Logical Plan”。它正是Catalyst优化过程的起点。之所以取名“Unresolved”是因为边上记录的关系表和数据列仅仅是一些字符串还没有和实际数据对应起来。举个例子Filter之后的那条边输出的数据列是joinKey和payLoad。这些字符串的来源是DataFrame的DSL查询Catalyst并不确定这些字段名是不是有效的更不知道每个字段都是什么类型。

因此,Catalyst做的第一步优化就是结合DataFrame的Schema信息确认计划中的表名、字段名、字段类型与实际数据是否一致。这个过程也叫做把“Unresolved Logical Plan”转换成“Analyzed Logical Plan”。

基于解析过后的“Analyzed Logical Plan”Catalyst才能继续做优化。利用启发式的规则和执行策略Catalyst最终把逻辑计划转换为可执行的物理计划。总之Catalyst的优化空间来源DataFrame的开发模式。

Tungsten数据结构优化

说完Catalyst我接着再来说说Tungsten。在开发原则那一讲我们提到过Tungsten使用定制化的数据结构Unsafe Row来存储数据Unsafe Row的优点是存储效率高、GC效率高。Tungsten之所以能够设计这样的数据结构仰仗的也是DataFrame携带的Schema。Unsafe Row我们之前讲过这里我再带你简单回顾一下。

Tungsten是用二进制字节序列来存储每一条用户数据的因此在存储效率上完胜Java Object。比如说如果我们要存储上表中的数据用Java Object来存储会消耗100个字节数而使用Tungsten仅需要不到20个字节如下图所示。

但是要想实现上图中的二进制序列Tungsten必须要知道数据条目的Schema才行。也就是说它需要知道每一个字段的数据类型才能决定在什么位置安放定长字段、安插Offset以及存放变长字段的数据值。DataFrame刚好能满足这个前提条件。

我们不妨想象一下如果数据是用RDD封装的Tungsten还有可能做到这一点吗当然不可能。这是因为虽然RDD也带类型如RDD[Int]、RDD[(Int, String)]但如果RDD中携带的是开发者自定义的数据类型如RDD[User]或是RDD[Product]Tungsten就会两眼一抹黑完全不知道你的User和Product抽象到底是什么。成也萧何、败也萧何RDD的通用性是一柄双刃剑在提供开发灵活性的同时也让引擎内核的优化变得无比困难。

**总的来说基于DataFrame简单的标量算子和明确的Schema定义借助Catalyst优化器和TungstenSpark SQL有能力在运行时构建起一套端到端的优化机制。这套机制运用启发式的规则与策略以及运行时的执行信息将原本次优、甚至是低效的查询计划转换为高效的执行计划从而提升端到端的执行性能。**因此在DataFrame的开发框架下不论你使用哪种开发语言开发者都能共享Spark SQL带来的性能福利。

最后我们再来回顾最开始提到的面试题“从2.0版本至今Spark对于其他子框架的完善与优化相比Spark SQL占比很低。这是否意味着Spark未来的发展重心是数据分析其他场景如机器学习、流计算会逐渐边缘化吗

最初Spark SQL确实仅仅是运行SQL和DataFrame应用的子框架但随着优化机制的日趋完善Spark SQL逐渐取代Spark Core演进为新一代的引擎内核。到目前为止所有子框架的源码实现都已从RDD切换到DataFrame。因此和PySpark一样像Streaming、Graph、Mllib这些子框架实际上都是通过DataFrame API运行在Spark SQL之上它们自然可以共享Spark SQL引入的种种优化机制。

形象地说Spark SQL就像是Spark的智能大脑凡是通过DataFrame这双“眼睛”看到的问题都会经由智能大脑这个指挥中心统筹地进行分析与优化优化得到的行动指令最终再交由Executors这些“四肢”去执行。

小结

今天我们围绕RDD的核心痛点探讨了DataFrame出现的必然性Spark Core的局限性以及它和Spark SQL的关系对Spark SQL有了更深刻的理解。

RDD的核心痛点是优化空间有限它指的是RDD高阶算子中封装的函数对于Spark来说完全透明因此Spark对于计算逻辑的优化无从下手。

相比RDDDataFrame是携带Schema的分布式数据集只能封装结构化数据。DataFrame的算子大多数都是普通的标量函数以消费数据列为主。但是DataFrame更弱的表示能力和表达能力反而为Spark引擎的内核优化打开了全新的空间。

根据DataFrame简单的标量算子和明确的Schema定义借助Catalyst优化器和TungstenSpark SQL有能力在运行时构建起一套端到端的优化机制。这套机制运用启发式的规则与策略和运行时的执行信息将原本次优、甚至是低效的查询计划转换为高效的执行计划从而提升端到端的执行性能。

在DataFrame的开发模式下所有子框架、以及PySpark都运行在Spark SQL之上都可以共享Spark SQL提供的种种优化机制这也是为什么Spark历次发布新版本、Spark SQL占比最大的根本原因。

每日一练

  1. Java Object在对象存储上为什么会有比较大的开销JVM需要多少个字节才能存下字符串“abcd”
  2. 在DataFrame的开发框架下 PySpark中还有哪些操作是“顽固分子”会导致计算与数据在JVM进程与Python进程之间频繁交互(提示参考RDD的局限性那些对Spark透明的计算逻辑Spark是没有优化空间的)

期待在留言区看到你的思考和答案,我们下一讲见!