You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

17 KiB

13 | Spark SQL让我们从“小汽车摇号分析”开始

你好,我是吴磊。

在开篇词我们提出“入门Spark需要三步走”到目前为止我们携手并肩跨越了前面两步首先恭喜你学到这里熟练掌握了Spark常用算子与核心原理以后你已经可以轻松应对大部分数据处理需求了。

不过数据处理毕竟是比较基础的数据应用场景就像赛车有着不同的驾驶场景想成为Spark的资深赛车手我们还要走出第三步——学习Spark计算子框架。只有完成这一步我们才能掌握Spark SQLStructured Streaming和Spark MLlib的常规开发方法游刃有余地应对不同的数据应用场景如数据分析、流计算和机器学习等等。

图片

那这么多子框架从哪里入手比较好呢在所有的子框架中Spark SQL是代码量最多、Spark社区投入最大、应用范围最广、影响力最深远的那个。就子框架的学习来说我们自然要从Spark SQL开始。

今天我们从一个例子入手在实战中带你熟悉数据分析开发的思路和实现步骤。有了对Spark SQL的直观体验我们后面几讲还会深入探讨Spark SQL的用法、特性与优势让你逐步掌握Spark SQL的全貌。

业务需求

今天我们要讲的小例子来自于北京市小汽车摇号。我们知道为了限制机动车保有量从2011年开始北京市政府推出了小汽车摇号政策。随着摇号进程的推进在2016年为了照顾那些长时间没有摇中号码牌的“准司机”摇号政策又推出了“倍率”制度。

所谓倍率制度,它指的是,结合参与摇号次数,为每个人赋予不同的倍率系数。有了倍率加持,大家的中签率就由原来整齐划一的基础概率,变为“基础概率 * 倍率系数”。参与摇号的次数越多,倍率系数越大,中签率也会相应得到提高。

不过身边无数的“准司机”总是跟我说其实倍率这玩意没什么用背了8倍、10倍的倍率照样摇不上那么今天这一讲咱们就来借着学习Spark SQL的机会用数据来为这些还没摸过车的“老司机”答疑解惑帮他们定量地分析一下倍率与中签率之间到底有没有关系

准备工作

巧妇难为无米之炊既然是做数据分析那咱们得先有数据才行。我这边为你准备了2011年到2019年北京市小汽车的摇号数据你可以通过这个地址从网盘进行下载提取码为ajs6。

这份数据的文件名是“2011-2019 小汽车摇号数据.tar.gz”解压之后的目录结构如下图所示。

可以看到根目录下有apply和lucky两个子目录apply目录的内容是 2011-2019 年各个批次参与摇号的申请号码而lucky目录包含的是各个批次中签的申请号码。为了叙述方便我们把参与过摇号的人叫“申请者”把中签的人叫“中签者”。apply和lucky的下一级子目录是各个摇号批次而摇号批次目录下包含的是Parquet格式的数据文件。

图片

数据下载、解压完成之后,接下来,我们再来准备运行环境。

咱们的小例子比较轻量Scala版本的代码实现不会超过20行再者摇号数据体量很小解压之后的Parquet文件总大小也不超过4G。

选择这样的例子也是为了轻装上阵避免你因为硬件限制而难以实验。想要把用于分析倍率的应用跑起来你在笔记本或是PC上通过启动本地spark-shell环境就可以。不过如果条件允许的话我还是鼓励你搭建分布式的物理集群。关于分布式集群的搭建细节你可以参考第4讲

好啦,准备好数据与运行环境之后,接下来,我们就可以步入正题,去开发探索倍率与中签率关系的数据分析应用啦。

数据探索

不过先别忙着直接上手数据分析。在此之前我们先要对数据模式Data Schema有最基本的认知也就是源数据都有哪些字段这些字段的类型和含义分别是什么这一步就是我们常说的数据探索。

数据探索的思路是这样的首先我们使用SparkSession的read API读取源数据、创建DataFrame。然后通过调用DataFrame的show方法我们就可以轻松获取源数据的样本数据从而完成数据的初步探索代码如下所示。

import org.apache.spark.sql.DataFrame
 
val rootPath: String = _
// 申请者数据
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark是spark-shell中默认的SparkSession实例
// 通过read API读取源文件
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// 数据打印
applyNumbersDF.show
 
// 中签者数据
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// 通过read API读取源文件
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// 数据打印
luckyDogsDF.show

看到这里想必你已经眉头紧锁“SparkSessionDataFrame这些都是什么鬼你好像压根儿也没有提到过这些概念呀”别着急对于这些关键概念我们在后续的课程中都会陆续展开今天这一讲咱们先来“知其然”“知其所以然”的部分咱们放到后面去讲。

对于SparkSession你可以把它理解为是SparkContext的进阶版是Spark2.0版本以后新一代的开发入口。SparkContext通过textFile API把源数据转换为RDD而SparkSession通过read API把源数据转换为DataFrame。

而DataFrame你可以把它看作是一种特殊的RDD。RDD我们已经很熟悉了现在就把DataFrame跟RDD做个对比让你先对DataFrame有个感性认识。

先从功能分析与RDD一样DataFrame也用来封装分布式数据集它也有数据分区的概念也是通过算子来实现不同DataFrame之间的转换只不过DataFrame采用了一套与RDD算子不同的独立算子集。

再者在数据内容方面与RDD不同DataFrame是一种带Schema的分布式数据集因此你可以简单地把DataFrame看作是数据库中的一张二维表。

最后DataFrame背后的计算引擎是Spark SQL而RDD的计算引擎是Spark Core这一点至关重要。不过关于计算引擎之间的差异我们留到下一讲再去展开。

好啦言归正传。简单了解了SparkSession与DataFrame的概念之后我们继续来看数据探索。

把上述代码丢进spark-shell之后分别在applyNumbersDF和luckyDogsDF这两个DataFrame之上调用show函数我们就可以得到样本数据。可以看到“这两张表”的Schema是一样的它们都包含两个字段一个是String类型的carNum另一个是类型为Int的batchNum。

图片

其中carNum的含义是申请号码、或是中签号码而batchNum则代表摇号批次比如201906表示2019年的最后一批摇号201401表示2014年的第一次摇号。

好啦,进行到这里,初步的数据探索工作就告一段落了。

业务需求实现

完成初步的数据探索之后我们就可以结合数据特点比如两张表的Schema完全一致但数据内容的范畴不同来实现最开始的业务需求计算中签率与倍率之间的量化关系。

首先既然是要量化中签率与倍率之间的关系我们只需要关注那些中签者lucky目录下的数据的倍率变化就好了。而倍率的计算要依赖apply目录下的摇号数据。因此要做到仅关注中签者的倍率我们就必须要使用数据关联这个在数据分析领域中最常见的操作。此外由于倍率制度自2016年才开始推出所以我们只需要访问2016年以后的数据即可。

基于以上这些分析,我们先把数据过滤与数据关联的代码写出来,如下所示。

// 过滤2016年以后的中签数据且仅抽取中签号码carNum字段
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
 
// 摇号数据与中签数据做内关联Join Key为中签号码carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

在上面的代码中我们使用filter算子对luckyDogsDF做过滤然后使用select算子提取carNum字段。

紧接着我们在applyNumbersDF之上调用join算子从而完成两个DataFrame的数据关联。join算子有3个参数你可以对照前面代码的第5行来理解这里第一个参数用于指定需要关联的DataFrame第二个参数代表Join Key也就是依据哪些字段做关联而第三个参数指定的是关联形式比如inner表示内关联left表示左关联等等。

做完数据关联之后,接下来,我们再来说一说,倍率应该怎么统计。对于倍率这个数值,官方的实现略显粗暴,如果去观察 apply 目录下 2016 年以后各个批次的文件,你就会发现,所谓的倍率,实际上就是申请号码的副本数量。

比如说我的倍率是8那么在各个批次的摇号文件中我的申请号码就会出现8次。是不是很粗暴因此要统计某个申请号码的倍率我们只需要统计它在批次文件中出现的次数就可以达到目的。

按照批次、申请号码做统计计数是不是有种熟悉的感觉没错这不就是我们之前学过的Word Count吗它本质上其实就是一个分组计数的过程。不过这一次咱们不再使用reduceByKey这个RDD算子了而是使用DataFrame的那套算子来实现我们先来看代码。

val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))

分组计数

对照代码我给你分析下思路我们先是用groupBy算子来按照摇号批次和申请号码做分组然后通过agg和count算子把batchNumcarNum出现的次数作为carNum在摇号批次batchNum中的倍率并使用alias算子把倍率重命名为“multiplier”。

这么说可能有点绕我们可以通过在multipliers之上调用show函数来直观地观察这一步的计算结果。为了方便说明我用表格的形式来进行示意。

图片

可以看到,同一个申请号码,在不同批次中的倍率是不一样的。就像我们之前说的,随着摇号的次数增加,倍率也会跟着提升。不过,这里咱们要研究的是倍率与中签率的关系,所以只需要关心中签者是在多大的倍率下中签的就行。因此,对于同一个申请号码,我们只需要保留其中最大的倍率就可以了。

需要说明的是取最大倍率的做法会把倍率的统计基数变小从而引入幸存者偏差。更严谨的做法应该把中签者过往的倍率也都统计在内这样倍率的基数才是准确的。不过呢结合实验幸存者偏差并不影响“倍率与中签率是否有直接关系”这一结论。因此咱们不妨采用取最大倍率这种更加简便的做法。毕竟学习Spark SQL才是咱们的首要目标。

为此我们需要“抹去”batchNum这个维度按照carNum对multipliers做分组并提取倍率的最大值代码如下所示。

val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))

分组聚合的方法跟前面差不多我们还是先用groupBy做分组不过这次仅用carNum一个字段做分组然后使用agg和max算子来保留倍率最大值。经过这一步的计算之后我们就得到了每个申请号码在中签之前的倍率系数

图片

可以看到uniqueMultipliers这个DataFrame仅包含申请号码carNum和倍率multiplier这两个字段且carNum字段不存在重复值也就是说在这份数据集中一个申请号码只有一个最大倍率与之对应。

好啦,到此为止,我们拿到了每一个中签者,在中签之前的倍率系数。接下来,结合这份数据,我们就可以统计倍率本身的分布情况。

具体来说,我们想知道的是,不同倍率之下的人数分布是什么样子的。换句话说,这一次,我们要按照倍率来对数据做分组然后计算不同倍率下的统计计数。不用说这次咱们还是得仰仗groupBy和agg这两个算子代码如下所示。

val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")
 
result.collect

在最后一步我们依然使用groupBy和agg算子如法炮制得到按照倍率统计的人数分布之后我们通过collect算子来收集计算结果并同时触发上述的所有代码从头至尾交付执行。

计算结果result包含两个字段一个是倍率一个是持有该倍率的统计人数。如果把result结果数据做成柱状图的话我们可以更加直观地观察到中签率与倍率之间的关系如下图所示。

图片

不难发现,不同倍率下的中签者人数,呈现出正态分布。也即是说,对于一个申请者来说,他/她有幸摇中的概率,并不会随着倍率的增加而线性增长。用身边那些“老司机”的话说,中签这件事,确实跟倍率的关系不大。

重点回顾

今天这一讲,我们一起动手,开发了“倍率的统计分布”这个数据分析应用,并解答了中签率与倍率之间是否存在关联关系这一难题。

尽管在实现的过程中我们遇到了一些新概念和新的算子但你不必担心更不必着急。今天这节课你只需要对Spark SQL框架下的应用开发有一个感性的认识就可以了。

在Spark SQL的开发框架下我们通常是通过SparkSession的read API从源数据创建DataFrame。然后以DataFrame为入口在DataFrame之上调用各式各样的转换算子如agg、groupBy、select、filter等等对DataFrame进行转换进而完成相应的数据分析。

为了后续试验方便我把今天涉及的代码片段整理到了一起你可以把它们丢进spark-shell去运行观察每个环节的计算结果体会不同算子的计算逻辑与执行结果之间的关系。加油祝你好运

import org.apache.spark.sql.DataFrame
 
val rootPath: String = _
// 申请者数据
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark是spark-shell中默认的SparkSession实例
// 通过read API读取源文件
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
 
// 中签者数据
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// 通过read API读取源文件
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
 
// 过滤2016年以后的中签数据且仅抽取中签号码carNum字段
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
 
// 摇号数据与中签数据做内关联Join Key为中签号码carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")
 
// 以batchNum、carNum做分组统计倍率系数
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))
 
// 以carNum做分组保留最大的倍率系数
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))
 
// 以multiplier倍率做分组统计人数
val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")
 
result.collect

每课一练

1.脑洞时间:你觉得汽车摇号的倍率制度应该怎样设计,才是最合理的?

2.请在你的Spark环境中把代码运行起来并确认执行结果是否与result一致。

欢迎你在留言区跟我交流互动,也推荐你把这一讲的内容分享给更多的朋友、同事。我们下一讲见!