gitbook/零基础入门Spark/docs/418079.md
2022-09-03 22:05:03 +08:00

276 lines
20 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 03 | RDD常用算子RDD内部的数据转换
你好,我是吴磊。
在上一讲的最后我们用一张表格整理了Spark[官网](https://spark.apache.org/docs/latest/rdd-programming-guide.html)给出的RDD算子。想要在Spark之上快速实现业务逻辑理解并掌握这些算子无疑是至关重要的。
因此,在接下来的几讲,我将带你一起梳理这些常见算子的用法与用途。不同的算子,就像是厨房里的炒勺、铲子、刀具和各式各样的锅碗瓢盆,只有熟悉了这些“厨具”的操作方法,才能在客人点餐的时候迅速地做出一桌好菜。
今天这一讲我们先来学习同一个RDD内部的数据转换。掌握RDD常用算子是做好Spark应用开发的基础而数据转换类算子则是基础中的基础因此我们优先来学习这类RDD算子。
在这些算子中我们重点讲解的就是map、mapPartitions、flatMap、filter。这4个算子几乎囊括了日常开发中99%的数据转换场景剩下的mapPartitionsWithIndex我把它留给你作为课后作业去探索。
![图片](https://static001.geekbang.org/resource/image/a3/88/a3ec138b50456604bae8ce22cdf56788.jpg?wh=1599x1885 "RDD算子分类表")
俗话说巧妇难为无米之炊要想玩转厨房里的厨具我们得先准备好米、面、油这些食材。学习RDD算子也是一样要想动手操作这些算子咱们得先有RDD才行。
所以接下来我们就一起来看看RDD是怎么创建的。
## 创建RDD
在Spark中创建RDD的典型方式有两种
* 通过SparkContext.parallelize在**内部数据**之上创建RDD
* 通过SparkContext.textFile等API从**外部数据**创建RDD。
这里的内部、外部是相对应用程序来说的。开发者在Spark应用中自定义的各类数据结构如数组、列表、映射等都属于“内部数据”而“外部数据”指代的是Spark系统之外的所有数据形式如本地文件系统或是分布式文件系统中的数据再比如来自其他大数据组件Hive、Hbase、RDBMS等的数据。
第一种创建方式的用法非常简单只需要用parallelize函数来封装内部数据即可比如下面的例子
```scala
import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)
```
你可以在spark-shell中敲入上述代码来直观地感受parallelize创建RDD的过程。通常来说在Spark应用内定义体量超大的数据集其实都是不太合适的因为数据集完全由Driver端创建且创建完成后还要在全网范围内跨节点、跨进程地分发到其他Executors所以往往会带来性能问题。因此parallelize API的典型用法是在“小数据”之上创建RDD。
要想在真正的“大数据”之上创建RDD我们还得依赖第二种创建方式也就是通过SparkContext.textFile等API从**外部数据**创建RDD。由于textFile API比较简单而且它在日常的开发中出现频率比较高因此我们使用textFile API来创建RDD。在后续对各类RDD算子讲解的过程中我们都会使用textFile API从文件系统创建RDD。
为了保持讲解的连贯性我们还是使用第一讲中的源文件wikiOfSpark.txt来创建RDD代码实现如下所示
```scala
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
```
好啦创建好了RDD我们就有了可以下锅的食材。接下来咱们就要正式地走进厨房把铲子和炒勺挥起来啦。
## RDD内的数据转换
首先我们先来认识一下map算子。毫不夸张地说在所有的RDD算子中map“出场”的概率是最高的。因此我们必须要掌握map的用法与注意事项。
### map以元素为粒度的数据转换
我们先来说说map算子的用法**给定映射函数fmap(f)以元素为粒度对RDD做数据转换。**其中f可以是带有明确签名的带名函数也可以是匿名函数它的形参类型必须与RDD的元素类型保持一致而输出类型则任由开发者自行决定。
这种照本宣科的介绍听上去难免会让你有点懵别着急接下来我们用些小例子来更加直观地展示map的用法。
在[第一讲](https://time.geekbang.org/column/article/415209)的Word Count示例中我们使用如下代码把包含单词的RDD转换成元素为KeyValue对的RDD后者统称为Paired RDD。
```scala
// 把普通RDD转换为Paired RDD
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
```
在上面的代码实现中传递给map算子的形参word => word1就是我们上面说的映射函数f。只不过这里f是以匿名函数的方式进行定义的其中左侧的word表示匿名函数f的输入形参而右侧的word1则代表函数f的输出结果。
如果我们把匿名函数变成带名函数的话可能你会看的更清楚一些。这里我用一段代码重新定义了带名函数f。
```scala
// 把RDD元素转换为KeyValue的形式
 
// 定义映射函数f
def f(word: String): (String, Int) = {
return (word, 1)
}
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
```
可以看到我们使用Scala的def语法明确定义了带名映射函数f它的计算逻辑与刚刚的匿名函数是一致的。在做RDD数据转换的时候我们只需把函数f传递给map算子即可。不管f是匿名函数还是带名函数map算子的转换逻辑都是一样的你不妨把以上两种实现方式分别敲入到spark-shell去验证执行结果的一致性。
到这里为止我们就掌握了map算子的基本用法。现在你就可以定义任意复杂的映射函数f然后在RDD之上通过调用map(f)去翻着花样地做各种各样的数据转换。
比如通过定义如下的映射函数f我们就可以改写Word Count的计数逻辑也就是把“Spark”这个单词的统计计数权重提高一倍
```scala
// 把RDD元素转换为KeyValue的形式
 
// 定义映射函数f
def f(word: String): (String, Int) = {
if (word.equals("Spark")) { return (word, 2) }
return (word, 1)
}
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)
```
尽管map算子足够灵活允许开发者自由定义转换逻辑。不过就像我们刚刚说的map(f)是以元素为粒度对RDD做数据转换的在某些计算场景下这个特点会严重影响执行效率。为什么这么说呢我们来看一个具体的例子。
比方说我们把Word Count的计数需求从原来的对单词计数改为对单词的哈希值计数在这种情况下我们的代码实现需要做哪些改动呢我来示范一下
```scala
// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
// 获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
// 使用MD5计算哈希值
val hash = md5.digest(word.getBytes).mkString
// 返回哈希值与数字1的Pair
(hash, 1)
}
```
由于map(f)是以元素为单元做转换的那么对于RDD中的每一条数据记录我们都需要实例化一个MessageDigest对象来计算这个元素的哈希值。
在工业级生产系统中一个RDD动辄包含上百万甚至是上亿级别的数据记录如果处理每条记录都需要事先创建MessageDigest那么实例化对象的开销就会聚沙成塔不知不觉地成为影响执行效率的罪魁祸首。
那么问题来了有没有什么办法能够让Spark在更粗的数据粒度上去处理数据呢还真有mapPartitions和mapPartitionsWithIndex这对“孪生兄弟”就是用来解决类似的问题。相比mapPartitionsmapPartitionsWithIndex仅仅多出了一个数据分区索引因此接下来我们把重点放在mapPartitions上面。
### mapPartitions以数据分区为粒度的数据转换
按照介绍算子的惯例我们还是先来说说mapPartitions的用法。mapPartitions顾名思义就是**以数据分区为粒度使用映射函数f对RDD进行数据转换**。对于上述单词哈希值计数的例子我们结合后面的代码来看看如何使用mapPartitions来改善执行性能
```scala
// 把普通RDD转换为Paired RDD
 
import java.security.MessageDigest
 
val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
 
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
// 注意这里是以数据分区为粒度获取MD5对象实例
val md5 = MessageDigest.getInstance("MD5")
val newPartition = partition.map( word => {
// 在处理每一条数据记录的时候可以复用同一个Partition内的MD5对象
(md5.digest(word.getBytes()).mkString,1)
})
newPartition
})
```
可以看到在上面的改进代码中mapPartitions以数据分区匿名函数的形参partition为粒度对RDD进行数据转换。具体的数据处理逻辑则由代表数据分区的形参partition进一步调用map(f)来完成。你可能会说“partition. map(f)仍然是以元素为粒度做映射呀!这和前一个版本的实现,有什么本质上的区别呢?”
仔细观察你就会发现相比前一个版本我们把实例化MD5对象的语句挪到了map算子之外。如此一来以数据分区为单位实例化对象的操作只需要执行一次而同一个数据分区中所有的数据记录都可以共享该MD5对象从而完成单词到哈希值的转换。
通过下图的直观对比你会发现以数据分区为单位mapPartitions只需实例化一次MD5对象而map算子却需要实例化多次具体的次数则由分区内数据记录的数量来决定。
![图片](https://static001.geekbang.org/resource/image/c7/8d/c76be8ff89f1c37e52e9f17b66bf398d.jpg?wh=1920x779 "map与mapPartitions的区别")
对于一个有着上百万条记录的RDD来说其数据分区的划分往往是在百这个量级因此相比map算子mapPartitions可以显著降低对象实例化的计算开销这对于Spark作业端到端的执行性能来说无疑是非常友好的。
实际上。除了计算哈希值以外对于数据记录来说凡是可以共享的操作都可以用mapPartitions算子进行优化。这样的共享操作还有很多比如创建用于连接远端数据库的Connections对象或是用于连接Amazon S3的文件系统句柄再比如用于在线推理的机器学习模型等等不一而足。你不妨结合实际工作场景把你遇到的共享操作整理到留言区期待你的分享。
相比mapPartitionsmapPartitionsWithIndex仅仅多出了一个数据分区索引这个数据分区索引可以为我们获取分区编号当你的业务逻辑中需要使用到分区编号的时候不妨考虑使用这个算子来实现代码。除了这个额外的分区索引以外mapPartitionsWithIndex在其他方面与mapPartitions是完全一样的。
介绍完map与mapPartitions算子之后接下来我们趁热打铁再来看一个与这两者功能类似的算子flatMap。
### **flatMap从元素到集合、再从集合到元素**
flatMap其实和map与mapPartitions算子类似在功能上与map和mapPartitions一样flatMap也是用来做数据映射的在实现上对于给定映射函数fflatMap(f)以元素为粒度对RDD进行数据转换。
不过与前两者相比flatMap的映射函数f有着显著的不同。对于map和mapPartitions来说其映射函数f的类型都是元素 => 元素即元素到元素。而flatMap映射函数f的类型元素 => 集合即元素到集合如数组、列表等。因此flatMap的映射过程在逻辑上分为两步
* 以元素为单位,创建集合;
* 去掉集合“外包装”,提取集合元素。
这么说比较抽象我们还是来举例说明。假设我们再次改变Word Count的计算逻辑由原来统计单词的计数改为统计相邻单词共现的次数如下图所示
![图片](https://static001.geekbang.org/resource/image/b9/6a/b9feyy652bb60d7d30a25e3e122a9f6a.jpg?wh=1920x764 "变更Word Count计算逻辑")
对于这样的计算逻辑我们该如何使用flatMap进行实现呢这里我们先给出代码实现然后再分阶段地分析flatMap的映射过程
```scala
// 读取文件内容
val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码
// 以行为单位提取相邻单词
val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
// 将行转换为单词数组
val words: Array[String] = line.split(" ")
// 将单个单词数组,转换为相邻单词数组
for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
})
```
在上面的代码中我们采用匿名函数的形式来提供映射函数f。这里f的形参是String类型的line也就是源文件中的一行文本而f的返回类型是Array\[String\]也就是String类型的数组。在映射函数f的函数体中我们先用split语句把line转化为单词数组然后再用for循环结合yield语句依次把单个的单词转化为相邻单词词对。
注意for循环返回的依然是数组也即类型为Array\[String\]的词对数组。由此可见函数f的类型是String => Array\[String\]),也就是刚刚说的第一步,**从元素到集合**。但如果我们去观察转换前后的两个RDD也就是lineRDD和wordPairRDD会发现它们的类型都是RDD\[String\]换句话说它们的元素类型都是String。
回顾map与mapPartitions这两个算子我们会发现转换前后RDD的元素类型与映射函数f的类型是一致的。但在flatMap这里却出现了RDD元素类型与函数类型不一致的情况。这是怎么回事呢其实呢这正是flatMap的“奥妙”所在为了让你直观地理解flatMap的映射过程我画了一张示意图如下所示
![图片](https://static001.geekbang.org/resource/image/a6/bd/a6bcd12fbc377405557c1aaf63cd24bd.jpg?wh=1920x840)
不难发现映射函数f的计算过程对应着图中的步骤1与步骤2每行文本都被转化为包含相邻词对的数组。紧接着flatMap去掉每个数组的“外包装”提取出数组中类型为String的词对元素然后以词对为单位构建新的数据分区如图中步骤3所示。这就是flatMap映射过程的第二步**去掉集合“外包装”,提取集合元素**。
得到包含词对元素的wordPairRDD之后我们就可以沿用Word Count的后续逻辑去计算相邻词汇的共现次数。你不妨结合文稿中的代码与第一讲中Word Count的代码去实现完整版的“相邻词汇计数统计”。
### filter过滤RDD
在今天的最后我们再来学习一下与map一样常用的算子filter。filter顾名思义这个算子的作用是对RDD进行过滤。就像是map算子依赖其映射函数一样filter算子也需要借助一个判定函数f才能实现对RDD的过滤转换。
所谓判定函数它指的是类型为RDD元素类型 => Boolean的函数。可以看到判定函数f的形参类型必须与RDD的元素类型保持一致而f的返回结果只能是True或者False。在任何一个RDD之上调用filter(f)其作用是保留RDD中满足f也就是f返回True的数据元素而过滤掉不满足f也就是f返回False的数据元素。
老规矩我们还是结合示例来讲解filter算子与判定函数f。
在上面flatMap例子的最后我们得到了元素为相邻词汇对的wordPairRDD它包含的是像“Spark-is”、“is-cool”这样的字符串。为了仅保留有意义的词对元素我们希望结合标点符号列表对wordPairRDD进行过滤。例如我们希望过滤掉像“Spark-&”、“|-data”这样的词对。
掌握了filter算子的用法之后要实现这样的过滤逻辑我相信你很快就能写出如下的代码实现
```scala
// 定义特殊字符列表
val list: List[String] = List("&", "|", "#", "^", "@")
 
// 定义判定函数f
def f(s: String): Boolean = {
val words: Array[String] = s.split("-")
val b1: Boolean = list.contains(words(0))
val b2: Boolean = list.contains(words(1))
return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
}
 
// 使用filter(f)对RDD进行过滤
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)
```
掌握了filter算子的用法之后你就可以定义任意复杂的判定函数f然后在RDD之上通过调用filter(f)去变着花样地做数据过滤,从而满足不同的业务需求。
## 重点回顾
好啦到此为止关于RDD内数据转换的几个算子我们就讲完了我们一起来做个总结。今天这一讲你需要掌握map、mapPartitions、flatMap和filter这4个算子的作用和具体用法。
首先我们讲了map算子的用法它允许开发者自由地对RDD做各式各样的数据转换给定映射函数fmap(f)以元素为粒度对RDD做数据转换。其中f可以是带名函数也可以是匿名函数它的形参类型必须与RDD的元素类型保持一致而输出类型则任由开发者自行决定。
为了提升数据转换的效率Spark提供了以数据分区为粒度的mapPartitions算子。mapPartitions的形参是代表数据分区的partition它通过在partition之上再次调用map(f)来完成数据的转换。相比mapmapPartitions的优势是以数据分区为粒度初始化共享对象这些共享对象在我们日常的开发中很常见比如数据库连接对象、S3文件句柄、机器学习模型等等。
紧接着我们介绍了flatMap算子。flatMap的映射函数f比较特殊它的函数类型是元素 => 集合这里集合指的是像数组、列表这样的数据结构。因此flatMap的映射过程在逻辑上分为两步这一点需要你特别注意
* **以元素为单位,创建集合;**
* **去掉集合“外包装”,提取集合元素。**
最后我们学习了filter算子filter算子的用法与map很像它需要借助判定函数f来完成对RDD的数据过滤。判定函数的类型必须是RDD元素类型 => Boolean也就是形参类型必须与RDD的元素类型保持一致返回结果类型则必须是布尔值。RDD中的元素是否能够得以保留取决于判定函数f的返回值是True还是False。
虽然今天我们只学了4个算子但这4个算子在日常开发中的出现频率非常之高。掌握了这几个简单的RDD算子你几乎可以应对RDD中90%的数据转换场景。希望你对这几个算子多多加以练习,从而在日常的开发工作中学以致用。
## **每课一练**
讲完了正课我来给你留3个思考题
1.请你结合[官网](https://spark.apache.org/docs/latest/rdd-programming-guide.html)的介绍自学mapPartitionsWithIndex算子。请你说一说在哪些场景下可能会用到这个算子
2.对于我们今天学过的4个算子再加上没有详细解释的mapPartitionsWithIndex你能说说它们之间有哪些共性或是共同点吗
3.你能说一说在日常的工作中还遇到过哪些可以在mapPartitions中初始化的共享对象呢
欢迎你在评论区回答这些练习题。你也可以把这一讲分享给更多的朋友或者同事,和他们一起讨论讨论,交流是学习的催化剂。我在评论区等你。