You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

27 KiB

32 | GroupCoordinator在Rebalance中Coordinator如何处理成员入组

你好我是胡夕。不知不觉间课程已经接近尾声了最后这两节课我们来学习一下消费者组的Rebalance流程是如何完成的。

提到Rebalance你的第一反应一定是“爱恨交加”。毕竟如果使用得当它能够自动帮我们实现消费者之间的负载均衡和故障转移但如果配置失当我们就可能触碰到它被诟病已久的缺陷耗时长而且会出现消费中断。

在使用消费者组的实践中你肯定想知道应该如何避免Rebalance。如果你不了解Rebalance的源码机制的话就很容易掉进它无意中铺设的“陷阱”里。

举个小例子。有些人认为Consumer端参数session.timeout.ms决定了完成一次Rebalance流程的最大时间。这种认知是不对的实际上这个参数是用于检测消费者组成员存活性的即如果在这段超时时间内没有收到该成员发给Coordinator的心跳请求则把该成员标记为Dead而且要显式地将其从消费者组中移除并触发新一轮的Rebalance。而真正决定单次Rebalance所用最大时长的参数是Consumer端的max.poll.interval.ms。显然,如果没有搞懂这部分的源码,你就没办法为这些参数设置合理的数值。

总体而言, Rebalance的流程大致分为两大步加入组JoinGroup和组同步SyncGroup

加入组是指消费者组下的各个成员向Coordinator发送JoinGroupRequest请求加入进组的过程。这个过程有一个超时时间如果有成员在超时时间之内无法完成加入组操作它就会被排除在这轮Rebalance之外。

组同步是指当所有成员都成功加入组之后Coordinator指定其中一个成员为Leader然后将订阅分区信息发给Leader成员。接着所有成员包括Leader成员向Coordinator发送SyncGroupRequest请求。需要注意的是只有Leader成员发送的请求中包含了订阅分区消费分配方案在其他成员发送的请求中这部分的内容为空。当Coordinator接收到分配方案后会通过向成员发送响应的方式通知各个成员要消费哪些分区。

当组同步完成后Rebalance宣告结束。此时消费者组处于正常工作状态。

今天我们就学习下第一大步也就是加入组的源码实现它们位于GroupCoordinator.scala文件中。下节课我们再深入地学习组同步的源码实现。

要搞懂加入组的源码机制我们必须要学习4个方法分别是handleJoinGroup、doUnknownJoinGroup、doJoinGroup和addMemberAndRebalance。handleJoinGroup是执行加入组的顶层方法被KafkaApis类调用该方法依据给定消费者组成员是否了设置成员ID来决定是调用doUnknownJoinGroup还是doJoinGroup前者对应于未设定成员ID的情形后者对应于已设定成员ID的情形。而这两个方法都会调用addMemberAndRebalance执行真正的加入组逻辑。为了帮助你理解它们之间的交互关系我画了一张图借用它展示了这4个方法的调用顺序。

handleJoinGroup方法

如果你翻开KafkaApis.scala这个API入口文件就可以看到处理JoinGroupRequest请求的方法是handleJoinGroupRequest。而它的主要逻辑就是调用GroupCoordinator的handleJoinGroup方法来处理消费者组成员发送过来的加入组请求所以我们要具体学习一下handleJoinGroup方法。先看它的方法签名:

def handleJoinGroup(
  groupId: String, // 消费者组名
  memberId: String, // 消费者组成员ID
  groupInstanceId: Option[String], // 组实例ID用于标识静态成员
  requireKnownMemberId: Boolean, // 是否需要成员ID不为空
  clientId: String, // client.id值
  clientHost: String, // 消费者程序主机名
  rebalanceTimeoutMs: Int, // Rebalance超时时间,默认是max.poll.interval.ms值
  sessionTimeoutMs: Int, // 会话超时时间
  protocolType: String, // 协议类型
  protocols: List[(String, Array[Byte])], // 按照分配策略分组的订阅分区
  responseCallback: JoinCallback // 回调函数
  ): Unit = {
  ......
} 

这个方法的参数有很多,我介绍几个比较关键的。接下来在阅读其他方法的源码时,你还会看到这些参数,所以,这里你一定要提前掌握它们的含义。

  • groupId消费者组名。
  • memberId消费者组成员ID。如果成员是新加入的那么该字段是空字符串。
  • groupInstanceId这是社区于2.4版本引入的静态成员字段。静态成员的引入可以有效避免因系统升级或程序更新而导致的Rebalance场景。它属于比较高阶的用法而且目前还没有被大规模使用因此这里你只需要简单了解一下它的作用。另外后面在讲其他方法时我会直接省略静态成员的代码我们只关注核心逻辑就行了。
  • requireKnownMemberId是否要求成员ID不为空即是否要求成员必须设置ID的布尔字段。这个字段如果为True的话那么Kafka要求消费者组成员必须设置ID。未设置ID的成员会被拒绝加入组。直到它设置了ID之后才能重新加入组。
  • clientId消费者端参数client.id值。Coordinator使用它来生成memberId。memberId的格式是clientId值-UUID。
  • clientHost消费者程序的主机名。
  • rebalanceTimeoutMsRebalance超时时间。如果在这个时间段内消费者组成员没有完成加入组的操作就会被禁止入组。
  • sessionTimeoutMs会话超时时间。如果消费者组成员无法在这段时间内向Coordinator汇报心跳那么将被视为“已过期”从而引发新一轮Rebalance。
  • responseCallback完成加入组之后的回调逻辑方法。当消费者组成员成功加入组之后需要执行该方法。

说完了方法签名,我们看下它的主体代码:

// 验证消费者组状态的合法性
validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
  responseCallback(JoinGroupResult(memberId, error))
  return
}
// 确保sessionTimeoutMs介于
// [group.min.session.timeout.ms值group.max.session.timeout.ms值]之间
// 否则抛出异常,表示超时时间设置无效
if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
  sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
  responseCallback(JoinGroupResult(memberId, Errors.INVALID_SESSION_TIMEOUT))
} else {
  // 消费者组成员ID是否为空
  val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
  // 获取消费者组信息,如果组不存在,就创建一个新的消费者组
  groupManager.getOrMaybeCreateGroup(groupId, isUnknownMember) match {
    case None =>
      responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
    case Some(group) =>
      group.inLock {
        // 如果该消费者组已满员
        if (!acceptJoiningMember(group, memberId)) {
          // 移除该消费者组成员
          group.remove(memberId)
          group.removeStaticMember(groupInstanceId)
          // 封装异常表明组已满员
          responseCallback(JoinGroupResult(
            JoinGroupRequest.UNKNOWN_MEMBER_ID, 
            Errors.GROUP_MAX_SIZE_REACHED))
        // 如果消费者组成员ID为空
        } else if (isUnknownMember) {
          // 为空ID成员执行加入组操作
          doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        } else {
          // 为非空ID成员执行加入组操作
          doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
        // 如果消费者组正处于PreparingRebalance状态
        if (group.is(PreparingRebalance)) {
          // 放入Purgatory等待后面统一延时处理
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
      }
  }
}

为了方便你更直观地理解,我画了一张图来说明它的完整流程。

第1步调用validateGroupStatus方法验证消费者组状态的合法性。所谓的合法性也就是消费者组名groupId不能为空以及JoinGroupRequest请求发送给了正确的Coordinator这两者必须同时满足。如果没有通过这些检查那么handleJoinGroup方法会封装相应的错误并调用回调函数返回。否则就进入到下一步。

第2步代码检验sessionTimeoutMs的值是否介于[group.min.session.timeout.msgroup.max.session.timeout.ms]之间,如果不是,就认定该值是非法值,从而封装一个对应的异常调用回调函数返回,这两个参数分别表示消费者组允许配置的最小和最大会话超时时间;如果是的话,就进入下一步。

第3步代码获取当前成员的ID信息并查看它是否为空。之后通过GroupMetadataManager获取消费者组的元数据信息如果该组的元数据信息存在则进入到下一步如果不存在代码会看当前成员ID是否为空如果为空就创建一个空的元数据对象然后进入到下一步如果不为空则返回None。一旦返回了NonehandleJoinGroup方法会封装“未知成员ID”的异常调用回调函数返回。

第4步检查当前消费者组是否已满员。该逻辑是通过acceptJoiningMember方法实现的。这个方法根据消费者组状态确定是否满员。这里的消费者组状态有三种。

状态一如果是Empty或Dead状态肯定不会是满员直接返回True表示可以接纳申请入组的成员

状态二如果是PreparingRebalance状态那么批准成员入组的条件是必须满足一下两个条件之一。

  • 该成员是之前已有的成员,且当前正在等待加入组;
  • 当前等待加入组的成员数小于Broker端参数group.max.size值。

只要满足这两个条件中的任意一个,当前消费者组成员都会被批准入组。

状态三:如果是其他状态,那么,入组的条件是该成员是已有成员或者是当前组总成员数小于Broker端参数group.max.size值。需要注意的是,这里比较的是组当前的总成员数而不是等待入组的成员数这是因为一旦Rebalance过渡到CompletingRebalance之后没有完成加入组的成员就会被移除。

倘若成员不被批准入组那么代码需要将该成员从元数据缓存中移除同时封装“组已满员”的异常并调用回调函数返回如果成员被批准入组则根据Member ID是否为空就执行doUnknownJoinGroup或doJoinGroup方法执行加入组的逻辑。

第5步是尝试完成JoinGroupRequest请求的处理。如果消费者组处于PreparingRebalance状态那么就将该请求放入Purgatory尝试立即完成如果是其它状态则无需将请求放入Purgatory。毕竟我们处理的是加入组的逻辑而此时消费者组的状态应该要变更到PreparingRebalance后Rebalance才能完成加入组操作。当然如果延时请求不能立即完成则交由Purgatory统一进行延时处理。

至此handleJoinGroup逻辑结束。

实际上我们可以看到真正执行加入组逻辑的是doUnknownJoinGroup和doJoinGroup这两个方法。那么接下来我们就来学习下这两个方法。

doUnknownJoinGroup方法

如果是全新的消费者组成员加入组那么就需要为它们执行doUnknownJoinGroup方法因为此时它们的Member ID尚未生成。

除了memberId之外该方法的输入参数与handleJoinGroup方法几乎一模一样我就不一一地详细介绍了我们直接看它的源码。为了便于你理解我省略了关于静态成员以及DEBUG/INFO调试的部分代码。

group.inLock {
  // Dead状态
  if (group.is(Dead)) {
    // 封装异常调用回调函数返回
    responseCallback(JoinGroupResult(
      JoinGroupRequest.UNKNOWN_MEMBER_ID,         
      Errors.COORDINATOR_NOT_AVAILABLE))
  // 成员配置的协议类型/分区消费分配策略与消费者组的不匹配
  } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
  responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
  } else {
    // 根据规则为该成员创建成员ID
    val newMemberId = group.generateMemberId(clientId, groupInstanceId)
    // 如果配置了静态成员
    if (group.hasStaticMember(groupInstanceId)) {
      ......
    // 如果要求成员ID不为空
    } else if (requireKnownMemberId) {
      ......
      group.addPendingMember(newMemberId)
      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    } else {
      ......
      // 添加成员
      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
        clientId, clientHost, protocolType, protocols, group, responseCallback)
    }
  }
}


为了方便你理解,我画了一张图来展示下这个方法的流程。

首先,代码会检查消费者组的状态。

如果是Dead状态则封装异常然后调用回调函数返回。你可能会觉得奇怪既然是向该组添加成员为什么组状态还能是Dead呢实际上这种情况是可能的。因为在成员加入组的同时可能存在另一个线程已经把组的元数据信息从Coordinator中移除了。比如组对应的Coordinator发生了变更移动到了其他的Broker上此时代码封装一个异常返回给消费者程序后者会去寻找最新的Coordinator然后重新发起加入组操作。

如果状态不是Dead就检查该成员的协议类型以及分区消费分配策略是否与消费者组当前支持的方案匹配如果不匹配依然是封装异常然后调用回调函数返回。这里的匹配与否是指成员的协议类型与消费者组的是否一致以及成员设定的分区消费分配策略是否被消费者组下的其它成员支持。

如果这些检查都顺利通过接着代码就会为该成员生成成员ID生成规则是clientId-UUID。这便是generateMemberId方法做的事情。然后handleJoinGroup方法会根据requireKnownMemberId的取值来决定下面的逻辑路径

  • 如果该值为True则将该成员加入到待决成员列表Pending Member List然后封装一个异常以及生成好的成员ID将该成员的入组申请“打回去”令其分配好了成员ID之后再重新申请
  • 如果为False则无需这么苛刻直接调用addMemberAndRebalance方法将其加入到组中。至此handleJoinGroup方法结束。

通常来说如果你没有启用静态成员机制的话requireKnownMemberId的值是True这是由KafkaApis中handleJoinGroupRequest方法的这行语句决定的

val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty

可见, 如果你使用的是比较新的Kafka客户端版本而且没有配置过Consumer端参数group.instance.id的话那么这个字段的值就是True这说明Kafka要求消费者成员加入组时必须要分配好成员ID。

关于addMemberAndRebalance方法的源码一会儿在学习doJoinGroup方法时我再给你具体解释。

doJoinGroup方法

接下来我们看下doJoinGroup方法。这是为那些设置了成员ID的成员执行加入组逻辑的方法。它的输入参数全部承袭自handleJoinGroup方法输入参数你应该已经很熟悉了因此我们直接看它的源码实现。由于代码比较长我分成两个部分来介绍。同时我再画一张图帮助你理解整个方法的逻辑。

第1部分

这部分主要做一些校验和条件检查。

// 如果是Dead状态封装COORDINATOR_NOT_AVAILABLE异常调用回调函数返回
if (group.is(Dead)) {
  responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
// 如果协议类型或分区消费分配策略与消费者组的不匹配
// 封装INCONSISTENT_GROUP_PROTOCOL异常调用回调函数返回
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
  responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
// 如果是待决成员由于这次分配了成员ID故允许加入组
} else if (group.isPendingMember(memberId)) {
  if (groupInstanceId.isDefined) {
    ......
  } else {
    ......
    // 令其加入组
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
      clientId, clientHost, protocolType, protocols, group, responseCallback)
  }
} else {
  // 第二部分代码......
}

doJoinGroup方法开头和doUnkwownJoinGroup非常类似也是判断是否处于Dead状态并且检查协议类型和分区消费分配策略是否与消费者组的相匹配。

不同的是doJoinGroup要判断当前申请入组的成员是否是待决成员。如果是的话那么这次成员已经分配好了成员ID因此就直接调用addMemberAndRebalance方法令其入组如果不是的话那么方法进入到第2部分即处理一个非待决成员的入组申请。

第2部分

代码如下:

// 获取该成员的元数据信息
val member = group.get(memberId)
group.currentState match {
  // 如果是PreparingRebalance状态
  case PreparingRebalance =>
    // 更新成员信息并开始准备Rebalance
    updateMemberAndRebalance(group, member, protocols, responseCallback)
  // 如果是CompletingRebalance状态
  case CompletingRebalance =>
    // 如果成员以前申请过加入组
    if (member.matches(protocols)) {
      // 直接返回当前组信息
      responseCallback(JoinGroupResult(
        members = if (group.isLeader(memberId)) {
          group.currentMemberMetadata
        } else {
          List.empty
        },
        memberId = memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    // 否则更新成员信息并开始准备Rebalance
    } else {
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    }
  // 如果是Stable状态
  case Stable =>
    val member = group.get(memberId)
    // 如果成员是Leader成员或者成员变更了分区分配策略
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // 更新成员信息并开始准备Rebalance
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      responseCallback(JoinGroupResult(
        members = List.empty,
        memberId = memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  // 如果是其它状态,封装异常调用回调函数返回
  case Empty | Dead =>
    warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
      s"unexpected group state ${group.currentState}")
    responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
}

这部分代码的第1步,是获取要加入组成员的元数据信息。

第2步是查询消费者组的当前状态。这里有4种情况。

  1. 如果是PreparingRebalance状态就说明消费者组正要开启Rebalance流程那么调用updateMemberAndRebalance方法更新成员信息并开始准备Rebalance即可。

  2. 如果是CompletingRebalance状态那么就判断一下该成员的分区消费分配策略与订阅分区列表是否和已保存记录中的一致如果相同就说明该成员已经应该发起过加入组的操作并且Coordinator已经批准了只是该成员没有收到因此针对这种情况代码构造一个JoinGroupResult对象直接返回当前的组信息给成员。但是如果protocols不相同那么就说明成员变更了订阅信息或分配策略就要调用updateMemberAndRebalance方法更新成员信息并开始准备新一轮Rebalance。

  3. 如果是Stable状态那么就判断该成员是否是Leader成员或者是它的订阅信息或分配策略发生了变更。如果是这种情况就调用updateMemberAndRebalance方法强迫一次新的Rebalance。否则的话返回当前组信息给该成员即可通知它们可以发起Rebalance的下一步操作。

  4. 如果这些状态都不是而是Empty或Dead状态那么就封装UNKNOWN_MEMBER_ID异常并调用回调函数返回。

可以看到这部分代码频繁地调用updateMemberAndRebalance方法。如果你翻开它的代码会发现它仅仅做两件事情。

  • 更新组成员信息调用GroupMetadata的updateMember方法来更新消费者组成员
  • 准备Rebalance这一步的核心思想是将消费者组状态变更到PreparingRebalance然后创建DelayedJoin对象并交由Purgatory等待延时处理加入组操作。

这个方法的代码行数不多而且逻辑很简单就是变更消费者组状态以及处理延时请求并放入Purgatory因此我不展开说了你可以自行阅读下这部分代码。

addMemberAndRebalance方法

现在我们学习下doUnknownJoinGroup和doJoinGroup方法都会用到的addMemberAndRebalance方法。从名字上来看它的作用有两个

  • 向消费者组添加成员;
  • 准备Rebalance。
private def addMemberAndRebalance(
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  memberId: String,
  groupInstanceId: Option[String],
  clientId: String,
  clientHost: String,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  group: GroupMetadata,
  callback: JoinCallback): Unit = {
  // 创建MemberMetadata对象实例
  val member = new MemberMetadata(
    memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
  // 标识该成员是新成员
  member.isNew = true
  // 如果消费者组准备开启首次Rebalance设置newMemberAdded为True
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true
  // 将该成员添加到消费者组
  group.add(member, callback)
  // 设置下次心跳超期时间
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    // 从待决成员列表中移除
    group.removePendingMember(memberId)
  }
  // 准备开启Rebalance
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}

这个方法的参数列表虽然很长,但我相信,你对它们已经非常熟悉了,它们都是承袭自其上层调用方法的参数。

我来介绍一下这个方法的执行逻辑。

第1步该方法会根据传入参数创建一个MemberMetadata对象实例并设置isNew字段为True标识其是一个新成员。isNew字段与心跳设置相关联你可以阅读下MemberMetadata的hasSatisfiedHeartbeat方法的代码搞明白该字段是如何帮助Coordinator确认消费者组成员心跳的。

第2步代码会判断消费者组是否是首次开启Rebalance。如果是的话就把newMemberAdded字段设置为True如果不是则无需执行这个赋值操作。这个字段的作用是Kafka为消费者组Rebalance流程做的一个性能优化。大致的思想是在消费者组首次进行Rebalance时让Coordinator多等待一段时间从而让更多的消费者组成员加入到组中以免后来者申请入组而反复进行Rebalance。这段多等待的时间就是Broker端参数group.initial.rebalance.delay.ms的值。这里的newMemberAdded字段就是用于判断是否需要多等待这段时间的一个变量。

我们接着说回addMemberAndRebalance方法。该方法的第3步是调用GroupMetadata的add方法将新成员信息加入到消费者组元数据中同时设置该成员的下次心跳超期时间。

第4步,代码将该成员从待决成员列表中移除。毕竟,它已经正式加入到组中了,就不需要待在待决列表中了。

第5步调用maybePrepareRebalance方法准备开启Rebalance。

总结

至此我们学完了Rebalance流程的第一大步也就是加入组的源码学习。在这一步中你要格外注意加入组时是区分有无消费者组成员ID。对于未设定成员ID的分支代码调用doUnkwonwJoinGroup为成员生成ID信息对于已设定成员ID的分支则调用doJoinGroup方法。而这两个方法底层都是调用addMemberAndRebalance方法实现将消费者组成员添加进组的逻辑。

我们来简单回顾一下这节课的重点。

  • Rebalance流程包括JoinGroup和SyncGroup两大步。
  • handleJoinGroup方法Coordinator端处理成员加入组申请的方法。
  • Member Id成员ID。Kafka源码根据成员ID的有无决定调用哪种加入组逻辑方法比如doUnknownJoinGroup或doJoinGroup方法。
  • addMemberAndRebalance方法实现加入组功能的实际方法用于完成“加入组+开启Rebalance”这两个操作。

当所有成员都成功加入到组之后所有成员会开启Rebalance的第二大步组同步。在这一步中成员会发送SyncGroupRequest请求给Coordinator。那么Coordinator又是如何应对的呢咱们下节课见分晓。

课后讨论

今天我们曾多次提到maybePrepareRebalance方法从名字上看它并不一定会开启Rebalance。那么你能否结合源码说说看到底什么情况下才能开启Rebalance

欢迎在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。