You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

211 lines
17 KiB
Markdown

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 07 | RDD常用算子Spark如何实现数据聚合
你好,我是吴磊。
积累了一定的理论基础之后今天我们继续来学习RDD常用算子。在[RDD常用算子](https://time.geekbang.org/column/article/418079)那一讲我们讲了四个算子map、mapPartitions、flatMap和filter同时留了这样一道思考题“这些算子之间有哪些共同点
今天我们就来揭晓答案。首先在功能方面这4个算子都用于RDD内部的数据转换而学习过Shuffle的工作原理之后我们不难发现这4个算子当中没有任何一个算子会引入Shuffle计算。
而今天我们要学习的几个算子则恰恰相反它们都会引入繁重的Shuffle计算。这些算子分别是groupByKey、reduceByKey、aggregateByKey和sortByKey也就是表格中加粗的部分。
我们知道在数据分析场景中典型的计算类型分别是分组、聚合和排序。而groupByKey、reduceByKey、aggregateByKey和sortByKey这些算子的功能恰恰就是用来实现分组、聚合和排序的计算逻辑。
![图片](https://static001.geekbang.org/resource/image/c9/fc/c97a717512897749d5db659fb583c8fc.jpg?wh=1597x1977 "RDD算子分类表")
尽管这些算子看上去相比其他算子的适用范围更窄也就是它们只能作用Apply在Paired RDD之上所谓Paired RDD它指的是元素类型为KeyValue键值对的RDD。
但是在功能方面,可以说,它们承担了数据分析场景中的大部分职责。因此,掌握这些算子的用法,是我们能够游刃有余地开发数据分析应用的重要基础。那么接下来,我们就通过一些实例,来熟悉并学习这些算子的用法。
我们先来说说groupByKey坦白地说相比后面的3个算子groupByKey在我们日常开发中的“出镜率”并不高。之所以要先介绍它主要是为后续的reduceByKey和aggregateByKey这两个重要算子做铺垫。
## groupByKey分组收集
groupByKey的字面意思是“按照Key做分组”但实际上groupByKey算子包含两步即**分组**和**收集**。
具体来说对于元素类型为KeyValue键值对的Paired RDDgroupByKey的功能就是对Key值相同的元素做分组然后把相应的Value值以集合的形式收集到一起。换句话说groupByKey会把RDD的类型由RDD\[(Key, Value)\]转换为RDD\[(Key, Value集合)\]。
这么说比较抽象我们还是用一个小例子来说明groupByKey的用法。还是我们熟知的Word Count对于分词后的一个个单词假设我们不再统计其计数而仅仅是把相同的单词收集到一起那么我们该怎么做呢按照老规矩咱们还是先来给出代码实现
```scala
import org.apache.spark.rdd.RDD
 
// 以行为单位做分词
val cleanWordRDD: RDD[String] = _ // 完整代码请参考第一讲的Word Count
// 把普通RDD映射为Paired RDD
val kvRDD: RDD[(String, String)] = cleanWordRDD.map(word => (word, word))
 
// 按照单词做分组收集
val words: RDD[(String, Iterable[String])] = kvRDD.groupByKey()
```
结合前面的代码可以看到相比之前的Word Count我们仅需做两个微小的改动即可实现新的计算逻辑。第一个改动是把map算子的映射函数f由原来的word => word1变更为word => wordword这么做的效果是把kvRDD元素的Key和Value都变成了单词。
紧接着第二个改动我们用groupByKey替换了原先的reduceByKey。相比reduceByKeygroupByKey的用法要简明得多。groupByKey是无参函数要实现对Paired RDD的分组、收集我们仅需在RDD之上调用groupByKey()即可。
尽管groupByKey的用法非常简单但它的计算过程值得我们特别关注下面我用一张示意图来讲解上述代码的计算过程从而让你更加直观地感受groupByKey可能存在的性能隐患。
![图片](https://static001.geekbang.org/resource/image/6f/77/6f1c7fb6bebd3yy43b1404835fe86d77.jpg?wh=1920x1071 "groupByKey计算过程")
从图上可以看出为了完成分组收集对于Key值相同、但分散在不同数据分区的原始数据记录Spark需要通过Shuffle操作跨节点、跨进程地把它们分发到相同的数据分区。我们之前在[第6讲](https://time.geekbang.org/column/article/420399)中说了,**Shuffle是资源密集型计算**对于动辄上百万、甚至上亿条数据记录的RDD来说这样的Shuffle计算会产生大量的磁盘I/O与网络I/O开销从而严重影响作业的执行性能。
虽然groupByKey的执行效率较差不过好在它在应用开发中的“出镜率”并不高。原因很简单在数据分析领域中分组收集的使用场景很少而分组聚合才是统计分析的刚需。
为了满足分组聚合多样化的计算需要Spark提供了3种RDD算子允许开发者灵活地实现计算逻辑它们分别是reduceByKey、aggregateByKey和combineByKey。
reduceByKey我们并不陌生第1讲的Word Count实现就用到了这个算子aggregateByKey是reduceByKey的“升级版”相比reduceByKeyaggregateByKey用法更加灵活支持的功能也更加完备。
接下来我们先来回顾reduceByKey然后再对aggregateByKey进行展开。相比aggregateByKeycombineByKey仅在初始化方式上有所不同因此我把它留给你作为课后作业去探索。
## reduceByKey分组聚合
reduceByKey的字面含义是“按照Key值做聚合”它的计算逻辑就是根据聚合函数f给出的算法把Key值相同的多个元素聚合成一个元素。
在[第1讲](https://time.geekbang.org/column/article/415209)Word Count的实现中我们使用了reduceByKey来实现分组计数
```scala
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
 
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x: Int, y: Int) => x + y)
```
重温上面的这段代码你有没有觉得reduceByKey与之前讲过的map、filter这些算子有一些相似的地方没错给定处理函数f它们的用法都是“算子(f)”。只不过**对于map来说我们把f称作是映射函数对filter来说我们把f称作判定函数而对于reduceByKey我们把f叫作聚合函数。**
在上面的代码示例中reduceByKey的聚合函数是匿名函数(x, y) => x + y。与map、filter等算子的用法一样你也可以明确地定义带名函数f然后再用reduceByKey(f)的方式实现同样的计算逻辑。
需要强调的是给定RDD\[(Key类型Value类型)\]聚合函数f的类型必须是Value类型Value类型 => Value类型。换句话说函数f的形参必须是两个数值且数值的类型必须与Value的类型相同而f的返回值也必须是Value类型的数值。
咱们不妨再举一个小例子让你加深对于reduceByKey算子的理解。
接下来我们把Word Count的计算逻辑改为随机赋值、提取同一个Key的最大值。也就是在kvRDD的生成过程中我们不再使用映射函数word => (word, 1)而是改为word => (word, 随机数)然后再使用reduceByKey算子来计算同一个word当中最大的那个随机数。
你可以先停下来,花点时间想一想这个逻辑该怎么实现,然后再来参考下面的代码:
```scala
import scala.util.Random._
 
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, nextInt(100)))
 
// 显示定义提取最大值的聚合函数f
def f(x: Int, y: Int): Int = {
return math.max(x, y)
}
 
// 按照单词提取最大值
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(f)
```
观察上面的代码片段不难发现reduceByKey算子的用法还是比较简单的只需要先定义好聚合函数f然后把它传给reduceByKey算子就行了。那么在运行时上述代码的计算又是怎样的一个过程呢
我把reduceByKey的计算过程抽象成了下图
![图片](https://static001.geekbang.org/resource/image/ac/85/aca17fe4d0fa5a52f4b9e73056aa1185.jpg?wh=1920x951 "reduceByKey计算过程")
从图中你可以看出来尽管reduceByKey也会引入Shuffle但相比groupByKey以全量原始数据记录的方式消耗磁盘与网络reduceByKey在落盘与分发之前会先在Shuffle的Map阶段做初步的聚合计算。
比如在数据分区0的处理中在Map阶段reduceByKey把Key同为Streaming的两条数据记录聚合为一条聚合逻辑就是由函数f定义的、取两者之间Value较大的数据记录这个过程我们称之为“**Map端聚合**”。相应地数据经由网络分发之后在Reduce阶段完成的计算我们称之为“**Reduce端聚合**”。
你可能会说“做了Map聚合又能怎样呢相比groupByKeyreduceByKey带来的性能收益并不算明显呀”确实就上面的示意图来说我们很难感受到reduceByKey带来的性能收益。不过量变引起质变在工业级的海量数据下相比groupByKeyreduceByKey通过在Map端大幅削减需要落盘与分发的数据量往往能将执行效率提升至少一倍。
应该说,对于大多数分组&聚合的计算需求来说只要设计合适的聚合函数f你都可以使用reduceByKey来实现计算逻辑。不过术业有专攻reduceByKey算子的局限性**在于其Map阶段与Reduce阶段的计算逻辑必须保持一致这个计算逻辑统一由聚合函数f定义**。当一种计算场景需要在两个阶段执行不同计算逻辑的时候reduceByKey就爱莫能助了。
比方说还是第1讲的Word Count我们想对单词计数的计算逻辑做如下调整
* 在Map阶段以数据分区为单位计算单词的加和
* 而在Reduce阶段对于同样的单词取加和最大的那个数值。
显然Map阶段的计算逻辑是sum而Reduce阶段的计算逻辑是max。对于这样的业务需求reduceByKey已无用武之地这个时候就轮到aggregateByKey这个算子闪亮登场了。
## aggregateByKey更加灵活的聚合算子
老规矩算子的介绍还是从用法开始。相比其他算子aggregateByKey算子的参数比较多。要在Paired RDD之上调用aggregateByKey你需要提供一个初始值一个Map端聚合函数f1以及一个Reduce端聚合函数f2aggregateByKey的调用形式如下所示
```scala
val rdd: RDD[(Key类型Value类型)] = _
rdd.aggregateByKey(初始值)(f1, f2)
```
初始值可以是任意数值或是字符串而聚合函数我们也不陌生它们都是带有两个形参和一个输出结果的普通函数。就这3个参数来说比较伤脑筋的是它们之间的类型需要保持一致具体来说
* 初始值类型必须与f2的结果类型保持一致
* f1的形参类型必须与Paired RDD的Value类型保持一致
* f2的形参类型必须与f1的结果类型保持一致。
不同类型之间的一致性描述起来比较拗口,咱们不妨结合示意图来加深理解:
![图片](https://static001.geekbang.org/resource/image/b0/f7/b0a1c86590f4213fa0fc62f5dd4ca3f7.jpg?wh=1920x568 "aggregateByKey参数之间的类型一致性")
熟悉了aggregateByKey的用法之后接下来我们用aggregateByKey这个算子来实现刚刚提到的“先加和再取最大值”的计算逻辑代码实现如下所示
```scala
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
 
// 显示定义Map阶段聚合函数f1
def f1(x: Int, y: Int): Int = {
return x + y
}
 
// 显示定义Reduce阶段聚合函数f2
def f2(x: Int, y: Int): Int = {
return math.max(x, y)
}
 
// 调用aggregateByKey实现先加和、再求最大值
val wordCounts: RDD[(String, Int)] = kvRDD.aggregateByKey(0) (f1, f2)
```
怎么样是不是很简单结合计算逻辑的需要我们只需要提前定义好两个聚合函数同时保证参数之间的类型一致性然后把初始值、聚合函数传入aggregateByKey算子即可。按照惯例我们还是通过aggregateByKey在运行时的计算过程来帮你深入理解算子的工作原理
![图片](https://static001.geekbang.org/resource/image/62/f3/62d25ab5df4fa53da4283263bb2128f3.jpg?wh=1920x1040 "reduceByKey计算过程")
不难发现在运行时与reduceByKey相比aggregateByKey的执行过程并没有什么两样最主要的区别还是Map端聚合与Reduce端聚合的计算逻辑是否一致。值得一提的是与reduceByKey一样aggregateByKey也可以通过Map端的初步聚合来大幅削减数据量在降低磁盘与网络开销的同时提升Shuffle环节的执行性能。
## sortByKey排序
在这一讲的最后我们再来说说sortByKey这个算子顾名思义它的功能是“按照Key进行排序”。给定包含KeyValue键值对的Paired RDDsortByKey会以Key为准对RDD做排序。算子的用法比较简单只需在RDD之上调用sortByKey()即可:
```scala
val rdd: RDD[(Key类型Value类型)] = _
rdd.sortByKey()
```
在默认的情况下sortByKey按照Key值的升序Ascending对RDD进行排序如果想按照降序Descending来排序的话你需要给sortByKey传入false。总结下来关于排序的规则你只需要记住如下两条即可
* 升序排序调用sortByKey()、或者sortByKey(true)
* 降序排序调用sortByKey(false)。
## 重点回顾
今天这一讲我们介绍了数据分析场景中常用的4个算子它们分别是groupByKey、reduceByKey、aggregateByKey和sortByKey掌握这些算子的用法与原理将为你游刃有余地开发数据分析应用打下坚实基础。
关于这些算子,你首先需要了解它们之间的共性。**一来这4个算子的作用范围都是Paired RDD二来在计算的过程中它们都会引入Shuffle**。而Shuffle往往是Spark作业执行效率的瓶颈因此在使用这4个算子的时候对于它们可能会带来的性能隐患我们要做到心中有数。
再者你需要掌握每一个算子的具体用法与工作原理。groupByKey是无参算子你只需在RDD之上调用groupByKey()即可完成对数据集的分组和收集。但需要特别注意的是,**以全量原始数据记录在集群范围内进行落盘与网络分发,会带来巨大的性能开销。**因此除非必需你应当尽量避免使用groupByKey算子。
利用聚合函数freduceByKey可以在Map端进行初步聚合大幅削减需要落盘与分发的数据量从而在一定程度上能够显著提升Shuffle计算的执行效率。对于绝大多数分组&聚合的计算需求只要聚合函数f设计得当reduceByKey都能实现业务逻辑。reduceByKey也有其自身的局限性那就是其Map阶段与Reduce阶段的计算逻辑必须保持一致。
对于Map端聚合与Reduce端聚合计算逻辑不一致的情况aggregateByKey可以很好地满足这样的计算场景。aggregateByKey的用法是aggregateByKey(初始值)(Map端聚合函数Reduce端聚合函数)对于aggregateByKey的3个参数你需要保证它们之间类型的一致性。一旦类型一致性得到满足你可以通过灵活地定义两个聚合函数来翻着花样地进行各式各样的数据分析。
最后对于排序类的计算需求你可以通过调用sortByKey来进行实现。sortByKey支持两种排序方式在默认情况下sortByKey()按Key值的升序进行排序sortByKey()与sortByKey(true)的效果是一样的。如果想按照降序做排序你只需要调用sortByKey(false)即可。
到此为止我们一起学习了RDD常用算子的前两大类也就是数据转换和数据聚合。在日常的开发工作中应该说绝大多数的业务需求都可以通过这些算子来实现。
因此恭喜你毫不夸张地说学习到这里你的一只脚已经跨入了Spark分布式应用开发的大门。不过我们还不能骄傲“学会”和“学好”之间还有一定的距离在接下来的时间里期待你和我一起继续加油真正做到吃透Spark、玩转Spark
## 每课一练
这一讲到这里就要结束了,今天的练习题是这样的:
学习过reduceByKey和aggregateByKey之后你能说说它们二者之间的联系吗你能用aggregateByKey来实现reduceByKey的功能吗
欢迎你分享你的答案。如果这一讲对你有帮助,也欢迎你把这一讲分享给自己的朋友,和他一起来讨论一下本讲的练习题,我们下一讲再见。