You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
10 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 28 | MapReduce如何通过集群实现离线计算
你好,我是陶辉。
接下来的2节课我将介绍如何通过分布式集群优化计算任务。这一讲我们首先来看对于有边界静态数据的离线计算下一讲再来看对无边界数据流的实时计算。
对大量数据做计算时我们通常会采用分而治之的策略提升计算速度。比如单机上基于递归、分治思想实现的快速排序、堆排序时间复杂度只有O(N\*logN)这比在原始数据集上工作的插入排序、冒泡排序要快得多O(N2)。然而当单机磁盘容量无法存放全部数据或者受限于CPU频率、核心数量单机的计算时间远大于可接受范围时我们就需要在分布式集群上使用分治策略。
比如大规模集群每天产生的日志量是以TB为单位计算的这种日志分析任务单台服务器的处理能力是远远不够的。我们需要将计算任务分解成单机可以完成的小任务由分布式集群并行处理后再从中间结果中归并得到最终的运算结果。这一过程由Google抽象为[MapReduce](https://zh.wikipedia.org/wiki/MapReduce) 模式实现在Hadoop等分布式系统中。
虽然MapReduce已经有十多个年头的历史了但它仍是分布式计算的基石这种编程思想在新出现的各种技术中都有广泛的应用。比如当在单机上使用TensorFlow完成一轮深度学习的时间过久或者单颗GPU显存无法存放完整的神经网络模型时就可以通过Map思想把数据或者模型分解给多个TensorFlow实例并行计算后再根据Reduce思想合并得到最终结果。再比如知识图谱也是通过MapReduce思想并行完成图计算任务的。
接下来我们就具体看看如何在分布式集群中实现离线计算以及MapReduce是怎样提供SQL语言接口的。
## 分而治之:如何实现集群中的批量计算?
分而治之的思想在分布式系统中广为使用,比如[\[第21讲\]](https://time.geekbang.org/column/article/252741) 介绍过的AKF立方体Z轴扩展就是基于用户的请求缩小集群中单个节点待处理的数据量比如下图中当关系数据库中单表行数达到千万行以上时此时不得不存放在磁盘中的索引将会严重降低SQL语句的查询速度。而执行分库分表后由应用或者中间层的代理分解查询语句待多个不足百万行的表快速返回查询结果后再归并为最终的结果集。
![](https://static001.geekbang.org/resource/image/71/81/712a0a73b71090abcaa7ac552f402181.png)
与上述的IO类任务不同并非所有的计算任务都可以基于分治策略分解为可以并发执行的子任务。比如[\[第14讲\]](https://time.geekbang.org/column/article/241632) 介绍过的基于[CBC分组模式](https://zh.wikipedia.org/zh-hans/%E5%88%86%E7%BB%84%E5%AF%86%E7%A0%81%E5%B7%A5%E4%BD%9C%E6%A8%A1%E5%BC%8F)的AES加密算法就无法分解执行如下图所示每16个字节的块在加密时都依赖前1个块的加密结果这样的计算过程既无法利用多核CPU也无法基于MapReduce思想放在多主机上并发执行。
[![](https://static001.geekbang.org/resource/image/2b/3b/2b8bca7a74eb5f98125098e271d0973b.jpg "图片源自https://zh.wikipedia.org/zh-hans/%E5%88%86%E7%BB%84%E5%AF%86%E7%A0%81%E5%B7%A5%E4%BD%9C%E6%A8%A1%E5%BC%8F")](https://zh.wikipedia.org/zh-hans/%E5%88%86%E7%BB%84%E5%AF%86%E7%A0%81%E5%B7%A5%E4%BD%9C%E6%A8%A1%E5%BC%8F)
我们再来看可以使用MapReduce的计算任务其中最经典的例子是排序Google在构建倒排索引时要为大量网页排序。当使用插入排序不熟悉插入排序的同学可以想象自己拿了一手乱牌然后在手中一张张重新插入将其整理有序在整个数据集上操作时计算的时间复杂度是O(N2)但快排、堆排序、归并排序等算法的时间复杂度只有O(N\*logN),这就是通过分治策略,缩小子问题数据规模实现的。
比如下图是在8个数字上使用归并排序算法进行排序的流程。我们将数组递归地进行3log8轮对半拆分后每个子数组就只有2个元素。对2个元素排序只需要进行1次比较就能完成。接着再将有序的子数组不断地合并就可以得到完整的有序数组。
![](https://static001.geekbang.org/resource/image/8e/71/8e9f75013bcb26ae2befec6ff8739971.png)
其中将两个含有N/2个元素的有序子数组比如1、3、7、19和4、8、11、25合并为一个有序数组时只需要做N/2到N-1次比较图中只做了5次比较速度非常快。因此比较次数乘以迭代轮数就可以得出时间复杂度为O(N\*logN)。
同样的道理引申到分布式系统中就成为了MapReduce模式。其中原始数据集要通过SPLIT步骤拆分到分布式系统中的多个节点中而每个节点并发执行用户预定义的MAP函数最后将MAP运算出的结果通过用户预定义的REDUCE函数归并为最终的结果。比如上例中我们可以将8个元素拆分到2个节点中并行计算其中每个节点究竟是继续采用归并排序还是使用其他排序算法这由预定义的MAP函数决定。当MAP函数生成有序的子数组后REDUCE函数再将它们归并为完整的有序数组具体如下图所示
![](https://static001.geekbang.org/resource/image/72/15/72bb89540bae52a46e69a5d802680715.png)
当面对TB、PB级别的数据时MapReduce思想就成了唯一的解决方案。当然在实际软件工程中实现MapReduce的框架要比上面的示意图复杂许多毕竟在大规模分布式系统中故障每时每刻都会发生如何分发数据、调度节点执行MAP映射、监控计算节点等都需要精心的设计。特别是当单个节点的磁盘无法存放下全部数据时常常使用类似HDFS的分布式文件系统存放数据所以MapReduce框架往往还需要对接这样的系统来获取数据具体如下图所示
[![](https://static001.geekbang.org/resource/image/4f/39/4f3182c6334ec0c7b67e69b6ded2e839.png "图片来源http://a4academics.com/tutorials/83-hadoop/840-map-reduce-architecture")](http://a4academics.com/tutorials/83-hadoop/840-map-reduce-architecture)
而且生产环境中的任务远比整数排序复杂得多所以写对Map、Reduce函数并不容易。另一方面大部分数据分析任务又是高度相似的所以我们没有必要总是直接编写Map、Reduce函数实现发布式系统的离线计算。由于SQL语言支持聚合分析、表关联还内置了许多统计函数很适合用来做数据分析它的学习成本又非常低所以大部分MapReduce框架都提供了类SQL语言的接口可以替代自行编写Map、Reduce函数。接下来我们看看SQL语言统计数据时Map、Reduce函数是怎样工作的。
## SQL是如何简化MapReduce模式的
我们以最常见的Web日志分析为例观察用SQL语言做统计时MapReduce流程是怎样执行的。举个例子Nginx的access.log访问日志是这样的基于默认的combined格式
```
127.0.0.1 - - [18/Jul/2020:10:16:15 +0800] "GET /loginuserid=101 HTTP/1.1" 200 56 "-" "curl/7.29.0"
```
你可以通过正则表达式取出客户端IP地址、用户名、HTTP响应码这样就可以生成结构化的数据表格
![](https://static001.geekbang.org/resource/image/e9/e7/e9fcf8e7529f973b1679af93333b4ee7.jpg)
如果我们想按照客户端IP、HTTP响应码聚合统计访问次数基于通用的SQL规则就可以写出下面这行SQL语句
```
select ClientIp, StatusCode, count(*) from access_log group by ClientIp, StatusCode
```
而建立在MapReduce之上的框架比如Hive会将它翻译成如下图所示的MapReduce流程
![](https://static001.geekbang.org/resource/image/4c/f9/4cb7443e0f9cdf2ba77fbbe230487ff9.png)
其中我们假定5行数据被拆分到2个节点中执行Map函数其中它们分别基于2行、3行这样小规模的数据集生成了正确的聚合统计结果。接着在Shuffle步骤基于key关键字排序后再交由Reduce函数归并出正确的结果。
除了这个例子中的count函数像max求最大值、min求最小值、distinct去重、sum求和、avg求平均数、median求中位数、stddev求标准差等函数都很容易分解为子任务并发执行最后归并出最终结果。
当多个数据集之间需要做交叉统计时SQL中的join功能包括内连接、左外连接、右外连接、全连接四种模式也很容易做关联查询。此时我们可以在并行计算的Map函数中把where条件中的关联字段作为key关键字经由Reduce阶段实现结果的关联。
由于MapReduce操作的数据集非常庞大还需要经由网络调度多台服务器才能完成计算因此任务的执行时延至少在分钟级所以通常不会服务于用户的实时请求而只是作为离线的异步任务将运算结果写入数据库。
## 小结
这一讲我们介绍了在集群中使用分治算法统计大规模数据的MapReduce模式。
当数据量很大或者计算时间过长时如果计算过程可以被分解为并发执行的子任务就可以基于MapReduce思想利用分布式集群的计算力完成任务。其中用户可以预定义在节点中并发执行的Map函数以及将Map输出的列表合并为最终结果的Reduce函数。
虽然MapReduce将并行计算抽象为统一的模型但开发Map、Reduce函数的成本还是太高了于是针对高频场景许多MapReduce之上的框架提供了类SQL语言接口通过group by的聚合、join连接以及各种统计函数我们就可以利用整个集群完成数据分析。
MapReduce模式针对的是静态数据也叫有边界数据它更多用于业务的事前或者事后处理流程中而做事中处理时必须面对实时、不断增长的无边界数据流此时MapReduce就无能为力了。下一讲我们将介绍处理无边界数据的流式计算框架。
## 思考题
最后留给你一道思考题。你遇到过哪些计算任务是无法使用MapReduce模式完成的欢迎你在留言区与大家一起探讨。
感谢阅读,如果你觉得这节课让你有所收获,也欢迎你把今天的内容分享给你的朋友。