You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
9.3 KiB
Markdown

2 years ago
# 32 | Beam Window打通流处理的任督二脉
你好,我是蔡元楠。
今天我要与你分享的主题是“Beam Window打通流处理的任督二脉”。
在上一讲中我们一起用Beam编写了第一个完整的WordCount项目我们所用的例子是统计莎士比亚的文集中最常使用到的一些单词。
这里我们所用到的“莎士比亚文集”这种类型的数据集是一个静态的数据集。也就是说,我们在生成输入数据集的时候,就已经知道了这个数据集是完整的,并不需要再等待新的数据进来。
根据前面的内容我们可以把这种数据集归类为有界数据集Bounded Dataset。这里我们的数据流水线就是一个批处理的数据流水线。
这个时候你可能会有一个疑问,如果我们想要统计的内容是一个正在连载的小说,我们在编写数据流水线的时候,这个小说还并没有完结,也就是说,未来还会不断有新的内容作为输入数据流入我们的数据流水线,那我们需要怎么做呢?
这个时候我们就需要用到窗口Window这个概念了。
## 窗口
在Beam的世界中窗口这个概念将PCollection里的每个元素根据时间戳Timestamp划分成为了不同的有限数据集合。
当我们要将一些聚合操作Aggregation应用在PCollection上面的时候或者我们想要将不同的PCollections连接Join在一起的时候其实Beam是将这些操作应用在了这些被窗口划分好的不同数据集合上的。
无论是有界数据集还是无界数据集Beam都会一视同仁按照上面所说的规则进行处理。
你可能又会有一个疑问我们在上一讲的例子当中根本就没有提到过窗口这个概念但是我刚刚又说Beam同样会将有界数据集根据窗口划分成不同的有限数据集合来处理那这些窗口、PCollection中每个元素的时间戳又是从哪里来的呢
其实我们在用I/O连接器读取有界数据集的过程中Read Transform会默认为每个元素分配一个相同的时间戳。在一般情况下这个时间戳就是你运行数据流水线的时间也就是处理时间Processing Time。而Beam也会为这个数据流水线默认地分配一个全局窗口Global Window你可以把它理解为是一个从无限小到无限大的时间窗口。
如果你想要显式地将一个全局窗口赋予一个有界数据集的话,可以使用如下代码来完成:
Java
```
PCollection<String> input = p.apply(TextIO.read().from(filepath));
PCollection<String> batchInputs = input.apply(Window.<String>into(new GlobalWindows()));
```
需要注意的是你在处理有界数据集的时候可以不用显式地将一个窗口分配给一个PCollection数据集。但是在处理无边界数据集的时候你必须要显式地分配一个窗口给这个无边界数据集。而这个窗口不可以是前面提到的全局窗口否则在运行数据流水线的时候会直接抛出异常错误。
在了解过窗口的基本概念之后接下来我来给你讲讲在Beam中存在的不同窗口类型。
### 固定窗口Fixed Window
固定窗口在有的数据处理框架中又被称为滚动窗口Tumbling Window。固定窗口通常是由一个静态的窗口大小定义而来的。
例如要是我们定义一个每小时的窗口那这个窗口大小就是固定的一个小时如果我们按照2019年7月8号的0时作为时间的起始点那么这个固定窗口就可以分为类似下面这样的形式
```
[July 8, 2019 0:00:00 AM, July 8, 2019 1:00:00 AM)
[July 8, 2019 1:00:00 AM, July 8, 2019 2:00:00 AM)
[July 8, 2019 2:00:00 AM, July 8, 2019 3:00:00 AM)
……
```
而一个PCollection中的所有元素就会根据它们各自自身的时间戳被分配给相应的固定窗口中。
这里你需要注意一点因为固定窗口本质上并不可能会重叠在一起如果我们定义的窗口是固定窗口的话PCollection中的每一个元素只会落入一个且是唯一一个窗口中。
在Beam中如果要定义一个上述所说的按照每小时分割的窗口我们可以使用一个Window Transform来完成如下所示
Java
```
PCollection<String> input = p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> fixedWindowedInputs = input.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))));
```
### 滑动窗口Sliding Window
滑动窗口通常是由一个静态的窗口大小和一个滑动周期Sliding Period定义而来的。
例如我们可以定义一个窗口大小为一个小时滑动周期为30分钟的一个滑动窗口。我们还是以2019年7月8号的0时作为时间的起始点那这个滑动窗口可以分为下面这样的形式
```
[July 8, 2019 0:00:00 AM, July 8, 2019 1:00:00 AM)
[July 8, 2019 0:30:00 AM, July 8, 2019 1:30:00 AM)
[July 8, 2019 1:00:00 AM, July 8, 2019 2:00:00 AM)
[July 8, 2019 1:30:00 AM, July 8, 2019 2:30:00 AM)
……
```
因为Beam对于滑动周期的大小并没有做任何限制所以你可以看到滑动窗口和固定窗口不同的是当滑动周期小于窗口大小的时候滑动窗口会有部分重叠。也就是说在一个PCollection里面同一个元素是可以被分配到不同的滑动窗口中的。
可能你也会发现到,当滑动窗口的窗口大小和滑动周期一样的时候,这个滑动窗口的性质其实就和固定窗口一样了。
在Beam中如果要定义一个上述所说窗口大小为一个小时而滑动周期为30分钟的一个滑动窗口我们同样可以使用一个Window Transform来完成如下所示
Java
```
PCollection<String> input = p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> slidingWindowedInputs = input.apply(Window.<String>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardMinutes(30))));
```
### 会话窗口Sessions Window
会话窗口和上面所说的两个窗口有点不一样,它并没有一个固定的窗口长度。
会话窗口主要是用于记录持续了一段时间的活动数据集。在一个会话窗口中的数据集如果将它里面所有的元素按照时间戳来排序的话那么任意相邻的两个元素它们的时间戳相差不会超过一个定义好的静态间隔时间段Gap Duration
怎么理解这个定义呢?我想用一个例子来解释会比较清晰。
假设,我们现在正在一个视频流的平台中处理无界数据集,我们想要分析在这个平台中的一些用户行为习惯。
为了方便说明,我们想要分析的问题非常简单,就是一个用户在线看视频一般会在每天的什么时候开始看多长时间的视频。同时,我们假设只会有一个用户的数据流入我们的输入数据集中,这个数据会带有用户对视频平台发送视频流请求的时间戳。
我们希望定义一个会话窗口来处理这些数据而这个会话窗口的间隔时间段为5分钟。
所有的数据假设都是发生在2019年7月8号中的流入的数据集如下
```
(key1, value1, [7:44:00 AM7:44:00 AM))
(key1, value2, [7:45:00 AM7:45:00 AM))
(key1, value3, [7:49:00 AM7:49:00 AM))
(key1, value4, [8:01:00 AM8:01:00 AM))
(key1, value5, [8:02:00 AM8:02:00 AM))
```
那么这5个数据会形成两个会话窗口分别是
```
(key1[(value1, [7:44:00 AM7:44:00 AM)), (value2, [7:45:00 AM7:45:00 AM)), (value3, [7:49:00 AM7:49:00 AM))])
```
```
(key1[(value4, [8:01:00 AM8:01:00 AM)), (value5, [8:02:00 AM8:02:00 AM))])
```
你可以看到在第一个会话窗口中数据的时间戳分别是7:44:00AM7:45:00AM和7:49:00AM这个窗口的总长度有5分钟。任意相邻的两个元素之间的时间间隔不会超过我们之前定义好的5分钟。
而在第二个会话窗口中数据的时间戳分别是8:01:00AM和8:02:00AM这个窗口的总长度有1分钟同样任意相邻的两个元素之间的时间间隔不会超过5分钟。每一个会话窗口都记录了一个用户的在线活跃点和时长。
在Beam中如果要定义一个上述所说会话窗口的话你可以使用以下代码来完成
Java
```
PCollection<String> input = p.apply(KafkaIO.<Long, String>read()).apply(Values.<String>create());
PCollection<String> sessionWindowedInputs = input.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(5))));
```
## 小结
今天我们一起学习了在处理无界数据集的时候,我们需要显示定义到的一个概念——窗口。
窗口的定义是后面我们编写流处理数据流水线的一个基石。而窗口这个概念其实就是用来回答我们在第23讲中“WWWH”问题里“Where in event time they are being computed”这个问题的。除去全局窗口Beam里面总共让我们定义三种不同的窗口类型分别是固定窗口滑动窗口和会话窗口。
## 思考题
在今天介绍的三种时间窗口类型中,你觉得这三种时间窗口分别适合使用在什么样的应用场景中呢?
欢迎你把答案写在留言区,与我和其他同学一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。