You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

347 lines
22 KiB
Markdown

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 29 | Spark MLlib Pipeline高效开发机器学习应用
你好,我是吴磊。
前面我们一起学习了如何在Spark MLlib框架下做特征工程与模型训练。不论是特征工程还是模型训练针对同一个机器学习问题我们往往需要尝试不同的特征处理方法或是模型算法。
结合之前的大量实例,细心的你想必早已发现,针对同一问题,不同的算法选型在开发的过程中,存在着大量的重复性代码。
以GBDT和随机森林为例它们处理数据的过程是相似的原始数据都是经过StringIndexer、VectorAssembler和VectorIndexer这三个环节转化为训练样本只不过GBDT最后用GBTRegressor来做回归而随机森林用RandomForestClassifier来做分类。
![图片](https://static001.geekbang.org/resource/image/51/0b/51a23f4c00c6048f262eb6006f66600b.jpg?wh=1920x585 "重复的处理逻辑")
不仅如此,在之前验证模型效果的时候我们也没有闭环,仅仅检查了训练集上的拟合效果,并没有在测试集上进行推理并验证。如果我们尝试去加载新的测试数据集,那么所有的特征处理过程,都需要在测试集上重演一遍。无疑,这同样会引入大量冗余的重复代码。
那么有没有什么办法能够避免上述的重复开发让Spark MLlib框架下的机器学习开发更加高效呢答案是肯定的今天这一讲我们就来说说Spark MLlib Pipeline看看它如何帮助开发者大幅提升机器学习应用的开发效率。
## Spark MLlib Pipeline
什么是Spark MLlib Pipeline呢简单地说Pipeline是一套基于DataFrame的高阶开发API它让开发者以一种高效的方式来打造端到端的机器学习流水线。这么说可能比较抽象我们不妨先来看看Pipeline都有哪些核心组件它们又提供了哪些功能。
Pipeline的核心组件有两类一类是Transformer我们不妨把它称作“转换器”另一类是Estimator我把它叫作“模型生成器”。我们之前接触的各类特征处理函数实际上都属于转换器比如StringIndexer、MinMaxScaler、Bucketizer、VectorAssembler等等。而前面3讲提到的模型算法全部都是Estimator。
![图片](https://static001.geekbang.org/resource/image/c2/4f/c2aa44370d88f8a73975a315c37aeb4f.jpg?wh=1920x639 "Pipeline核心组件")
### Transformer
我们先来说说Transformer数据转换器。在形式上Transformer的输入是DataFrame输出也是DataFrame。结合特定的数据处理逻辑Transformer基于原有的DataFrame数据列去创建新的数据列而新的数据列中往往包含着不同形式的特征。
以StringIndexer为例它的转换逻辑很简单就是把字符串转换为数值。在创建StringIndexer实例的时候我们需要使用setInputCol(s)和setOutputCol(s)方法,来指定原始数据列和期待输出的数据列,而输出数据列中的内容就是我们需要的特征,如下图所示。
![图片](https://static001.geekbang.org/resource/image/77/82/7723f13198234b75549b968d87816e82.jpg?wh=1920x560 "以StringIndexer为例演示Transformer的作用")
结合图示可以看到Transformer消费原有DataFrame的数据列然后把生成的数据列再追加到该DataFrame就会生成新的DataFrame。换句话说Transformer并不是“就地”Inline修改原有的DataFrame而是基于它去创建新的DataFrame。
实际上每个Transformer都实现了setInputCol(s)和setOutputCol(s)这两个接口方法。除此之外Transformer还提供了transform接口用于封装具体的转换逻辑。正是基于这些核心接口Pipeline才能把各式各样的Transformer拼接在一起打造出了特征工程流水线。
一般来说在一个机器学习应用中我们往往需要多个Transformer来对数据做各式各样的转换才能生成所需的训练样本。在逻辑上多个基于同一份原始数据生成的、不同“版本”数据的DataFrame它们会同时存在于系统中。
不过受益于Spark的惰性求值Lazy Evaluation设计应用在运行时并不会出现多份冗余数据重复占用内存的情况。
不过为了开发上的遍历我们还是会使用var而不是用val来命名原始的DataFrame。原因很简单如果用val的话我们需要反复使用新的变量名来命名新生成的DataFrame。关于这部分开发小细节你可以通过回顾[上一讲](https://time.geekbang.org/column/article/444259)的代码来体会。
### Estimator
接下来我们来说说Estimator。相比TransformerEstimator要简单得多它实际上就是各类模型算法如GBDT、随机森林、线性回归等等。Estimator的核心接口只有一个那就是fit中文可以翻译成“拟合”。
Estimator的作用就是定义模型算法然后通过拟合DataFrame所囊括的训练样本来生产模型Models。这也是为什么我把Estimator称作是“模型生成器”。
不过有意思的是虽然模型算法是Estimator但是Estimator生产的模型却是不折不扣的Transformer。
要搞清楚为什么模型是Transformer我们得先弄明白模型到底是什么。所谓机器学习模型它本质上就是一个参数Parameters又称权重Weights矩阵外加一个模型结构。模型结构与模型算法有关比如决策树结构、GBDT结构、神经网络结构等等。
模型的核心用途就是做推断Inference或者说预测。给定数据样本模型可以推断房价、推断房屋类型等等。在Spark MLlib框架下数据样本往往是由DataFrame封装的而模型推断的结果还是保存在新的DataFrame中结果的默认列名是“predictions”。
其实基于训练好的推理逻辑通过增加“predictions”列把一个DataFrame转化成一个新的DataFrame这不就是Transformer在做的事情吗而这也是为什么在模型算法上我们调用的是fit方法而在做模型推断时我们在模型上调用的是transform方法。
## 构建Pipeline
好啦了解了Transformer和Estimator之后我们就可以基于它们去构建Pipeline来打造端到端的机器学习流水线。实际上一旦Transformer、Estimator准备就绪定义Pipeline只需一行代码就可以轻松拿下如下所示。
```scala
import org.apache.spark.ml.Pipeline
 
// 像之前一样,定义各种特征处理对象与模型算法
val stringIndexer = _
val vectorAssembler = _
val vectorIndexer = _
val gbtRegressor = _
 
// 将所有的Transformer、Estimator依序放入数组
val stages = Array(stringIndexer, vectorAssembler, vectorIndexer, gbtRegressor)
 
// 定义Spark MLlib Pipeline
val newPipeline = new Pipeline()
.setStages(stages)
```
可以看到要定义Pipeline只需创建Pipeline实例然后把之前定义好的Transformer、Estimator纷纷作为参数传入setStages方法即可。需要注意的是**一个Pipeline可以包含多个Transformer和Estimator不过Pipeline的最后一个环节必须是Estimator切记**。
到此为止Pipeline的作用、定义以及核心组件我们就讲完了。不过你可能会说“概念是讲完了不过我还是不知道Pipeline具体怎么用以及它到底有什么优势”别着急光说不练假把式接下来我们就结合GBDT与随机森林的例子来说说Pipeline的具体用法以及怎么用它帮你大幅度提升开发效率。
首先我们来看看在一个机器学习应用中Pipeline如何帮助我们提高效率。在上一讲我们用GBDT来拟合房价并给出了代码示例。
现在咱们把代码稍微调整一下用Spark MLlib Pipeline来实现模型训练。第一步我们还是先从文件创建DataFrame然后把数值型字段与非数值型字段区分开如下所示。
```scala
import org.apache.spark.sql.DataFrame
 
// rootPath为房价预测数据集根目录
val rootPath: String = _
val filePath: String = s"${rootPath}/train.csv"
 
// 读取文件创建DataFrame
var engineeringDF: DataFrame = spark.read.format("csv").option("header", true).load(filePath)
 
// 所有数值型字段
val numericFields: Array[String] = Array("LotFrontage", "LotArea", "MasVnrArea", "BsmtFinSF1", "BsmtFinSF2", "BsmtUnfSF", "TotalBsmtSF", "1stFlrSF", "2ndFlrSF", "LowQualFinSF", "GrLivArea", "BsmtFullBath", "BsmtHalfBath", "FullBath", "HalfBath", "BedroomAbvGr", "KitchenAbvGr", "TotRmsAbvGrd", "Fireplaces", "GarageCars", "GarageArea", "WoodDeckSF", "OpenPorchSF", "EnclosedPorch", "3SsnPorch", "ScreenPorch", "PoolArea")
 
// Label字段
val labelFields: Array[String] = Array("SalePrice")
 
import org.apache.spark.sql.types.IntegerType
 
for (field <- (numericFields ++ labelFields)) {
engineeringDF = engineeringDF
.withColumn(s"${field}Int",col(field).cast(IntegerType))
.drop(field)
}
```
数据准备好之后接下来我们就可以开始着手为Pipeline的构建打造零件依次定义转换器Transformer和模型生成器Estimator。在上一讲我们用StringIndexer把非数值字段转换为数值字段这一讲咱们也依法炮制。
```scala
import org.apache.spark.ml.feature.StringIndexer
 
// 所有非数值型字段
val categoricalFields: Array[String] = Array("MSSubClass", "MSZoning", "Street", "Alley", "LotShape", "LandContour", "Utilities", "LotConfig", "LandSlope", "Neighborhood", "Condition1", "Condition2", "BldgType", "HouseStyle", "OverallQual", "OverallCond", "YearBuilt", "YearRemodAdd", "RoofStyle", "RoofMatl", "Exterior1st", "Exterior2nd", "MasVnrType", "ExterQual", "ExterCond", "Foundation", "BsmtQual", "BsmtCond", "BsmtExposure", "BsmtFinType1", "BsmtFinType2", "Heating", "HeatingQC", "CentralAir", "Electrical", "KitchenQual", "Functional", "FireplaceQu", "GarageType", "GarageYrBlt", "GarageFinish", "GarageQual", "GarageCond", "PavedDrive", "PoolQC", "Fence", "MiscFeature", "MiscVal", "MoSold", "YrSold", "SaleType", "SaleCondition")
 
// StringIndexer期望的输出列名
val indexFields: Array[String] = categoricalFields.map(_ + "Index").toArray
 
// 定义StringIndexer实例
val stringIndexer = new StringIndexer()
// 批量指定输入列名
.setInputCols(categoricalFields)
// 批量指定输出列名,输出列名与输入列名,必须要一一对应
.setOutputCols(indexFields)
.setHandleInvalid("keep") 
```
在上一讲定义完StringIndexer实例之后我们立即拿它去对engineeringDF做转换。不过在构建Pipeline的时候我们不需要这么做只需要把这个“零件”定义好即可。接下来我们来打造下一个零件VectorAssembler。
```scala
import org.apache.spark.ml.feature.VectorAssembler
 
// 转换为整型的数值型字段
val numericFeatures: Array[String] = numericFields.map(_ + "Int").toArray
 
val vectorAssembler = new VectorAssembler()
/** 输入列为:数值型字段 + 非数值型字段
注意非数值型字段的列名要用indexFields
而不能用原始的categoricalFields不妨想一想为什么
*/
.setInputCols(numericFeatures ++ indexFields)
.setOutputCol("features")
.setHandleInvalid("keep")
```
与上一讲相比VectorAssembler的定义并没有什么两样。
下面我们继续来打造第三个零件VectorIndexer它用于帮助模型算法区分连续特征与离散特征。
```scala
import org.apache.spark.ml.feature.VectorIndexer
 
val vectorIndexer = new VectorIndexer()
// 指定输入列
.setInputCol("features")
// 指定输出列
.setOutputCol("indexedFeatures")
// 指定连续、离散判定阈值
.setMaxCategories(30)
.setHandleInvalid("keep")
```
到此为止Transformer就全部定义完了原始数据经过StringIndexer、VectorAssembler和VectorIndexer的转换之后会生成新的DataFrame。在这个最新的DataFrame中会有多个由不同Transformer生成的数据列其中“indexedFeatures”列包含的数据内容即是特征向量。
结合DataFrame一路携带过来的“SalePriceInt”列特征向量与预测标的终于结合在一起了就是我们常说的训练样本。有了训练样本接下来我们就可以着手定义Estimator。
```scala
import org.apache.spark.ml.regression.GBTRegressor
 
val gbtRegressor = new GBTRegressor()
// 指定预测标的
.setLabelCol("SalePriceInt")
// 指定特征向量
.setFeaturesCol("indexedFeatures")
// 指定决策树的数量
.setMaxIter(30)
// 指定决策树的最大深度
.setMaxDepth(5)
```
好啦到这里Pipeline所需的零件全部打造完毕零件就位只欠组装。我们需要通过Spark MLlib提供的“流水线工艺”把所有零件组装成Pipeline。
```scala
import org.apache.spark.ml.Pipeline
 
val components = Array(stringIndexer, vectorAssembler, vectorIndexer, gbtRegressor)
 
val pipeline = new Pipeline()
.setStages(components)
```
怎么样是不是很简单接下来的问题是有了Pipeline我们都能用它做些什么呢
```scala
// Pipeline保存地址的根目录
val savePath: String = _
 
// 将Pipeline物化到磁盘以备后用复用
pipeline.write
.overwrite()
.save(s"${savePath}/unfit-gbdt-pipeline")
 
// 划分出训练集和验证集
val Array(trainingData, validationData) = engineeringDF.randomSplit(Array(0.7, 0.3))
 
// 调用fit方法触发Pipeline计算并最终拟合出模型
val pipelineModel = pipeline.fit(trainingData)
```
首先我们可以把Pipeline保存下来以备后用至于怎么复用我们待会再说。再者把之前准备好的训练样本传递给Pipeline的fit方法即可触发整条Pipeline从头至尾的计算逻辑从各式各样的数据转换到最终的模型训练一步到位。
Pipeline fit方法的输出结果即是训练好的机器学习模型。我们最开始说过模型也是Transformer它可以用来推断预测结果。
看到这里你可能会说“和之前的代码实现相比Pipeline也没有什么特别之处无非是用Pipeline API把之前的环节拼接起来而已”。其实不然基于构建好的Pipeline我们可以在不同范围对其进行复用。对于机器学习应用来说我们**既可以在作业内部实现复用,也可以在作业之间实现复用,从而大幅度提升开发效率**。
### 作业内的代码复用
在之前的模型训练过程中我们仅仅在训练集与验证集上评估了模型效果。实际上在工业级应用中我们最关心的是模型在测试集上的泛化能力。就拿Kaggle竞赛来说对于每一个机器学习项目Kaggle都会同时提供train.csv和test.csv两个文件。
其中train.csv是带标签的用于训练模型而test.csv是不带标签的。我们需要对test.csv中的数据做推断然后把预测结果提交到Kaggle线上平台平台会结合房屋的实际价格来评判我们的模型到那时我们才能知道模型对于房价的预测到底有多准或是有多不准
要完成对test.csv的推断我们需要把原始数据转换为特征向量也就是把“粗粮”转化为“细粮”然后才能把它“喂给”模型。
在之前的代码实现中要做到这一点我们必须把之前加持到train.csv的所有转换逻辑都重写一遍比如StringIndexer、VectorAssembler和VectorIndexer。毫无疑问这样的开发方式是极其低效的更糟的是手工重写很容易会造成测试样本与训练样本不一致而这样的不一致是机器学习应用中的大忌。
不过有了Pipeline我们就可以省去这些麻烦。首先我们把test.csv加载进来并创建DataFrame然后把数值字段从String转为Int。
```scala
import org.apache.spark.sql.DataFrame
 
val rootPath: String = _
val filePath: String = s"${rootPath}/test.csv"
 
// 加载test.csv并创建DataFrame
var testData: DataFrame = spark.read.format("csv").option("header", true).load(filePath)
 
// 所有数值型字段
val numericFields: Array[String] = Array("LotFrontage", "LotArea", "MasVnrArea", "BsmtFinSF1", "BsmtFinSF2", "BsmtUnfSF", "TotalBsmtSF", "1stFlrSF", "2ndFlrSF", "LowQualFinSF", "GrLivArea", "BsmtFullBath", "BsmtHalfBath", "FullBath", "HalfBath", "BedroomAbvGr", "KitchenAbvGr", "TotRmsAbvGrd", "Fireplaces", "GarageCars", "GarageArea", "WoodDeckSF", "OpenPorchSF", "EnclosedPorch", "3SsnPorch", "ScreenPorch", "PoolArea")
 
// 所有非数值型字段
val categoricalFields: Array[String] = Array("MSSubClass", "MSZoning", "Street", "Alley", "LotShape", "LandContour", "Utilities", "LotConfig", "LandSlope", "Neighborhood", "Condition1", "Condition2", "BldgType", "HouseStyle", "OverallQual", "OverallCond", "YearBuilt", "YearRemodAdd", "RoofStyle", "RoofMatl", "Exterior1st", "Exterior2nd", "MasVnrType", "ExterQual", "ExterCond", "Foundation", "BsmtQual", "BsmtCond", "BsmtExposure", "BsmtFinType1", "BsmtFinType2", "Heating", "HeatingQC", "CentralAir", "Electrical", "KitchenQual", "Functional", "FireplaceQu", "GarageType", "GarageYrBlt", "GarageFinish", "GarageQual", "GarageCond", "PavedDrive", "PoolQC", "Fence", "MiscFeature", "MiscVal", "MoSold", "YrSold", "SaleType", "SaleCondition")
 
import org.apache.spark.sql.types.IntegerType
 
// 注意test.csv没有SalePrice字段也即没有Label
for (field <- (numericFields)) {
testData = testData
.withColumn(s"${field}Int",col(field).cast(IntegerType))
.drop(field)
}
```
接下来我们只需要调用Pipeline Model的transform方法就可以对测试集做推理。还记得吗模型是Transformer而transform是Transformer用于数据转换的统一接口。
```scala
val predictions = pipelineModel.transform(testData)
```
有了Pipeline我们就可以省去StringIndexer、VectorAssembler这些特征处理函数的重复定义在提升开发效率的同时消除样本不一致的隐患。除了在同一个作业内部复用Pipeline之外我们还可以在不同的作业之间对其进行复用从而进一步提升开发效率。
### 作业间的代码复用
对于同一个机器学习问题我们往往会尝试不同的模型算法以期获得更好的模型效果。例如对于房价预测我们既可以用GBDT也可以用随机森林。不过尽管模型算法不同但是它们的训练样本往往是类似的甚至是完全一样的。如果每尝试一种模型算法就需要从头处理一遍数据这未免过于低效也容易出错。
有了Pipeline我们就可以把算法选型这件事变得异常简单。还是拿房价预测来举例之前我们尝试使用GBTRegressor来训练模型这一次咱们来试试RandomForestRegressor也即使用随机森林来解决回归问题。按照惯例我们还是结合代码来进行讲解。
```scala
import org.apache.spark.ml.Pipeline
 
val savePath: String = _
 
// 加载之前保存到磁盘的Pipeline
val unfitPipeline = Pipeline.load(s"${savePath}/unfit-gbdt-pipeline")
 
// 获取Pipeline中的每一个StageTransformer或Estimator
val formerStages = unfitPipeline.getStages
 
// 去掉Pipeline中最后一个组件也即EstimatorGBTRegressor
val formerStagesWithoutModel = formerStages.dropRight(1)
 
import org.apache.spark.ml.regression.RandomForestRegressor
 
// 定义新的EstimatorRandomForestRegressor
val rf = new RandomForestRegressor()
.setLabelCol("SalePriceInt")
.setFeaturesCol("indexedFeatures")
.setNumTrees(30)
.setMaxDepth(5)
 
// 将老的Stages与新的Estimator拼接在一起
val stages = formerStagesWithoutModel ++ Array(rf)
 
// 重新定义新的Pipeline
val newPipeline = new Pipeline()
.setStages(stages)
```
首先我们把之前保存下来的Pipeline重新加载进来。然后用新的RandomForestRegressor替换原来的GBTRegressor。最后再把原有的Stages和新的Estimator拼接在一起去创建新的Pipeline即可。接下来只要调用fit方法就可以触发新Pipeline的运转并最终拟合出新的随机森林模型。
```scala
// 像之前一样从train.csv创建DataFrame准备数据
var engineeringDF = _
 
val Array(trainingData, testData) = engineeringDF.randomSplit(Array(0.7, 0.3))
 
// 调用fit方法触发Pipeline运转拟合新模型
val pipelineModel = newPipeline.fit(trainingData)
```
可以看到短短的几行代码就可以让我们轻松地完成模型选型。到此Pipeline在开发效率与容错上的优势可谓一览无余。
## 重点回顾
今天的内容就讲完啦今天这一讲我们一起学习了Spark MLlib Pipeline。你需要理解Pipeline的优势所在并掌握它的核心组件与具体用法。Pipeline的核心组件是Transformer与Estimator。
其中Transformer完成从DataFrame到DataFrame的转换基于固有的转换逻辑生成新的数据列。Estimator主要是模型算法它基于DataFrame中封装的训练样本去生成机器学习模型。将若干Transformer与Estimator拼接在一起通过调用Pipeline的setStages方法即可完成Pipeline的创建。
**Pipeline的核心优势在于提升机器学习应用的开发效率并同时消除测试样本与训练样本之间不一致这一致命隐患。Pipeline可用于作业内的代码复用或是作业间的代码复用。**
在同一作业内Pipeline能够轻松地在测试集之上完成数据推断。而在作业之间开发者可以加载之前保存好的Pipeline然后用“新零件”替换“旧零件”的方式在复用大部分处理逻辑的同时去打造新的Pipeline从而实现高效的模型选型过程。
在今后的机器学习开发中我们要充分利用Pipeline提供的优势来降低开发成本从而把主要精力放在特征工程与模型调优上。
到此为止Spark MLlib模块的全部内容我们就讲完了。
在这个模块中我们主要围绕着特征工程、模型训练和机器学习流水线等几个方面梳理了Spark MLlib子框架为开发者提供的种种能力。换句话说我们知道了Spark MLlib能做哪些事情、擅长做哪些事情。如果我们能够做到对这些能力了如指掌在日常的机器学习开发中就可以灵活地对其进行取舍从而去应对不断变化的业务需求加油
## 每日一练
我们今天一直在讲Pipeline的优势你能说一说Pipeline有哪些可能的劣势吗
欢迎你在留言区和我交流互动也推荐你把这一讲分享给更多同事、朋友说不定就能让他进一步理解Pipeline。