You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

30 KiB

14 | Controller选举是怎么实现的

你好,我是胡夕。

上节课我们学习了单线程事件队列模型处理Controller事件的代码。Controller组件通过ControllerEventManager类构造了一个阻塞队列同时配以专属的事件处理线程实现了对各类ControllerEvent的处理。

这种设计思路既保证了多线程访问所需的线程安全还简化了Controller端的代码结构极大地提升了代码的可维护性。

今天我们学习下Controller选举部分的源码。

还记得我在第11节课的案例中提到的“恢复大法”——删除ZooKeeper的/controller节点吗当时我们靠着这个“秘籍”涉险过关既恢复了错误的集群状态又避免了重启整个生产环境。

但你有没有想过,为什么删除/controller节点能够令集群元数据重新保持同步呢如果不了解这背后的原理我们是不敢贸然在生产环境做这种操作的。今天我们要学习的就是这背后的一整套实现逻辑重点关注下Controller是怎么被选举出来的。

我始终认为只有掌握了这些知识才算真正入门Kafka服务器端的代码了。作为Broker端最重要的组件之一Controller在Kafka中的地位无可替代。整个Kafka集群就只有一个Controller从某种意义上来说它是目前Kafka这个分布式系统中唯一的“单点”。

因此了解这个“单点”的选举触发场景以及如何被选举出来的对于我们后面深入理解Controller在集群中的作用非常有帮助。毕竟Controller对外提供的一些服务也是采用了类似的实现原理。

概览

ZooKeeper /controller节点

再次强调下,在一个Kafka集群中某段时间内只能有一台Broker被选举为Controller。随着时间的推移可能会有不同的Broker陆续担任过Controller的角色但是在某一时刻Controller只能由一个Broker担任

那选择哪个Broker充当Controller呢当前Controller的选举过程依赖ZooKeeper完成。ZooKeeper除了扮演集群元数据的“真理之源”角色还定义了/controller临时节点Ephemeral Node以协助完成Controller的选举。

下面这段代码展示的是一个双Broker的Kafka集群上的ZooKeeper中/controller节点

{"version":1,"brokerid":0,"timestamp":"1585098432431"}
cZxid = 0x1a
ctime = Wed Mar 25 09:07:12 CST 2020
mZxid = 0x1a
mtime = Wed Mar 25 09:07:12 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002d3a1f0000
dataLength = 54
numChildren = 0

有两个地方的内容,你要重点关注一下。

  • Controller Broker Id是0表示序号为0的Broker是集群Controller。
  • ephemeralOwner字段不是0x0说明这是一个临时节点。

既然是临时节点那么一旦Broker与ZooKeeper的会话终止该节点就会消失。Controller选举就依靠了这个特性。每个Broker都会监听/controller节点随时准备应聘Controller角色。下图展示了Broker与/controller节点的交互关系

如图所示集群上所有的Broker都在实时监听ZooKeeper上的这个节点。这里的“监听”有两个含义。

  • 监听这个节点是否存在。倘若发现这个节点不存在Broker会立即“抢注”该节点即创建/controller节点。创建成功的那个Broker即当选为新一届的Controller。
  • 监听这个节点数据是否发生了变更。同样一旦发现该节点的内容发生了变化Broker也会立即启动新一轮的Controller选举。

掌握了这些基础之后下面我们来阅读具体的源码文件KafkaController.scala。这是一个2200行的大文件。我先向你介绍一下这个文件的大致结构以免你陷入到一些繁枝末节中。

源码结构

KafkaController文件的代码结构如下图所示

整体而言,该文件大致由五部分组成。

  • 选举触发器ElectionTrigger这里的选举不是指Controller选举而是指主题分区副本的选举即为哪些分区选择Leader副本。后面在学习副本管理器和分区管理器时我们会讲到它。
  • KafkaController ObjectKafkaController伴生对象仅仅定义了一些常量和回调函数类型。
  • ControllerEvent定义Controller事件类型。上节课我们详细学习过Controller事件以及基于事件的单线程事件队列模型。这部分的代码看着很多但实际上都是千篇一律的。你看懂了一个事件的定义其他的也就不在话下了。
  • 各种ZooKeeper监听器定义ZooKeeper监听器去监听ZooKeeper中各个节点的变更。今天我们重点关注监听/controller节点的那个监听器。
  • KafkaController Class定义KafkaController类以及实际的处理逻辑。这是我们今天的重点学习对象。

接下来我会给你重点介绍KafkaController类、ZooKeeper监听器和Controller选举这三大部分。在众多的ZooKeeper监听器中我会详细介绍监听Controller变更的监听器它也是我们了解Controller选举流程的核心环节。

KafkaController类

这个类大约有1900行代码里面定义了非常多的变量和方法。这些方法大多是处理不同Controller事件的。后面讲到选举流程的时候我会挑一些有代表性的来介绍。我希望你能举一反三借此吃透其他方法的代码。毕竟它们做的事情大同小异至少代码风格非常相似。

在学习重要的方法之前我们必须要先掌握KafkaController类的定义。接下来我们从4个维度来进行学习分别是原生字段、辅助字段、各类ZooKeeper监听器字段和统计字段。

弄明白了这些字段的含义之后,再去看操作这些字段的方法,会更加有的放矢,理解起来也会更加容易。

原生字段

首先来看原生字段。所谓的原生字段是指在创建一个KafkaController实例时需要指定的字段。

先来看下KafkaController类的定义代码

// 字段含义:
// configKafka配置信息通过它你能拿到Broker端所有参数的值
// zkClientZooKeeper客户端Controller与ZooKeeper的所有交互均通过该属性完成
// time提供时间服务(如获取当前时间)的工具类
// metrics实现指标监控服务(如创建监控指标)的工具类
// initialBrokerInfoBroker节点信息包括主机名、端口号所用监听器等
// initialBrokerEpochBroker Epoch值用于隔离老Controller发送的请求
// tokenManager实现Delegation token管理的工具类。Delegation token是一种轻量级的认证机制
// threadNamePrefixController端事件处理线程名字前缀
class KafkaController(val config: KafkaConfig,
                      zkClient: KafkaZkClient,
                      time: Time,
                      metrics: Metrics,
                      initialBrokerInfo: BrokerInfo,
                      initialBrokerEpoch: Long,
                      tokenManager: DelegationTokenManager,
                      threadNamePrefix: Option[String] = None)
  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
  ......
}

就像我上节课说过的KafkaController实现了ControllerEventProcessor接口因而也就实现了处理Controller事件的process方法。这里面比较重要的字段有3个。

  • configKafkaConfig类实例里面封装了Broker端所有参数的值。
  • zkClientZooKeeper客户端类定义了与ZooKeeper交互的所有方法。
  • initialBrokerEpochController所在Broker的Epoch值。Kafka使用它来确保Broker不会处理老Controller发来的请求。

其他字段要么是像time、metrics一样是工具类字段要么是像initialBrokerInfo、tokenManager字段一样使用场景很有限我就不展开讲了。

辅助字段

除了原生字段之外KafkaController还定义了很多辅助字段帮助实现Controller的各类功能。

我们来看一些重要的辅助字段:

......
// 集群元数据类,保存集群所有元数据
val controllerContext = new ControllerContext
// Controller端通道管理器类负责Controller向Broker发送请求
var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
  stateChangeLogger, threadNamePrefix)
// 线程调度器当前唯一负责定期执行Leader重选举
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// Controller事件管理器负责管理事件处理线程
private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
  controllerContext.stats.rateAndTimeMetrics)
......
// 副本状态机,负责副本状态转换
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// 分区状态机,负责分区状态转换
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// 主题删除管理器,负责删除主题及日志
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
  partitionStateMachine, new ControllerDeletionClient(this, zkClient))
......

其中有7个字段是重中之重。

  • controllerContext:集群元数据类,保存集群所有元数据。
  • controllerChannelManagerController端通道管理器类负责Controller向Broker发送请求。
  • kafkaScheduler线程调度器当前唯一负责定期执行分区重平衡Leader选举。
  • eventManagerController事件管理器负责管理事件处理线程。
  • replicaStateMachine:副本状态机,负责副本状态转换。
  • partitionStateMachine:分区状态机,负责分区状态转换。
  • topicDeletionManager:主题删除管理器,负责删除主题及日志。

各类ZooKeeper监听器

我们今天开头学到的ControllerChangeHandler仅仅是其中的一种。实际上该类定义了很多监听器如下所示

// Controller节点ZooKeeper监听器
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
// Broker数量ZooKeeper监听器
private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
// Broker信息变更ZooKeeper监听器集合
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
// 主题数量ZooKeeper监听器
private val topicChangeHandler = new TopicChangeHandler(eventManager)
// 主题删除ZooKeeper监听器
private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
// 主题分区变更ZooKeeper监听器
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
// 主题分区重分配ZooKeeper监听器
private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
// Preferred Leader选举ZooKeeper监听器
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
// ISR副本集合变更ZooKeeper监听器
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
// 日志路径变更ZooKeeper监听器
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

我分别解释一下这些ZooKeeper监听器的作用

  • controllerChangeHandler:前面说过,它是监听/controller节点变更的。这种变更包括节点创建、删除以及数据变更。
  • brokerChangeHandler监听Broker的数量变化。
  • brokerModificationsHandlers监听Broker的数据变更比如Broker的配置信息发生的变化。
  • topicChangeHandler:监控主题数量变更。
  • topicDeletionHandler:监听主题删除节点/admin/delete_topics的子节点数量变更。
  • partitionModificationsHandlers监控主题分区数据变更的监听器比如新增加了副本、分区更换了Leader副本。
  • partitionReassignmentHandler:监听分区副本重分配任务。一旦发现新提交的任务,就为目标分区执行副本重分配。
  • preferredReplicaElectionHandler监听Preferred Leader选举任务。一旦发现新提交的任务就为目标主题执行Preferred Leader选举。
  • isrChangeNotificationHandler监听ISR副本集合变更。一旦被触发就需要获取ISR发生变更的分区列表然后更新Controller端对应的Leader和ISR缓存元数据。
  • logDirEventNotificationHandler监听日志路径变更。一旦被触发需要获取受影响的Broker列表然后处理这些Broker上失效的日志路径。

我画了一张脑图希望可以帮助你更高效地记住这些ZooKeeper监听器

统计字段

最后,我们来看统计字段。

这些统计字段大多用于计算统计指标。有的监控指标甚至是非常重要的Controller监控项比如ActiveControllerCount指标。下面我们来了解下KafkaController都定义了哪些统计字段。这些指标的含义一目了然非常清晰我用注释的方式给出每个字段的含义

// 当前Controller所在Broker Id
@volatile private var activeControllerId = -1
// 离线分区总数
@volatile private var offlinePartitionCount = 0
// 满足Preferred Leader选举条件的总分区数
@volatile private var preferredReplicaImbalanceCount = 0
// 总主题数
@volatile private var globalTopicCount = 0
// 总主题分区数
@volatile private var globalPartitionCount = 0
// 待删除主题数
@volatile private var topicsToDeleteCount = 0
//待删除副本数
@volatile private var replicasToDeleteCount = 0
// 暂时无法删除的主题数
@volatile private var ineligibleTopicsToDeleteCount = 0
// 暂时无法删除的副本数
@volatile private var ineligibleReplicasToDeleteCount = 0

好了KafkaController类的定义我们就全部介绍完了。再次强调下因为KafkaController类的代码很多我强烈建议你熟练掌握这些字段的含义因为后面的所有方法都是围绕着这些字段进行操作的。

接下来我以Controller的选举流程为例引出KafkaController的一些方法的实现原理。不过在此之前我们要学习监听Controller变更的ZooKeeper监听器ControllerChangeHandler的源码。

ControllerChangeHandler监听器

就像我前面说到的KafkaController定义了十几种ZooKeeper监听器。和Controller相关的监听器是ControllerChangeHandler用于监听Controller的变更定义代码如下

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  // ZooKeeper中Controller节点路径即/controller
  override val path: String = ControllerZNode.path
  // 监听/controller节点创建事件
  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  // 监听/controller节点被删除事件
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  // 监听/controller节点数据变更事件
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

该监听器接收ControllerEventManager实例实现了ZNodeChangeHandler接口的三个方法handleCreationhandleDeletionhandleDataChange。该监听器下的path变量实际上就是/controller字符串表示它监听ZooKeeper的这个节点。

3个handle方法都用于监听/controller节点的变更但实现细节上稍有不同。

handleCreation和handleDataChange的处理方式是向事件队列写入ControllerChange事件handleDeletion的处理方式是向事件队列写入Reelect事件。

Deletion表明ZooKeeper中/controller节点不存在了即Kafka集群中的Controller暂时空缺了。因为它和Creation和DataChange是不同的状态需要区别对待因此Reelect事件做的事情要比ControllerChange的多处理ControllerChange事件只需要当前Broker执行“卸任Controller”的逻辑即可而Reelect事件是重选举除了Broker执行卸任逻辑之外还要求Broker参与到重选举中来。

由于KafkaController的process方法代码非常长因此我节选了刚刚提到的那两个事件的处理代码

// process方法(部分)
override def process(event: ControllerEvent): Unit = {
    try {
      event match {
       ......
       // ControllerChange事件
       case ControllerChange =>
          processControllerChange()
       // Reelect事件
       case Reelect =>
          processReelect()
        ......
      }
    }
    ......
}
// 如果是ControllerChange事件仅执行卸任逻辑即可
private def processControllerChange(): Unit = {
    maybeResign()
  }
// 如果是Reelect事件还需要执行elect方法参与新一轮的选举
private def processReelect(): Unit = {
    maybeResign()
    elect()
}

可以看到虽然代码非常长但整体结构却工整清晰全部都是基于模式匹配的事件处理。process方法会根据给定的Controller事件类型调用对应的process***方法处理该事件。这里只列举了ZooKeeper端/controller节点监听器监听的两类事件以及对应的处理方法。

对于ControllerChange事件而言处理方式是调用maybeResign去执行Controller的卸任逻辑。如果是Reelect事件除了执行卸任逻辑之外还要额外执行elect方法进行新一轮的Controller选举。

Controller选举流程

说完了ControllerChangeHandler源码我们来看下Controller的选举。所谓的Controller选举是指Kafka选择集群中一台Broker行使Controller职责。整个选举过程分为两个步骤触发选举和开始选举。

触发选举

我先用一张图展示下可能触发Controller选举的三个场景。

这三个场景是:

  1. 集群从零启动时;
  2. Broker侦测/controller节点消失时
  3. Broker侦测到/controller节点数据发生变更时。

这三个场景殊途同归最后都要执行选举Controller的动作。我来一一解释下这三个场景然后再介绍选举Controller的具体操作。

场景一:集群从零启动

集群首次启动时Controller尚未被选举出来。于是Broker启动后首先将Startup这个ControllerEvent写入到事件队列中然后启动对应的事件处理线程和ControllerChangeHandler ZooKeeper监听器最后依赖事件处理线程进行Controller的选举。

在源码中KafkaController类的startup方法就是做这些事情的。当Broker启动时它会调用这个方法启动ControllerEventThread线程。值得注意的是每个Broker都需要做这些事情不是说只有Controller所在的Broker才需要执行这些逻辑

startup方法的主体代码如下

def startup() = {
  // 第1步注册ZooKeeper状态变更监听器它是用于监听Zookeeper会话过期的
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler
    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }
    override def beforeInitializingSession(): Unit = {
      val queuedEvent = eventManager.clearAndPut(Expire)
      queuedEvent.awaitProcessing()
    }
  })
  // 第2步写入Startup事件到事件队列
  eventManager.put(Startup)
  // 第3步启动ControllerEventThread线程开始处理事件队列中的ControllerEvent
  eventManager.start()
}


首先startup方法会注册ZooKeeper状态变更监听器用于监听Broker与ZooKeeper之间的会话是否过期。接着写入Startup事件到事件队列然后启动ControllerEventThread线程开始处理事件队列中的Startup事件。

接下来我们来学习下KafkaController的process方法处理Startup事件的方法

// KafkaController的process方法
override def process(event: ControllerEvent): Unit = {
    try {
      event match {
       ......
       case Startup =>
          processStartup() // 处理Startup事件
      }
    }
    ......
}
private def processStartup(): Unit = {
   // 注册ControllerChangeHandler ZooKeeper监听器
   zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
   // 执行Controller选举
   elect()
}

从这段代码可知process方法调用processStartup方法去处理Startup事件。而processStartup方法又会调用zkClient的registerZNodeChangeHandlerAndCheckExistence方法注册ControllerChangeHandler监听器。

值得注意的是,虽然前面的三个场景是并列的关系,但实际上,后面的两个场景必须要等场景一的这一步成功执行之后,才能被触发。

这三种场景都要选举Controller因此我们最后统一学习elect方法的代码实现。

总体来说集群启动时Broker通过向事件队列“塞入”Startup事件的方式来触发Controller的竞选。

场景二:/controller节点消失

Broker检测到/controller节点消失时就意味着此时整个集群中没有Controller。因此所有检测到/controller节点消失的Broker都会立即调用elect方法执行竞选逻辑。

你可能会问“Broker是怎么侦测到ZooKeeper上的这一变化的呢”实际上这是ZooKeeper监听器提供的功能换句话说这是Apache ZooKeeper自己实现的功能所以我们才说Kafka依赖ZooKeeper完成Controller的选举。

讲到这里我说点题外话社区最近正在酝酿彻底移除ZooKeeper依赖。具体到Controller端的变化就是在Kafka内部实现一个类似于Raft的共识算法来选举Controller。我会在后面的特别放送里详细讲一下社区移除ZooKeeper的全盘计划。

场景三:/controller节点数据变更

Broker检测到/controller节点数据发生变化通常表明Controller“易主”了这就分为两种情况

  • 如果Broker之前是Controller那么该Broker需要首先执行卸任操作然后再尝试竞选
  • 如果Broker之前不是Controller那么该Broker直接去竞选新Controller。

具体到代码层面maybeResign方法形象地说明了这两种情况。你要注意方法中的maybe字样这表明Broker可能需要执行卸任操作也可能不需要。Kafka源码非常喜欢用maybe***来命名方法名以表示那些在特定条件下才需要执行的逻辑。以下是maybeResign的实现

private def maybeResign(): Unit = {
  // 非常关键的一步!这是判断是否需要执行卸任逻辑的重要依据!
  // 判断该Broker之前是否是Controller
  val wasActiveBeforeChange = isActive
  // 注册ControllerChangeHandler监听器  
  zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
  // 获取当前集群Controller所在的Broker Id如果没有Controller则返回-1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // 如果该Broker之前是Controller但现在不是了
  if (wasActiveBeforeChange && !isActive) {
    onControllerResignation() // 执行卸任逻辑
  }
}

代码的第一行非常关键它是决定是否需要执行卸任的重要依据。毕竟如果Broker之前不是Controller那何来“卸任”一说呢之后代码要注册ControllerChangeHandler监听器获取当前集群Controller所在的Broker ID如果没有Controller则返回-1。有了这些数据之后maybeResign方法需要判断该Broker是否之前是Controller但现在不是了。如果是这种情况的话则调用onControllerResignation方法执行Controller卸任逻辑。

说到“卸任”你可能会问“卸任逻辑是由哪个方法执行的呢”实际上这是由onControllerResignation方法执行的它主要是用于清空各种数据结构的值、取消ZooKeeper监听器、关闭各种状态机以及管理器等等。我用注释的方式给出它的逻辑实现

private def onControllerResignation(): Unit = {
  debug("Resigning")
  // 取消ZooKeeper监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(
    isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(
    logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(
    brokerModificationsHandlers.keySet)
  // 关闭Kafka线程调度器其实就是取消定期的Leader重选举
  kafkaScheduler.shutdown()
  // 将统计字段全部清0
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0
  // 关闭Token过期检查调度器
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()
  // 取消分区重分配监听器的注册
  unregisterPartitionReassignmentIsrChangeHandlers()
  // 关闭分区状态机
  partitionStateMachine.shutdown()
  // 取消主题变更监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  // 取消分区变更监听器的注册
  unregisterPartitionModificationsHandlers(
    partitionModificationsHandlers.keys.toSeq)
  // 取消主题删除监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(
    topicDeletionHandler.path)
  // 关闭副本状态机
  replicaStateMachine.shutdown()
  // 取消Broker变更监听器的注册
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
  // 关闭Controller通道管理器
  controllerChannelManager.shutdown()
  // 清空集群元数据
  controllerContext.resetContext()
  info("Resigned")
}


选举Controller

讲完了触发场景接下来我们就要学习Controller选举的源码了。前面说过了这三种选举场景最后都会调用elect方法来执行选举逻辑。我们来看下它的实现

private def elect(): Unit = {
    // 第1步获取当前Controller所在Broker的序号如果Controller不存在显式标记为-1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)

    // 第2步如果当前Controller已经选出来了直接返回即可
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
      // 第3步注册Controller相关信息
      // 主要是创建/controller节点
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
        s"and epoch zk version is now ${controllerContext.epochZkVersion}")

      // 第4步执行当选Controller的后续逻辑
      onControllerFailover()
    } catch {
      case e: ControllerMovedException =>
        maybeResign()

        if (activeControllerId != -1)
          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
        else
          warn("A controller has been elected but just resigned, this will result in another round of election", e)

      case t: Throwable =>
        error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
          s"Trigger controller movement immediately", t)
        triggerControllerMove()
    }
  }

为了帮助你更好地理解这个方法,我再画一张图来进行说明:

该方法首先检查Controller是否已经选出来了。要知道集群中的所有Broker都要执行这些逻辑因此非常有可能出现某些Broker在执行elect方法时Controller已经被选出来的情况。如果Controller已经选出来了那么自然也就不用再做什么了。相反地如果Controller尚未被选举出来那么代码会尝试创建/controller节点去抢注Controller。

一旦抢注成功就调用onControllerFailover方法执行选举成功后的动作。这些动作包括注册各类ZooKeeper监听器、删除日志路径变更和ISR副本变更通知事件、启动Controller通道管理器以及启动副本状态机和分区状态机。

如果抢注失败了代码会抛出ControllerMovedException异常。这通常表明Controller已经被其他Broker抢先占据了那么此时代码调用maybeResign方法去执行卸任逻辑。

总结

今天我们梳理了Controller选举的全过程包括Controller如何借助ZooKeeper监听器实现监听Controller节点以及Controller的选举触发场景和完整流程。我们来回顾一下这节课的重点。

  • Controller依赖ZooKeeper实现Controller选举主要是借助于/controller临时节点和ZooKeeper的监听器机制。
  • Controller触发场景有3种集群启动时/controller节点被删除时/controller节点数据变更时。
  • 源码最终调用elect方法实现Controller选举。

下节课我将带你学习Controller的其他重要功能包括它如何管理Broker和副本等。你千万不要错过。

课后讨论

在这节课刚开始的时候,我提到,删除/controller会触发Controller选举之后会同步集群元数据信息。那么你知道源码是在哪里更新的元数据请求吗

欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。