You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
14 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 24 | Kafka的协调服务ZooKeeper实现分布式系统的“瑞士军刀”
你好,我是李玥。
上节课我带你一起学习了RocketMQ NameServer的源代码RocketMQ的NameServer虽然设计非常简洁但很好地解决了路由寻址的问题。
而Kafka却采用了完全不同的设计思路它选择使用ZooKeeper这样一个分布式协调服务来实现和RocketMQ的NameServer差不多的功能。
这节课我先带大家简单了解一下ZooKeeper然后再来一起学习一下Kafka是如何借助ZooKeeper来构建集群实现路由寻址的。
## ZooKeeper的作用是什么
Apache ZooKeeper它是一个非常特殊的中间件为什么这么说呢一般来说像中间件类的开源产品大多遵循“做一件事并做好它。”这样的UNIX哲学每个软件都专注于一种功能上。而ZooKeeper更像是一个“瑞士军刀”它提供了很多基本的操作能实现什么样的功能更多取决于使用者如何来使用它。
ZooKeeper 作为一个分布式的协调服务框架主要用来解决分布式集群中应用系统需要面对的各种通用的一致性问题。ZooKeeper本身可以部署为一个集群集群的各个节点之间可以通过选举来产生一个Leader选举遵循半数以上的原则所以一般集群需要部署奇数个节点。
ZooKeeper最核心的功能是它提供了一个分布式的存储系统数据的组织方式类似于UNIX文件系统的树形结构。由于这是一个可以保证一致性的存储系统所以你可以放心地在你的应用集群中读写ZooKeeper的数据而不用担心数据一致性的问题。分布式系统中一些需要整个集群所有节点都访问的元数据比如集群节点信息、公共配置信息等特别适合保存在ZooKeeper中。
在这个树形的存储结构中每个节点被称为一个“ZNode”。ZooKeeper提供了一种特殊的ZNode类型临时节点。这种临时节点有一个特性如果创建临时节点的客户端与ZooKeeper集群失去连接这个临时节点就会自动消失。在ZooKeeper内部它维护了ZooKeeper集群与所有客户端的心跳通过判断心跳的状态来确定是否需要删除客户端创建的临时节点。
ZooKeeper还提供了一种订阅ZNode状态变化的通知机制Watcher一旦ZNode或者它的子节点状态发生了变化订阅的客户端会立即收到通知。
利用ZooKeeper临时节点和Watcher机制我们很容易随时来获取业务集群中每个节点的存活状态并且可以监控业务集群的节点变化情况当有节点上下线时都可以收到来自ZooKeeper的通知。
此外我们还可以用ZooKeeper来实现业务集群的快速选举、节点间的简单通信、分布式锁等很多功能。
下面我带你一起来看一下Kafka是如何来使用ZooKeeper的。
## Kafka在ZooKeeper中保存了哪些信息
首先我们来看一下Kafka在ZooKeeper都保存了哪些信息我把这些ZNode整理了一张图方便你来学习。
![](https://static001.geekbang.org/resource/image/80/b3/806ac0fc52ccbf50506e3b5d269b81b3.jpg)
你可能在网上看到过和这个图类似的其他版本的图这些图中绘制的ZNode比我们这张图要多一些这些图大都是描述的0.8.x的旧版本的情况最新版本的Kafka已经将消费位置管理等一些原本依赖ZooKeeper实现的功能替换成了其他的实现方式。
图中圆角的矩形是临时节点,直角矩形是持久化的节点。
我们从左往右来看左侧这棵树保存的是Kafka的Broker信息/brokers/ids/\[0…N\]每个临时节点对应着一个在线的BrokerBroker启动后会创建一个临时节点代表Broker已经加入集群可以提供服务了节点名称就是BrokerID节点内保存了包括Broker的地址、版本号、启动时间等等一些Broker的基本信息。如果Broker宕机或者与ZooKeeper集群失联了这个临时节点也会随之消失。
右侧部分的这棵树保存的就是主题和分区的信息。/brokers/topics/节点下面的每个子节点都是一个主题节点的名称就是主题名称。每个主题节点下面都包含一个固定的partitions节点pattitions节点的子节点就是主题下的所有分区节点名称就是分区编号。
每个分区节点下面是一个名为state的临时节点节点中保存着分区当前的leader和所有的ISR的BrokerID。这个state临时节点是由这个分区当前的Leader Broker创建的。如果这个分区的Leader Broker宕机了对应的这个state临时节点也会消失直到新的Leader被选举出来再次创建state临时节点。
## Kafka客户端如何找到对应的Broker
那Kafka客户端如何找到主题、队列对应的Broker呢其实通过上面ZooKeeper中的数据结构你应该已经可以猜的八九不离十了。是的先根据主题和队列在右边的树中找到分区对应的state临时节点我们刚刚说过state节点中保存了这个分区Leader的BrokerID。拿到这个Leader的BrokerID后再去左侧的树中找到BrokerID对应的临时节点就可以获取到Broker真正的访问地址了。
在《[21 | Kafka Consumer源码分析消息消费的实现过程](https://time.geekbang.org/column/article/135120)》这一节课中我讲过Kafka的客户端并不会去直接连接ZooKeeper它只会和Broker进行远程通信那我们可以合理推测一下ZooKeeper上的元数据应该是通过Broker中转给每个客户端的。
下面我们一起看一下Kafka的源代码来验证一下我们的猜测是不是正确的。
在之前的课程中我和大家讲过客户端真正与服务端发生网络传输是在org.apache.kafka.clients.NetworkClient#poll方法中实现的我们一直跟踪这个调用链
```
NetworkClient#poll() -> DefaultMetadataUpdater#maybeUpdate(long) -> DefaultMetadataUpdater#maybeUpdate(long, Node)
```
直到maybeUpdate(long, Node)这个方法在这个方法里面Kafka构造了一个更新元数据的请求
```
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId, now)) {
// 构建一个更新元数据的请求的构造器
Metadata.MetadataRequestAndVersion metadataRequestAndVersion = metadata.newMetadataRequestAndVersion();
inProgressRequestVersion = metadataRequestAndVersion.requestVersion;
MetadataRequest.Builder metadataRequest = metadataRequestAndVersion.requestBuilder;
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
// 发送更新元数据的请求
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
return defaultRequestTimeoutMs;
}
//...
}
```
这段代码先构造了更新元数据的请求的构造器然后调用sendInternalMetadataRequest()把这个请求放到待发送的队列中。这里面有两个地方我需要特别说明一下。
第一点是在这个方法里面创建的并不是一个真正的更新元数据的MetadataRequest而是一个用于构造MetadataRequest的构造器MetadataRequest.Builder等到真正要发送请求之前Kafka才会调用Builder.buid()方法把这个MetadataRequest构建出来然后发送出去。而且不仅是元数据的请求所有的请求都是这样来处理的。
第二点是调用sendInternalMetadataRequest()方法时,这个请求也并没有被真正发出去,依然是保存在待发送的队列中,然后择机来异步批量发送。
请求的具体内容封装在org.apache.kafka.common.requests.MetadataRequest这个对象中它包含的信息很简单只有一个主题的列表来表明需要获取哪些主题的元数据另外还有一个布尔类型的字段allowAutoTopicCreation表示是否允许自动创建主题。
然后我们再来看下在Broker中Kafka是怎么来处理这个更新元数据的请求的。
Broker处理所有RPC请求的入口类在kafka.server.KafkaApis#handle这个方法里面我们找到对应处理更新元数据的方法handleTopicMetadataRequest(RequestChannel.Request)这段代码是用Scala语言编写的
```
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.body[MetadataRequest]
val requestVersion = request.header.apiVersion
// 计算需要获取哪些主题的元数据
val topics =
// 在旧版本的协议中,每次都获取所有主题的元数据
if (requestVersion == 0) {
if (metadataRequest.topics() == null || metadataRequest.topics.isEmpty)
metadataCache.getAllTopics()
else
metadataRequest.topics.asScala.toSet
} else {
if (metadataRequest.isAllTopics)
metadataCache.getAllTopics()
else
metadataRequest.topics.asScala.toSet
}
// 省略掉鉴权相关代码
// ...
val topicMetadata =
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
// 从元数据缓存过滤出相关主题的元数据
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
// ...
// 获取所有Broker列表
val brokers = metadataCache.getAliveBrokers
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))
// 构建Response并发送
sendResponseMaybeThrottle(request, requestThrottleMs =>
new MetadataResponse(
requestThrottleMs,
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
))
}
```
这段代码的主要逻辑是先根据请求中的主题列表去本地的元数据缓存MetadataCache中过滤出相应主题的元数据也就是我们上面那张图中右半部分的那棵树的子集然后再去本地元数据缓存中获取所有Broker的集合也就是上图中左半部分那棵树最后把这两部分合在一起作为响应返回给客户端。
Kafka在每个Broker中都维护了一份和ZooKeeper中一样的元数据缓存并不是每次客户端请求元数据就去读一次ZooKeeper。由于ZooKeeper提供了Watcher这种监控机制Kafka可以感知到ZooKeeper中的元数据变化从而及时更新Broker中的元数据缓存。
这样就完成了一次完整的更新元数据的流程。通过分析代码,可以证实,我们开始的猜测都是没有问题的。
## 小结
最后我们对这节课的内容做一个总结。
首先我们简单的介绍了ZooKeeper它是一个分布式的协调服务它的核心服务是一个高可用、高可靠的一致性存储在此基础上提供了包括读写元数据、节点监控、选举、节点间通信和分布式锁等很多功能**这些功能可以极大方便我们快速开发一个分布式的集群系统。**
但是ZooKeeper也并不是完美的在使用的时候你需要注意几个问题
1. 不要往ZooKeeper里面写入大量数据它不是一个真正意义上的存储系统只适合存放少量的数据。依据服务器配置的不同ZooKeeper在写入超过几百MB数据之后性能和稳定性都会严重下降。
2. 不要让业务集群的可用性依赖于ZooKeeper的可用性什么意思呢你的系统可以使用Zookeeper但你要留一手要考虑如果Zookeeper集群宕机了你的业务集群最好还能提供服务。因为ZooKeeper的选举过程是比较慢的而它对网络的抖动又比较敏感一旦触发选举这段时间内的ZooKeeper是不能提供任何服务的。
Kafka主要使用ZooKeeper来保存它的元数据、监控Broker和分区的存活状态并利用ZooKeeper来进行选举。
Kafka在ZooKeeper中保存的元数据主要就是Broker的列表和主题分区信息两棵树。这份元数据同时也被缓存到每一个Broker中。客户端并不直接和ZooKeeper来通信而是在需要的时候通过RPC请求去Broker上拉取它关心的主题的元数据然后保存到客户端的元数据缓存中以便支撑客户端生产和消费。
可以看到目前Kafka的这种设计集群的可用性是严重依赖ZooKeeper的也就是说如果ZooKeeper集群不能提供服务那整个Kafka集群也就不能提供服务了这其实是一个不太好的设计。
如果你需要要部署大规模的Kafka集群建议的方式是拆分成多个互相独立的小集群部署每个小集群都使用一组独立的ZooKeeper提供服务。这样每个ZooKeeper中存储的数据相对比较少并且如果某个ZooKeeper集群故障只会影响到一个小的Kafka集群故障的影响面相对小一些。
Kafka的开发者也意识到了这个问题目前正在讨论开发一个元数据服务来替代ZooKeeper感兴趣的同学可以看一下他们的[Proposal](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum)。
## 思考题
本节课的思考题是这样的请你顺着我们这节课源码分析的思路继续深挖进去看一下Broker中的元数据缓存又是如何与ZooKeeper中的元数据保持同步的呢欢迎在留言区写下你的想法。
感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给你的朋友。