You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

250 lines
21 KiB
Markdown

2 years ago
# 33流计算中的数据关联流与流、流与批
你好,我是吴磊。
在上一讲我们提到Structured Streaming会复用Spark SQL所提供的一切数据处理能力比如数据抽取、过滤、分组聚合、关联、排序等等。不过在这些常规的数据处理类型中有一类操作需要我们特别关注它就是数据关联Joins
这主要是出于两方面的原因,一来,数据关联的应用非常普遍,可以说是数据应用中“出场率”最高的操作类型之一;再者,与批处理中的数据关联不同,流计算中的数据关联,还需要考虑到流处理过程中固有的一些限制,比如说时间窗口、数据延迟容忍度、输出模式,等等。
因此今天这一讲我们专门来说一说Structured Streaming中的数据关联。我们先盘点好Structured Streaming的技能树看看它都支持哪些种类的数据关联。之后再用一个短视频推荐的例子上手试验一下总结出不同类型数据关联的适用场景以及注意事项。
## 流计算中的数据关联
我们知道如果按照关联形式来划分的话数据关联可以分为Inner Join、Left Join、Right Join、Semi Join、Anti Join等等。如果按照实现方式来划分的话可以分为Nested Loop Join、Sort Merge Join和Hash Join。而如果考虑分布式环境下数据分发模式的话Join又可以分为Shuffle Join和Broadcast Join。
对于上述的3种分类标准它们之间是相互正交的我们在Spark SQL学习模块介绍过它们各自的适用场景与优劣势记不清的可以回顾第[17](https://time.geekbang.org/column/article/427470)、[18](https://time.geekbang.org/column/article/428259)讲)。
而在流计算的场景下按照数据来源的不同数据关联又可以分为“流批关联”与“双流关联”。所谓“流批关联”Stream-Static Join它指的是参与关联的一张表来自离线批数据而另一张表的来源是实时的数据流。换句话说动态的实时数据流可以与静态的离线数据关联在一起为我们提供多角度的数据洞察。
而“双流关联”Stream-Stream Join顾名思义它的含义是参与关联的两张表都来自于不同的数据流属于动态数据与动态数据之间的关联计算如下图所示。
![图片](https://static001.geekbang.org/resource/image/99/ac/991af276cf633d587ea627251ae7bdac.jpg?wh=1920x918 "流批关联与双流关联示意图")
显然相对于关联形式、实现方式和分发模式数据来源的分类标准与前三者也是相互正交的。我们知道基于前3种分类标准数据关联已经被划分得足够细致。再加上一种正交的分类标准数据关联的划分只会变得更为精细。
更让人头疼的是在Structured Streaming流计算框架下“流批关联”与“双流关联”对于不同的关联形式有着不同的支持与限制。而这也是我们需要特别关注流处理中数据关联的原因之一。
接下来,我们就分别对“流批关联”和“双流关联”进行展开,说一说它们支持的功能与特性,以及可能存在的限制。本着由简入难的原则,我们先来介绍“流批关联”,然后再去说“双流关联”。
## 流批关联
为了更好地说明流批关联,咱们不妨从一个实际场景入手。在短视频流行的当下,推荐引擎扮演着极其重要的角色,而要想达到最佳的推荐效果,推荐引擎必须依赖用户的实时反馈。
所谓实时反馈,其实就是我们习以为常的点赞、评论、转发等互动行为,不过,这里需要突出的,是一个“实时性”、或者说“及时性”。毕竟,在选择越来越多的今天,用户的兴趣与偏好,也在随着时间而迁移、变化,捕捉用户最近一段时间的兴趣爱好更加重要。
假设现在我们需要把离线的用户属性和实时的用户反馈相关联从而建立用户特征向量。显然在这个特征向量中我们既想包含用户自身的属性字段如年龄、性别、教育背景、职业等等更想包含用户的实时互动信息比如1小时内的点赞数量、转发数量等等从而对用户进行更为全面的刻画。
一般来说,实时反馈来自线上的数据流,而用户属性这类数据,往往存储在离线数据仓库或是分布式文件系统。因此,用户实时反馈与用户属性信息的关联,正是典型的流批关联场景。
那么,针对刚刚说的短视频场景,我们该如何把离线用户属性与线上用户反馈“合二为一”呢?为了演示流批关联的过程与用法,咱们自然需要事先把离线数据与线上数据准备好。本着一切从简的原则,让你仅用笔记本电脑就能复现咱们课程中的实例,这里我们使用本地文件系统来存放离线的用户属性。
而到目前为止对于数据流的生成我们仅演示过Socket的用法。实际上除了用于测试的Socket以外Structured Streaming还支持Kafka、文件等Source作为数据流的来源。为了尽可能覆盖更多知识点这一讲咱们不妨通过文件的形式来模拟线上的用户反馈。
还记得吗Structured Streaming通过readStream API来创建各式各样的数据流。要以文件的方式创建数据流我们只需将文件格式传递给format函数然后启用相应的option即可如下所示。关于readStream API的一般用法你可以回顾“流动的Word Count”[第30讲](https://time.geekbang.org/column/article/446691))。
```scala
var streamingDF: DataFrame = spark.readStream
.format("csv")
.option("header", true)
.option("path", s"${rootPath}/interactions")
.schema(actionSchema)
.load
```
对于这段代码片段来说需要你特别注意两个地方。一个是format函数它的形参是各式各样的文件格式如CSV、Parquet、ORC等等。第二个地方是指定监听地址的option选项也就是option(“path”, s"${rootPath}/interactions")。
该选项指定了Structured Streaming需要监听的文件系统目录一旦有新的数据内容进入该目录Structured Streaming便以流的形式把新数据加载进来。
需要说明的是上面的代码并不完整目的是让你先对文件形式的Source建立初步认识。随着后续讲解的推进待会我们会给出完整版代码并详细讲解其中的每一步。
要用文件的形式模拟数据流的生成我们只需将包含用户互动行为的文件依次拷贝到Structured Streaming的监听目录即可在我们的例子中也就是interactions目录。
![图片](https://static001.geekbang.org/resource/image/eb/3b/ebb4bc05741f0923f83cb010c360b43b.jpg?wh=1920x662 "流批关联示意图")
如上图的步骤1所示我们事先把用户反馈文件保存到临时的staging目录中然后依次把文件拷贝到interactions目录即可模拟数据流的生成。而用户属性信息本身就是离线数据因此我们把相关数据文件保存到userProfile目录即可如图中步骤3所示。
对于上面的流批关联计算过程在给出代码实现之前咱们不妨先来了解一下数据从而更好地理解后续的代码内容。离线的用户属性比较简单仅包含id、name、age与gender四个字段文件内容如下所示。
![图片](https://static001.geekbang.org/resource/image/83/8b/832c4da115abd99c93e509af90fc1b8b.jpg?wh=1920x927 "userProfile.csv")
线上的用户反馈相对复杂一些分别包含userId、videoId、event、eventTime等字段。前两个字段分别代表用户ID与短视频ID而event是互动类型包括Like点赞、Comment评论、Forward转发三个取值eventTime则代表产生互动的时间戳如下所示。
![图片](https://static001.geekbang.org/resource/image/6a/0b/6af42cc99a5a4yy4f2d78b971ab1180b.jpg?wh=1920x804 "interactions0.csv")
除了上面的interactions0.csv以外为了模拟数据流的生成我还为你准备了interactions1.csv、interactions2.csv两个文件它们的Schema与interactions0.csv完全一致内容也大同小异。对于这3个文件我们暂时把它们缓存在staging目录下。
好啦数据准备好之后接下来我们就可以从批数据与流数据中创建DataFrame并实现两者的关联达到构建用户特征向量的目的。首先我们先来加载数据。
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
 
// 保存staging、interactions、userProfile等文件夹的根目录
val rootPath: String = _
 
// 使用read API读取离线数据创建DataFrame
val staticDF: DataFrame = spark.read
.format("csv")
.option("header", true)
.load(s"${rootPath}/userProfile/userProfile.csv")
 
// 定义用户反馈文件的Schema
val actionSchema = new StructType()
.add("userId", "integer")
.add("videoId", "integer")
.add("event", "string")
.add("eventTime", "timestamp")
 
// 使用readStream API加载数据流注意对比readStream API与read API的区别与联系
var streamingDF: DataFrame = spark.readStream
// 指定文件格式
.format("csv")
.option("header", true)
// 指定监听目录
.option("path", s"${rootPath}/interactions")
// 指定数据Schema
.schema(actionSchema)
.load
```
为了方便你把代码与计算流程对应上这里我再一次把流批关联示意图贴在了下面。上述代码对应的是下图中的步骤2与步骤3也就是流数据与批数据的加载。
![图片](https://static001.geekbang.org/resource/image/eb/3b/ebb4bc05741f0923f83cb010c360b43b.jpg?wh=1920x662 "流批关联示意图")
从代码中我们不难发现readStream API与read API的用法几乎如出一辙不仅如此二者的返回类型都是DataFrame。因此流批关联在用法上与普通的DataFrame之间的关联看上去并没有什么不同如下所示。
```scala
// 互动数据分组、聚合对应流程图中的步骤4
streamingDF = streamingDF
// 创建Watermark设置最大容忍度为30分钟
.withWatermark("eventTime", "30 minutes")
// 按照时间窗口、userId与互动类型event做分组
.groupBy(window(col("eventTime"), "1 hours"), col("userId"), col("event"))
// 记录不同时间窗口,用户不同类型互动的计数
.count
 
/**
流批关联对应流程图中的步骤5
可以看到与普通的两个DataFrame之间的关联看上去没有任何差别
*/
val jointDF: DataFrame = streamingDF.join(staticDF, streamingDF("userId") === staticDF("id"))
```
除了在用法上没有区别以外普通DataFrame数据关联中适用的优化方法同样适用于流批关联。比方说对于streamingDF来说它所触发的每一个Micro-batch都会扫描一次staticDF所封装的离线数据。
显然在执行效率方面这并不是一种高效的做法。结合Spark SQL模块学到的Broadcast Join的优化方法我们完全可以在staticDF之上创建广播变量然后把流批关联原本的Shuffle Join转变为Broadcast Join来提升执行性能。这个优化技巧仅涉及几行代码的修改因此我把它留给你作为课后作业去练习。
完成流批关联之后我们还需要把计算结果打印到终端Console是Structured Streaming支持的Sink之一它可以帮我们确认计算结果与预期是否一致如下所示。
```scala
jointDF.writeStream
// 指定Sink为终端Console
.format("console")
// 指定输出选项
.option("truncate", false)
// 指定输出模式
.outputMode("update")
// 启动流处理应用
.start()
// 等待中断指令
.awaitTermination()
```
上面这段代码想必你并不陌生咱们在之前的几讲中都是指定Console为输出Sink这里的操作没什么不同。
好啦到此为止流批关联实例的完整代码就是这些了。接下来让我们把代码敲入本地环境的spark-shell然后依次把staging文件夹中的interactions\*.csv拷贝到interactions目录之下来模拟数据流的生成从而触发流批关联的计算。代码与数据的全部内容你可以通过这里的[GitHub地址](https://github.com/wulei-bj-cn/learn-spark/tree/main/chapter33)进行下载。
这里我贴出部分计算结果供你参考。下面的截图是我们把interactions0.csv文件拷贝到interactions目录之后得到的结果你可以在你的环境下去做验证同时继续把剩下的两个文件拷贝到监听目录来进一步观察流批关联的执行效果。
![图片](https://static001.geekbang.org/resource/image/47/cb/47fc7fb6a8fa32e64a7131b3489a66cb.png?wh=1576x296 "部分计算结果截屏")
## 双流关联
了解了流批关联之后,我们再来说说“双流关联”。显然,与流批关联相比,双流关联最主要的区别是数据来源的不同。除此之外,在双流关联中,**事件时间的处理尤其关键**。为什么这么说呢?
学过上一讲之后我们知道在源源不断的数据流当中总会有Late Data产生。Late Data需要解决的主要问题就是其是否参与当前批次的计算。
毫无疑问数据关联是一种最为常见的计算。因此在双流关联中我们应该利用Watermark机制明确指定两条数据流各自的Late Data“容忍度”从而避免Structured Streaming为了维护状态数据而过度消耗系统资源。Watermark的用法很简单你可以通过回顾[上一讲](https://time.geekbang.org/column/article/450916)来进行复习。
说到这里,你可能会问:“什么是状态数据?而维护状态数据,又为什么会过度消耗系统资源呢?”一图胜千言,咱们不妨通过下面的示意图,来说明状态数据的维护,会带来哪些潜在的问题和隐患。
![图片](https://static001.geekbang.org/resource/image/83/1f/8361f27df68785173346a30488b9af1f.jpg?wh=1920x1063 "状态数据维护示意图")
假设咱们有两个数据流一个是短视频发布的数据流其中记录着短视频相关的元信息如ID、Name等等。另一个数据流是互动流也就是用户对于短视频的互动行为。其实在刚刚的流批关联例子中我们用到数据流也是互动流这个你应该不会陌生。
现在我们想统计短视频在发布一段时间比如1个小时、6个小时、12个小时等等之后每个短视频的热度。所谓热度其实就是转评赞等互动行为的统计计数。
要做到这一点咱们可以先根据短视频ID把两个数据流关联起来然后再做统计计数。上图演示的是两条数据流在Micro-batch模式下的关联过程。为了直击要点咱们把注意力放在ID=1的短视频上。
显然在视频流中短视频的发布有且仅有一次即便是内容完全相同的短视频在数据的记录上也会有不同的ID值。而在互动流中ID=1的数据条目会有多个而且会分布在不同的Micro-batch中。事实上只要视频没有下线随着时间的推移互动流中总会夹带着ID=1的互动行为数据。
为了让视频流中ID=1的记录能够与互动流的数据关联上我们需要一直把视频流中批次0的全部内容缓存在内存中从而去等待“迟到”的ID=1的互动流数据。像视频流这种**为了后续计算而不得不缓存下来的数据,我们就把它称作为“状态数据”**。显然,状态数据在内存中积压的越久、越多,内存的压力就越大。
在双流关联中除了要求两条数据流要添加Watermark机之外为了进一步限制状态数据的尺寸Structured Streaming还要求在关联条件中对于事件时间加以限制。这是什么意思呢咱们还是结合视频流与互动流的示例通过代码来解读。
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
 
// 保存staging、interactions、userProfile等文件夹的根目录
val rootPath: String = _
 
// 定义视频流Schema
val postSchema = new StructType().add("id", "integer").add("name", "string").add("postTime", "timestamp")
// 监听videoPosting目录以实时数据流的方式加载新加入的文件
val postStream: DataFrame = spark.readStream.format("csv").option("header", true).option("path", s"${rootPath}/videoPosting").schema(postSchema).load
// 定义Watermark设置Late data容忍度
val postStreamWithWatermark = postStream.withWatermark("postTime", "5 minutes")
 
// 定义互动流Schema
val actionSchema = new StructType().add("userId", "integer").add("videoId", "integer").add("event", "string").add("eventTime", "timestamp")
// 监听interactions目录以实时数据流的方式加载新加入的文件
val actionStream: DataFrame = spark.readStream.format("csv").option("header", true).option("path", s"${rootPath}/interactions").schema(actionSchema).load
// 定义Watermark设置Late data容忍度
val actionStreamWithWatermark = actionStream.withWatermark("eventTime", "1 hours")
 
// 双流关联
val jointDF: DataFrame = actionStreamWithWatermark
.join(postStreamWithWatermark,
expr("""
// 设置Join Keys
videoId = id AND
// 约束Event time
eventTime >= postTime AND
eventTime <= postTime + interval 1 hour
"""))
```
代码的前两部分比较简单分别是从监听文件夹读取新增的文件内容依次创建视频流和互动流并在两条流上设置Watermark机制。这些内容之前已经学过不再重复咱们把重点放在最后的双流关联代码上。
可以看到在关联条件中除了要设置关联的主外键之外还必须要对两张表各自的事件时间进行约束。其中postTime是视频流的事件时间而eventTime是互动流的事件时间。上述代码的含义是对于任意发布的视频流我们只关心它一小时以内的互动行为一小时以外的互动数据将不再参与关联计算。
这样一来,**在Watermark机制的“保护”之下事件时间的限制进一步降低了状态数据需要在内存中保存的时间从而降低系统资源压力**。简言之对于状态数据的维护有了Watermark机制与事件时间的限制可谓是加了“双保险”。
## 重点回顾
好啦到这里我们今天的内容就讲完啦咱们一起来做个总结。首先我们要知道根据数据流的来源不同Structured Streaming支持“流批关联”和“双流关联”两种关联模式。
流批关联统一了流处理与批处理二者的统一使得Structured Streaming有能力服务于更广泛的业务场景。流批关联的用法相对比较简单通过readStream API与read API分别读取实时流数据与离线数据然后按照一般Join语法完成数据关联。
在今天的演示中我们用到了File这种形式的Source你需要掌握File Source的一般用法。具体来说你需要通过readStream API的format函数来指定文件格式然后通过option指定监听目录。一旦有新的文件移动到监听目录Spark便以数据流的形式加载新数据。
对于双流关联来说我们首先需要明白在这种模式下Structured Streaming需要缓存并维护状态数据。**状态数据的维护,主要是为了保证计算逻辑上的一致性**。为了让满足条件的Late data同样能够参与计算Structured Streaming需要一直在内存中缓存状态数据。毫无疑问状态数据的堆积会对系统资源带来压力与隐患。
为了减轻这样的压力与隐患在双流关联中一来我们应该对参与关联的两条数据流设置Watermark机制再者在语法上Structured Streaming在关联条件中会强制限制事件时间的适用范围。在这样的“双保险”机制下开发者即可将状态数据维护带来的性能隐患限制在可控的范围内从而在实现业务逻辑的同时保证应用运行稳定。
## 课后练习题
今天的题目有两道。
第一道题目是我在流批关联那里用interactions0.csv文件给你演示了数据关联操作/请你动手在你的环境下去做验证同时继续把剩下的两个文件interactions1.csv、interactions2.csv两个文件拷贝到监听目录来进一步观察流批关联的执行效果。
第二道题目是在双流关联中我们需要Watermark和关联条件来同时约束状态数据维护的成本与开销。那么在流批关联中我们是否也需要同样的约束呢为什么
欢迎你在留言区跟我交流互动,也推荐你把这一讲分享给更多同事、朋友。