You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

11 KiB

29 | 流式计算:如何通过集群实现实时计算?

你好,我是陶辉。

上节课我们介绍了在有边界的存量数据上进行的MapReduce离线计算这节课我们来看看对于无边界数据怎样实时地完成流式计算。

对于不再变化的存量数据可以通过分而治之的MapReduce技术将数据划分到多台主机上并行计算由于待处理的数据量很大我们只能获得分钟级以上的时延。当面对持续实时产生动态数据的场景时业务上通常需要在秒级时延中及时地拿到运算结果。

比如,商家为了拉新促活,会为特定的用户群体(比如新用户或者不活跃用户)推出优惠活动,为了防止“羊毛党”通过大量主机并行地**“薅羊毛”**系统要能实时地聚合分析所有优惠券的使用者特点再基于业务规则及时地封掉“羊毛党”帐号或者IP地址才可以控制住风险范围提高营销活动的收益。那对于整个系统持续生成的大量订单数据怎样才能提供秒级的聚合分析结果呢

最初的流式计算方案是在时间维度上定期地将数据分片再基于MapReduce思想在空间维度的多台主机上实现并行计算这样也能获得实时计算结果。然而对每片数据执行批量计算想要在秒级甚至毫秒级拿到计算结果并不容易。当网络不稳定时数据会因为报文延误而乱序简单的基于时序分片会导致计算结果失真。当数据之间具有明显的业务关系时固定的时间窗口更是难以得到预期的分析结果。

接下来我们就深入学习一下流式计算的工作原理,以及流式计算常用的数据分片窗口。

流式计算是如何实现的?

在数据库、HDFS等分布式系统中存放的静态数据由于拥有清晰的边界所以被称为InBound Data有边界数据。然而线上运行中的互联网产品生命周期并不确定它产生的数据有明确的开始却没有截止时间点。对于这样有始无终的实时数据流我们把它称为OutBound Data无边界数据如下图所示

从业务需求上看有边界数据与无边界数据的计算目的是完全不同的。比如对于分布式监控系统我们需要基于IP地址、用户帐号、请求类型等许多特征进行定时的聚合统计例如获取每分钟内所有请求处理时延的平均值、中位数、最大值等监控系统性能。此时可以根据请求执行结果的产生时间对数据进行分片计算。比如下表中有7条监控数据需要求出每分钟请求时延的平均值。

如果我们按照分钟整点对数据进行分片就可以在02:00时对蓝色的消息1、2求出窗口内的平均时延192毫秒并立刻返回结果。之后当接收完红色的消息3、4、5后在第2分钟结束时再对3个数字求出平均值。以此类推。

**这种设计思想就是基于固定时间窗口的批处理解决方案。**当然,并不是一定要等到时间窗口结束时,才对这一批次的所有数据统一计算。我们完全可以在每个消息到来时,就计算出中间状态,当所在的时间窗口结束时,再将中间状态转换为最终结果。仍然以上表为例,我们可以在每个监控事件到达时,计算出请求时延和以及当前窗口内的事件个数,这样,在窗口结束时我们只需要将时延和除以事件个数,就能得到平均值。

因此,中间状态可以更均衡地使用计算资源,提高流式计算的整体性能。我们既可以把中间状态放在内存中,也可以把它持久化到本地磁盘中获取更高的可用性,为了方便计算节点的调度,我们通常还会将备份状态存放至远端的数据库。

当然流式计算最主要的性能提升思路还是基于MapReduce思想将同一窗口的数据从空间维度中分发到不同的计算节点进行并行的Map计算再将Map映射出的结果Reduce为最终结果。由于流式计算天然是基于消息事件驱动的因此它往往直接从Kafka等消息队列中获取输入数据如同[第27讲] 的介绍,消息队列很容易协助流式计算实现数据拆分。

到这里,我们已经看到了实现流式计算的基本思路,其中基于固定时间窗口的数据划分方式还有很大的改进空间,目前它还无法解决较为复杂的有状态计算。所谓有状态计算,是指在时间窗口内,不同的消息之间会互相作用并影响最终的计算结果,比如求平均值就是这样一个例子,每个新到达的数据都会影响中间状态值。

相反无状态计算处理到达的数据时并不涉及窗口内的其他数据处理流程要简单的多。例如当监控到请求时延超过3秒时就产生一条告警。此时只需要单独地判断每个消息中的时延数据就能够得到计算结果。

在真实的业务场景中有状态计算还要更复杂。比如对两个不同的数据源可以理解为数据库中的表做join连接时采用内连接、外连接这两种不同的连接方式就会影响到我们的时间窗口长度。再比如当不同的事件具有逻辑关系时窗口长度则应该由业务规则确定不同的请求可能拥有不等的窗口大小。接下来我们再来看看流式计算中的几种常见窗口。

如何通过窗口确定待计算的数据?

首先来看滑动窗口它是从固定窗口衍生出的一种窗口。我们继续延续求每分钟平均值的例子当业务上需要更平滑的曲线时可以通过每20秒求最近1分钟请求时延的平均值实现这就是滑动窗口其中窗口长度则是1分钟但每次计算完并不会淘汰窗口中的全部数据而只是将步长向后移动20秒即只淘汰最早20秒中的数据。当窗口长度与步长一致时滑动窗口就退化成了固定窗口。

当然我们还可以把窗口的计量单位从时间改为事件个数此时可以称为计数窗口。仍然延续上面的例子固定计数窗口可以改为求每100个访问记录的平均时延滑动计数窗口可以改为每10条记录中求最近100个记录的平均时延。由于消息本身是有时序的所以这些都可以称为时间驱动的窗口。事实上还有另外一种事件驱动的窗口与此完全不同如下图所示

固定窗口、滑动窗口并不会解析业务字段区别对待图中不同的Key关键字这就很难解决以下这类场景当需要统计用户在一个店铺内浏览的商品数量时就需要针对用户的店铺停留时长来设计动态的窗口大小。毕竟不同的用户在不同的店铺内停留时长不可能相同此时动态的窗口大小可以通过事件来驱动我们称为会话窗口。

事实上,我们还面临着信息统计准确性上的问题。在基于时间驱动的窗口中,这里的时间其实是事件到达流式系统时产生的系统处理时间,而不是事件发生的时间。仍然以访问日志为例,每条日志都有明确的请求访问时间,但在分布式系统传输时,由于网络波动的传输时延,以及各主机节点应用层的处理时延,这些事件到达流式计算框架的顺序已经发生了变化。如果仍然以固定的时间窗口来处理,就会得到错误的统计结果。

为了避免乱序事件扰乱统计结果我们可以使用水位线Watermark减少乱序概率。比如下图中消息队列中的数字表示事件时间其中事件7先于事件3、5到达了流式计算系统

如果设置了水位4窗口就不再以事件顺序严格划分而是通过水位上的时间来划分窗口这样事件7就会放在第2个窗口中处理

当然并不是有了水位线第1个窗口就会无限制的等下去。在经历一个时间段后第1个窗口会认定窗口关闭这未必准确它会处理3、1、3、2这4个事件。基于业务规则下一个水位被设置为9

这样第2个窗口会处理6、5、7事件而事件9就放在了第3个窗口中处理

以此类推。根据业务特性和经验值,找到最大乱序时间差,再基于此设置合适的水位线,就能减轻乱序事件的影响。

小结

这一讲我们介绍了流式计算的实现原理,以及常用的几种分片窗口。

对于无边界的实时数据流我们可以在时间维度上将其切分到不同的窗口中再将每个窗口内的数据从空间维度上分发到不同的节点并行计算在窗口结束时汇总结果这就实现了流式计算。Apache FlinkSparkStorm 等开源产品都是这样的流式计算框架。

通过不同的窗口划分规则可以实现不同的计算目的包括以时间驱动的固定窗口、滑动窗口和计数窗口以及以事件驱动的会话窗口。为了避免乱序事件的影响还可以通过携带超时时间的Watermark水位基于事件发生时间更精准地划分窗口。

思考题

最后留给你一道讨论题。你知道Lambda架构吗它通过分开部署的MapReduce、流式计算系统分别完成离线计算与实时流计算如下图所示

这套系统的IT成本很高因此大家致力于使用一套系统同时解决这两个问题。你认为这种解决方案是如何实现的你又是如何看待流式计算发展方向的欢迎你在留言区与大家一起探讨。

感谢阅读,如果你觉得这节课让你有所收获,也欢迎你把今天的内容分享给你的朋友。