You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
16 KiB
Markdown

2 years ago
# 27 | 关于高水位和Leader Epoch的讨论
你好我是胡夕。今天我要和你分享的主题是Kafka中的高水位和Leader Epoch机制。
你可能听说过高水位High Watermark但不一定耳闻过Leader Epoch。前者是Kafka中非常重要的概念而后者是社区在0.11版本中新推出的主要是为了弥补高水位机制的一些缺陷。鉴于高水位机制在Kafka中举足轻重而且深受各路面试官的喜爱今天我们就来重点说说高水位。当然我们也会花一部分时间来讨论Leader Epoch以及它的角色定位。
## 什么是高水位?
首先我们要明确一下基本的定义什么是高水位或者说什么是水位水位一词多用于流式处理领域比如Spark Streaming或Flink框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的
> 在时刻T任意创建时间Event Time为T且T≤T的所有事件都已经到达或被观测到那么T就被定义为水位。
“Streaming System”一书则是这样表述水位的
> 水位是一个单调增加且表征最早未完成工作oldest work not yet completed的时间戳。
为了帮助你更好地理解水位,我借助这本书里的一张图来说明一下。
![](https://static001.geekbang.org/resource/image/84/77/8426888d04e1e9917619829b7e3de877.png)
图中标注“Completed”的蓝色部分代表已完成的工作标注“In-Flight”的红色部分代表正在进行中的工作两者的边界就是水位线。
在Kafka的世界中水位的概念有一点不同。Kafka的水位不是时间戳更与时间无关。它是和位置信息绑定的具体来说它是用消息位移来表征的。另外Kafka源码使用的表述是高水位因此今天我也会统一使用“高水位”或它的缩写HW来进行讨论。值得注意的是Kafka中也有低水位Low Watermark它是与Kafka删除消息相关联的概念与今天我们要讨论的内容没有太多联系我就不展开讲了。
## 高水位的作用
在Kafka中高水位的作用主要有2个。
1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
2. 帮助Kafka完成副本同步。
下面这张图展示了多个与高水位相关的Kafka术语。我来详细解释一下图中的内容同时澄清一些常见的误区。
![](https://static001.geekbang.org/resource/image/45/db/453ff803a31aa030feedba27aed17ddb.jpg)
我们假设这是某个分区Leader副本的高水位图。首先请你注意图中的“已提交消息”和“未提交消息”。我们之前在专栏[第11讲](https://time.geekbang.org/column/article/102931)谈到Kafka持久性保障的时候特意对两者进行了区分。现在我借用高水位再次强调一下。在分区高水位以下的消息被认为是已提交消息反之就是未提交消息。消费者只能消费已提交消息即图中位移小于8的所有消息。注意这里我们不讨论Kafka事务因为事务机制会影响消费者所能看到的消息的范围它不只是简单依赖高水位来判断。它依靠一个名为LSOLog Stable Offset的位移值来判断事务型消费者的可见性。
另外,需要关注的是,**位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的**。
图中还有一个日志末端位移的概念即Log End Offset简写是LEO。它表示副本写入下一条消息的位移值。注意数字15所在的方框是虚线这就说明这个副本当前只有15条消息位移值是从0到14下一条新消息的位移是15。显然介于高水位和LEO之间的消息就属于未提交消息。这也从侧面告诉了我们一个重要的事实那就是**同一个副本对象其高水位值不会大于LEO值**。
**高水位和LEO是副本对象的两个重要属性**。Kafka所有副本都有对应的高水位和LEO值而不仅仅是Leader副本。只不过Leader副本比较特殊Kafka使用Leader副本的高水位来定义所在分区的高水位。换句话说**分区的高水位就是其Leader副本的高水位**。
## 高水位更新机制
现在我们知道了每个副本对象都保存了一组高水位值和LEO值但实际上在Leader副本所在的Broker上还保存了其他Follower副本的LEO值。我们一起来看看下面这张图。
![](https://static001.geekbang.org/resource/image/8b/de/8b1b8474a568e2ae40bf36bb03ca81de.jpg)
在这张图中我们可以看到Broker 0上保存了某分区的Leader副本和所有Follower副本的LEO值而Broker 1上仅仅保存了该分区的某个Follower副本。Kafka把Broker 0上保存的这些Follower副本又称为**远程副本**Remote Replica。Kafka副本机制在运行过程中会更新Broker 1上Follower副本的高水位和LEO值同时也会更新Broker 0上Leader副本的高水位和LEO以及所有远程副本的LEO但它不会更新远程副本的高水位值也就是我在图中标记为灰色的部分。
为什么要在Broker 0上保存这些远程副本呢其实它们的主要作用是**帮助Leader副本确定其高水位也就是分区高水位**。
为了帮助你更好地记忆这些值被更新的时机我做了一张表格。只有搞清楚了更新机制我们才能开始讨论Kafka副本机制的原理以及它是如何使用高水位来执行副本消息同步的。
![](https://static001.geekbang.org/resource/image/d6/41/d6d2f98c611e06ffb85f01031ca79b41.jpg)
在这里我稍微解释一下什么叫与Leader副本保持同步。判断的条件有两个。
1. 该远程Follower副本在ISR中。
2. 该远程Follower副本LEO值落后于Leader副本LEO值的时间不超过Broker端参数replica.lag.time.max.ms的值。如果使用默认值的话就是不超过10秒。
乍一看这两个条件好像是一回事因为目前某个副本能否进入ISR就是靠第2个条件判断的。但有些时候会发生这样的情况即Follower副本已经“追上”了Leader的进度却不在ISR中比如某个刚刚重启回来的副本。如果Kafka只判断第1个条件的话就可能出现某些副本具备了“进入ISR”的资格但却尚未进入到ISR中的情况。此时分区高水位值就可能超过ISR中副本LEO而高水位 > LEO的情形是不被允许的。
下面我们分别从Leader副本和Follower副本两个维度来总结一下高水位和LEO的更新机制。
**Leader副本**
处理生产者请求的逻辑如下:
1. 写入消息到本地磁盘。
2. 更新分区高水位值。
i. 获取Leader副本所在Broker端保存的所有远程副本LEO值LEO-1LEO-2……LEO-n
ii. 获取Leader副本高水位值currentHW。
iii. 更新 currentHW = max{currentHW, minLEO-1, LEO-2, ……LEO-n}。
处理Follower副本拉取消息的逻辑如下
1. 读取磁盘(或页缓存)中的消息数据。
2. 使用Follower副本发送请求中的位移值更新远程副本LEO值。
3. 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)。
**Follower副本**
从Leader拉取消息的处理逻辑如下
1. 写入消息到本地磁盘。
2. 更新LEO值。
3. 更新高水位值。
i. 获取Leader发送的高水位值currentHW。
ii. 获取步骤2中更新过的LEO值currentLEO。
iii. 更新高水位为min(currentHW, currentLEO)。
## 副本同步机制解析
搞清楚了这些值的更新机制之后我来举一个实际的例子说明一下Kafka副本同步的全流程。该例子使用一个单分区且有两个副本的主题。
当生产者发送一条消息时Leader和Follower副本对应的高水位是怎么被更新的呢我给出了一些图片我们一一来看。
首先是初始状态。下面这张图中的remote LEO就是刚才的远程副本的LEO值。在初始状态时所有值都是0。
![](https://static001.geekbang.org/resource/image/1e/36/1ee643ce819a503f72df3d9b4ab04536.jpg)
当生产者给主题分区发送一条消息后,状态变更为:
![](https://static001.geekbang.org/resource/image/73/0b/7317242d7068dbf618866d5974a2d80b.jpg)
此时Leader副本成功将消息写入了本地磁盘故LEO值被更新为1。
Follower再次尝试从Leader拉取消息。和之前不同的是这次有消息可以拉取了因此状态进一步变更为
![](https://static001.geekbang.org/resource/image/91/0d/910e114abe40f1f9e4a13f6e6083320d.jpg)
这时Follower副本也成功地更新LEO为1。此时Leader和Follower副本的LEO都是1但各自的高水位依然是0还没有被更新。**它们需要在下一轮的拉取中被更新**,如下图所示:
![](https://static001.geekbang.org/resource/image/80/cb/8066e72733f14d2732a054ed56e373cb.jpg)
在新一轮的拉取请求中由于位移值是0的消息已经拉取成功因此Follower副本这次请求拉取的是位移值=1的消息。Leader副本接收到此请求后更新远程副本LEO为1然后更新Leader高水位为1。做完这些之后它会将当前已更新过的高水位值1发送给Follower副本。Follower副本接收到以后也将自己的高水位值更新成1。至此一次完整的消息同步周期就结束了。事实上Kafka就是利用这样的机制实现了Leader和Follower副本之间的同步。
## Leader Epoch登场
故事讲到这里似乎很完美依托于高水位Kafka既界定了消息的对外可见性又实现了异步的副本同步机制。不过我们还是要思考一下这里面存在的问题。
从刚才的分析中我们知道Follower副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个Follower副本情况可能更糟也许需要多轮拉取请求。也就是说Leader副本高水位更新和Follower副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此社区在0.11版本正式引入了Leader Epoch概念来规避因高水位更新错配导致的各种不一致问题。
所谓Leader Epoch我们大致可以认为是Leader版本。它由两部分数据组成。
1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时都会增加该版本号。小版本号的Leader被认为是过期Leader不能再行使Leader权力。
2. 起始位移Start Offset。Leader副本在该Epoch值上写入的首条消息的位移。
我举个例子来说明一下Leader Epoch。假设现在有两个Leader Epoch<0, 0><1, 120>那么第一个Leader Epoch表示版本号是0这个版本的Leader从位移0开始保存消息一共保存了120条消息。之后Leader发生了变更版本号增加到1新版本的起始位移是120。
Kafka Broker会在内存中为每个分区都缓存Leader Epoch数据同时它还会定期地将这些信息持久化到一个checkpoint文件中。当Leader副本写入消息到磁盘时Broker会尝试更新这部分缓存。如果该Leader是首次写入消息那么Broker会向缓存中增加一个Leader Epoch条目否则就不做更新。这样每次有Leader变更时新的Leader副本会查询这部分缓存取出对应的Leader Epoch的起始位移以避免数据丢失和不一致的情况。
接下来我们来看一个实际的例子它展示的是Leader Epoch是如何防止数据丢失的。请先看下图。
![](https://static001.geekbang.org/resource/image/4d/f5/4d97a873fc1bfaf89b5cc8259838f0f5.jpg)
我稍微解释一下单纯依赖高水位是怎么造成数据丢失的。开始时副本A和副本B都处于正常状态A是Leader副本。某个使用了默认acks设置的生产者程序向A发送了两条消息A全部写入成功此时Kafka会通知生产者说两条消息全部发送成功。
现在我们假设Leader和Follower都写入了这两条消息而且Leader副本的高水位也已经更新了但Follower副本高水位还未更新——这是可能出现的。还记得吧Follower端高水位的更新与Leader端有时间错配。倘若此时副本B所在的Broker宕机当它重启回来后副本B会执行日志截断操作将LEO值调整为之前的高水位值也就是1。这就是说位移值为1的那条消息被副本B从磁盘中删除此时副本B的底层磁盘文件中只保存有1条消息即位移值为0的那条消息。
当执行完截断操作后副本B开始从A拉取消息执行正常的消息同步。如果就在这个节骨眼上副本A所在的Broker宕机了那么Kafka就别无选择只能让副本B成为新的Leader此时当A回来后需要执行相同的日志截断操作即将高水位调整为与B相同的值也就是1。这样操作之后位移值为1的那条消息就从这两个副本中被永远地抹掉了。这就是这张图要展示的数据丢失场景。
严格来说,这个场景发生的前提是**Broker端参数min.insync.replicas设置为1**。此时一旦消息被写入到Leader副本的磁盘就会被认为是“已提交状态”但现有的时间错配问题导致Follower端的高水位更新是有滞后的。如果在这个短暂的滞后时间窗口内接连发生Broker宕机那么这类数据的丢失就是不可避免的。
现在我们来看下如何利用Leader Epoch机制来规避这种数据丢失。我依然用图的方式来说明。
![](https://static001.geekbang.org/resource/image/3a/8c/3a2e1131e8244233c076de906c174f8c.jpg)
场景和之前大致是类似的只不过引用Leader Epoch机制后Follower副本B重启回来后需要向A发送一个特殊的请求去获取Leader的LEO值。在这个例子中该值为2。当获知到Leader LEO=2后B发现该LEO值不比它自己的LEO值小而且缓存中也没有保存任何起始位移值 > 2的Epoch条目因此B无需执行任何日志截断操作。这是对高水位机制的一个明显改进即副本是否执行日志截断不再依赖于高水位进行判断。
现在副本A宕机了B成为Leader。同样地当A重启回来后执行与B相同的逻辑判断发现也不用执行日志截断至此位移值为1的那条消息在两个副本中均得到保留。后面当生产者程序向B写入新消息时副本B所在的Broker缓存中会生成新的Leader Epoch条目\[Epoch=1, Offset=2\]。之后副本B会使用这个条目帮助判断后续是否执行日志截断操作。这样通过Leader Epoch机制Kafka完美地规避了这种数据丢失场景。
## 小结
今天我向你详细地介绍了Kafka的高水位机制以及Leader Epoch机制。高水位在界定Kafka消息对外可见性以及实现副本机制等方面起到了非常重要的作用但其设计上的缺陷给Kafka留下了很多数据丢失或数据不一致的潜在风险。为此社区引入了Leader Epoch机制尝试规避掉这类风险。事实证明它的效果不错在0.11版本之后关于副本数据不一致性方面的Bug的确减少了很多。如果你想深入学习Kafka的内部原理今天的这些内容是非常值得你好好琢磨并熟练掌握的。
![](https://static001.geekbang.org/resource/image/98/3f/989d13e4bc4f44618a10b5b7bd6f523f.jpg)
## 开放讨论
在讲述高水位时我是拿2个副本举的例子。不过你应该很容易地扩展到多个副本。现在请你尝试用3个副本来说明一下副本同步全流程以及分区高水位被更新的过程。
欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。