You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

16 KiB

30Structured Streaming从“流动的Word Count”开始

你好,我是吴磊。

从今天这一讲开始,我们将进入流计算的学习模块。与以往任何时候都不同,今天的大数据处理,对于延迟性的要求越来越高,因此流处理的基本概念与工作原理,是每一个大数据从业者必备的“技能点”。

在这个模块中按照惯例我们还是从一个可以迅速上手的实例开始带你初步认识Spark的流处理框架Structured Streaming。然后我们再从框架所提供的能力、特性出发深入介绍Structured Streaming工作原理、最佳实践以及开发注意事项等等。

在专栏的第一个模块我们一直围绕着Word Count在打转也就是通过从文件读取内容然后以批处理的形式来学习各式各样的数据处理技巧。而今天这一讲我们换个花样从一个“流动的Word Count”入手去学习一下在流计算的框架下Word Count是怎么做的。

环境准备

要上手今天的实例你只需要拥有Spark本地环境即可并不需要分布式的物理集群。

不过咱们需要以“流”的形式为Spark提供输入数据因此要完成今天的实验我们需要开启两个命令行终端。一个用于启动spark-shell另一个用于开启Socket端口并输入数据如下图所示。

图片

图片

流动的Word Count

环境准备好之后接下来我们具体来说一说什么是“流动的Word Count”。

所谓没有对比就没有鉴别为了说清楚“流动的Word Count”咱们不妨拿批处理版本的Word Count作对比。在之前的Word Count中数据以文件wikiOfSpark.txt的形式一次性地“喂给”Spark从而触发一次Job计算。而在“流动的Word Count”里数据以行为粒度分批地“喂给”Spark每一行数据都会触发一次Job计算。

具体来说我们使用netcat工具向本地9999端口的Socket地址发送数据行。而Spark流处理应用则时刻监听着本机的9999端口一旦接收到数据条目就会立即触发计算逻辑的执行。当然在我们的示例中这里的计算逻辑就是Word Count。计算执行完毕之后流处理应用再把结果打印到终端Console上。

与批处理不同只要我们不人为地中断流处理应用理论上它可以一直运行到永远。以“流动的Word Count”为例只要我们不强制中断它它就可以一直监听9999端口接收来自那里的数据并以实时的方式处理它。

好啦,弄清楚我们要做的事情之后,接下来,我们一起来一步一步地实现它。

首先第一步我们在第二个用来输入数据的终端敲入命令“nc -lk 9999”也就是使用netcat工具开启本机9999端口的Socket地址。一般来说大多数操作系统都预装了netcat工具因此不论你使用什么操作系统应该都可以成功执行上述命令。

图片

命令敲击完毕之后光标会在屏幕上一直闪烁这表明操作系统在等待我们向Socket地址发送数据。我们暂且把它搁置在这里等一会流处理应用实现完成之后再来处理它。

接下来第二步我们从第一个终端进入spark-shell本地环境然后开始开发流处理应用。首先我们先导入DataFrame并指定应用所需监听的主机与端口号。

import org.apache.spark.sql.DataFrame
 
// 设置需要监听的本机地址与端口号
val host: String = "127.0.0.1"
val port: String = "9999"

数据加载

然后是数据加载环节我们通过SparkSession的readStream API来创建DataFrame。

// 从监听地址创建DataFrame
var df: DataFrame = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()

仔细观察上面的代码你有没有觉得特别眼熟呢没错readStream API与SparkSession的read API看上去几乎一模一样。

图片

可以看到与read API类似readStream API也由3类最基本的要素构成也就是

  • format指定流处理的数据源头类型
  • option与数据源头有关的若干选项
  • load将数据流加载进Spark

流计算场景中有3个重要的基础概念需要我们重点掌握。它们依次是Source、流处理引擎与Sink。其中Source是流计算的数据源头也就是源源不断地产生数据的地方。与之对应Sink指的是数据流向的目的地也就是数据要去向的地方后面我们讲到writeSteam API的时候再去展开。

图片

而流处理引擎是整个模块的学习重点后续我们还会深入讨论。它的作用显而易见在数据流动过程中实现数据处理保证数据完整性与一致性。这里的数据处理包括我们Spark SQL模块讲过的各种操作类型比如过滤、投影、分组、聚合、排序等等。

现在让我们先把注意力放到readStream API与Source上来。通过readStream API的format函数我们可以指定不同类型的数据源头。在Structured Streaming框架下Spark主要支持3类数据源分别是Socket、File和Kafka。

其中Socket类型主要用于开发试验或是测试应用的连通性这也是这一讲中我们采用Socket作为数据源的原因。File指的是文件系统Spark可以通过监听文件夹把流入文件夹的文件当作数据流来对待。而在实际的工业级应用中Kafka + Spark的组合最为常见因此在本模块的最后我们会单独开辟一篇专门讲解Kafka与Spark集成的最佳实践。

通过format指定完数据源之后还需要使用零到多个option来指定数据源的具体地址、访问权限等信息。以咱们代码中的Socket为例我们需要明确主机地址与端口地址。

// 从监听地址创建DataFrame
var df: DataFrame = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.load()

一切准备就绪之后我们就可以通过load来创建DataFrame从而把数据流源源不断地加载进Spark系统。

数据处理

有了DataFrame在手我们就可以使用之前学习过的各类DataFrame算子去实现Word Count的计算逻辑。这一步比较简单你不妨先自己动手试试然后再接着往下看。

/**
使用DataFrame API完成Word Count计算
*/
 
// 首先把接收到的字符串以空格为分隔符做拆分得到单词数组words
df = df.withColumn("words", split($"value", " "))
 
// 把数组words展平为单词word
.withColumn("word", explode($"words"))
 
// 以单词word为Key做分组
.groupBy("word")
 
// 分组计数
.count()

首先需要说明的是我们从Socket创建的DataFrame默认只有一个“value”列它以行为粒度存储着从Socket接收到数据流。比方说我们在第二个终端也就是netcat界面敲入两行数据分别是“Apache Spark”和“Spark Logo”。那么在“value”列中就会有两行数据与之对应同样是“Apache Spark”和“Spark Logo”。

对于“value”列我们先是用空格把它拆分为数组words然后再用explode把words展平为单词word接下来就是对单词word做分组计数。这部分处理逻辑比较简单你很容易就可以上手鼓励你尝试其他不同的算子来实现同样的逻辑。

数据输出

数据处理完毕之后与readStream API相对应我们可以调用writeStream API来把处理结果写入到Sink中。在Structured Streaming框架下Spark支持多种Sink类型其中有Console、File、Kafka和Foreach(Batch)。对于这几种Sink的差异与特点我们留到下一讲再去展开。

图片

这里我们先来说说ConsoleConsole就是我们常说的终端选择Console作为SinkSpark会把结果打印到终端。因此Console往往与Socket配合用于开发实验与测试连通性代码实现如下所示。

/**
将Word Count结果写入到终端Console
*/
 
df.writeStream
// 指定Sink为终端Console
.format("console")
 
// 指定输出选项
.option("truncate", false)
 
// 指定输出模式
.outputMode("complete")
//.outputMode("update")
 
// 启动流处理应用
.start()
// 等待中断指令
.awaitTermination()

可以看到writeStream API看上去与DataFrame的write API也是极为神似。

图片

其中format用于指定Sink类型option则用于指定与Sink类型相关的输出选项比如与Console相对应的“truncate”选项用来表明输出内容是否需要截断。在write API中我们最终通过调用save把数据保持到指定路径而在writeStream API里我们通过start来启动端到端的流计算。

所谓端到端的流计算它指的就是我们在“流动的Word Count”应用中实现的3个计算环节也即从数据源不断地加载数据流以Word Count的计算逻辑处理数据并最终把计算结果打印到Console。

整个计算过程持续不断即便netcat端没有任何输入“流动的Word Count”应用也会一直运行直到我们强制应用退出为止。而这正是函数awaitTermination的作用顾名思义它的目的就是在“等待用户中断”。

对于writeStream API与write API的不同除了刚刚说的start和awaitTermination以外细心的你想必早已发现writeStream API多了一个outputMode函数它用来指定数据流的输出模式。

想要理解这个函数就要清楚数据流的输出模式都有哪些。我们先来说一说Structured Streaming都支持哪些输出模式然后再用“流动的Word Count”的执行结果来直观地进行对比说明。

一般来说Structured Streaming支持3种Sink输出模式也就是

  • Complete mode输出到目前为止处理过的全部内容
  • Append mode仅输出最近一次作业的计算结果
  • Update mode仅输出内容有更新的计算结果

当然这3种模式并不是在任何场景下都适用。比方说在我们“流动的Word Count”示例中Append mode就不适用。原因在于对于有聚合逻辑的流处理来说开发者必须要提供Watermark才能使用Append mode。

后面第32讲我们还会继续学习Watermark和Sink的三种输出模式这里你有个大致印象就好。

执行结果

到目前为止“流动的Word Count”应用代码已全部开发完毕接下来我们先让它跑起来感受一下流计算的魅力。然后我们再通过将outputMode中的“complete”替换为“update”直观对比一下它们的特点和区别。

要运行“流动的Word Count”首先第一步我们把刚刚实现的所有代码依次敲入第一个终端的spark-shell。全部录入之后等待一会你应该会看到如下的画面

图片

当出现“Batch: 0”字样后这表明我们的流处理应用已经成功运行并在9999端口等待数据流的录入。接下来我们切换到第二个终端也就是开启netcat的终端界面然后依次逐行注意依次逐行输入下面的文本内容每行数据录入之间请间隔3~5秒。

图片

然后我们再把屏幕切换到spark-shell终端你会看到Spark跑了4批作业执行结果分别如下。

图片

可以看到在Complete mode下每一批次的计算结果都会包含系统到目前为止处理的全部数据内容。你可以通过对比每个批次与前面批次的差异来验证这一点。

接下来我们在spark-shell终端输入强制中断命令ctrl + D或ctrl + C退出spark-shell。然后再次在终端敲入“spark-shell”命令再次进入spark-shell本地环境并再次录入“流动的Word Count”代码。不过这一次在代码的最后我们把writeStream中的outputMode由原来的“complete”改为“update”。

代码录入完毕之后我们再切回到netcat终端并重新录入刚刚的4条数据然后观察第一个终端spark-shell界面的执行结果。

图片

对比之下一目了然可以看到在Update mode下每个批次仅输出内容有变化的数据记录。所谓有变化也就是要么单词是第一次在本批次录入计数为1要么单词是重复录入计数有所变化。你可以通过观察不同批次的输出以及对比Update与Complete不同模式下的输出结果来验证这一点。

好啦到目前为止我们一起开发了一个流处理小应用“流动的Word Count”并一起查看了它在不同输出模式下的计算结果。恭喜你学到这里可以说你的一只脚已经跨入了Spark流计算的大门。后面还有很多精彩的内容有待我们一起去发掘让我们一起加油

重点回顾

今天这一讲你需要掌握如下几点。首先你需要熟悉流计算场景中3个重要的基本概念也就是Source、流处理引擎和Sink如下图所示。

图片

再者对于Source与Sink你需要知道在Structured Streaming框架下Spark都能提供哪些具体的支持。以Source为例Spark支持Socket、File和Kafka而对于SinkSpark支持Console、File、Kafka和Foreach(Batch)。

之后我们结合一个流处理小应用借此熟悉了在Structured Streaming框架下流处理应用开发的一般流程。一般来说我们通过readStream API从不同类型的Source读取数据流、并创建DataFrame然后使用DataFrame算子处理数据如数据的过滤、投影、分组、聚合等最终通过writeStream API将处理结果写入到不同形式的Sink中去。

最后对于结果的输出我们需要了解在不同的场景下Structured Streaming支持不同的输出模式。输出模式主要有3种分别是Complete mode、Append mode和Update mode。其中Complete mode输出到目前为止处理过的所有数据而Update mode仅输出在当前批次有所更新的数据内容。

每课一练

在运行“流动的Word Count”的时候我们强调依次逐行输入数据内容请你把示例给出的4行数据一次性地输入netcat拷贝&粘贴)然后观察Structured Streaming给出的结果与之前相比有什么不同

欢迎你在留言区跟我交流互动也推荐你把今天的内容分享给更多同事、朋友一起动手搭建这个Word Count流计算应用。