You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

16 KiB

23 | ReplicaManager必须要掌握的副本管理类定义和核心字段

你好,我是胡夕。

今天我们要学习的是Kafka中的副本管理器ReplicaManager。它负责管理和操作集群中Broker的副本还承担了一部分的分区管理工作比如变更整个分区的副本日志路径等。

你一定还记得前面讲到状态机的时候我说过Kafka同时实现了副本状态机和分区状态机。但对于管理器而言Kafka源码却没有专门针对分区定义一个类似于“分区管理器”这样的类而是只定义了ReplicaManager类。该类不只实现了对副本的管理还包含了很多操作分区对象的方法。

ReplicaManager类的源码非常重要它是构建Kafka副本同步机制的重要组件之一。副本同步过程中出现的大多数问题都是很难定位和解决的因此熟练掌握这部分源码将有助于我们深入探索线上生产环境问题的根本原因防止以后踩坑。下面我给你分享一个真实的案例。

我们团队曾碰到过一件古怪事在生产环境系统中执行删除消息的操作之后该操作引发了Follower端副本与Leader端副本的不一致问题。刚碰到这个问题时我们一头雾水在正常情况下Leader端副本执行了消息删除后日志起始位移值被更新了Follower端副本也应该更新日志起始位移值但是这里的Follower端的更新失败了。我们查遍了所有日志依然找不到原因最后还是通过分析ReplicaManager类源码才找到了答案。

我们先看一下这个错误的详细报错信息:

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 22786435 of partition XXX-12 since it is larger than the high watermark 22786129

这是Follower副本抛出来的异常对应的Leader副本日志则一切如常。下面的日志显示出Leader副本的Log Start Offset已经被成功调整了。

INFO Incrementing log start offset of partition XXX-12 to 22786435

碰到这个问题时我相信你的第一反应和我一样这像是一个Bug但又不确定到底是什么原因导致的。后来我们顺着KafkaApis一路找下去直到找到了ReplicaManager的deleteRecords方法才看出点端倪。

Follower副本从Leader副本拉取到消息后会做两个操作

  1. 写入到自己的本地日志;
  2. 更新Follower副本的高水位值和Log Start Offset。

如果删除消息的操作deleteRecords发生在这两步之间因为deleteRecords会变更Log Start Offset所以Follower副本在进行第2步操作时它使用的可能是已经过期的值了因而会出现上面的错误。由此可见这的确是一个Bug。在确认了这一点之后后面的解决方案也就呼之欲出了虽然deleteRecords功能实用方便但鉴于这个Bug我们还是应该尽力避免在线上环境直接使用该功能。

说到这儿我想说一句碰到实际的线上问题不可怕可怕的是我们无法定位到问题的根本原因。写过Java项目的你一定有这种体会很多时候单纯依靠栈异常信息是不足以定位问题的。特别是涉及到Kafka副本同步这块如果只看输出日志的话你是很难搞清楚这里面的原理的因此我们必须要借助源码这也是我们今天学习ReplicaManager类的主要目的。

接下来我们就重点学习一下这个类。它位于server包下的同名scala文件中。这是一个有着将近1900行的大文件里面的代码结构很琐碎。

因为副本的读写操作和管理操作都是重磅功能所以在深入细节之前我们必须要理清ReplicaManager类的结构之间的关系并且搞懂类定义及核心字段这就是我们这节课的重要目标。

在接下来的两节课里我会给你详细地解释副本读写操作和副本管理操作。学完这些之后你就能清晰而深入地掌握ReplicaManager类的主要源码了最重要的是你会搞懂副本成为Leader或者是Follower时需要执行的逻辑这就足以帮助你应对实际遇到的副本操作问题了。

代码结构

我们首先看下这个scala文件的代码结构。我用一张思维导图向你展示下

虽然从代码结构上看该文件下有8个部分 不过HostedPartition接口以及实现对象放在一起更好理解所以我把ReplicaManager.scala分为7大部分。

  • ReplicaManager类它是副本管理器的具体实现代码里面定义了读写副本、删除副本消息的方法以及其他管理方法。
  • ReplicaManager对象ReplicaManager类的伴生对象仅仅定义了3个常量。
  • HostedPartition及其实现对象表示Broker本地保存的分区对象的状态。可能的状态包括不存在状态None、在线状态Online和离线状态Offline
  • FetchPartitionData定义获取到的分区数据以及重要元数据信息如高水位值、Log Start Offset值等。
  • LogReadResult表示副本管理器从副本本地日志中读取到的消息数据以及相关元数据信息如高水位值、Log Start Offset值等。
  • LogDeleteRecordsResult表示副本管理器执行副本日志删除操作后返回的结果信息。
  • LogAppendResult表示副本管理器执行副本日志写入操作后返回的结果信息。

从含义来看FetchPartitionData和LogReadResult很类似它们的区别在哪里呢

其实它们之间的差别非常小。如果翻开源码的话你会发现FetchPartitionData类总共有8个字段而构建FetchPartitionData实例的前7个字段都是用LogReadResult的字段来赋值的。你大致可以认为两者的作用是类似的。只是FetchPartitionData还有个字段标识该分区是不是处于重分配中。如果是的话需要更新特定的JXM监控指标。这是这两个类的主要区别。

在这7个部分中ReplicaManager类是我们学习的重点。其他类要么仅定义常量要么就是保存数据的POJO类作用一目了然我就不展开讲了。

ReplicaManager类定义

接下来我们就从Replica类的定义和重要字段这两个维度入手进行学习。首先看ReplicaManager类的定义。

class ReplicaManager(
  val config: KafkaConfig,  // 配置管理类
  metrics: Metrics,  // 监控指标类
  time: Time,  // 定时器类
  val zkClient: KafkaZkClient,  // ZooKeeper客户端
  scheduler: Scheduler,   // Kafka调度器
  val isShuttingDown: AtomicBoolean,  // 是否已经关闭
  quotaManagers: QuotaManagers,  // 配额管理器
  val brokerTopicStats: BrokerTopicStats,  // Broker主题监控指标类
  val metadataCache: MetadataCache,  // Broker元数据缓存
  logDirFailureChannel: LogDirFailureChannel,
  // 处理延时PRODUCE请求的Purgatory
  val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
  // 处理延时FETCH请求的Purgatory
  val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
  // 处理延时DELETE_RECORDS请求的Purgatory
  val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
  // 处理延时ELECT_LEADERS请求的Purgatory
  val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
  threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
  ......
}          

ReplicaManager类构造函数的字段非常多。有的字段含义很简单像time和metrics这类字段你一看就明白了我就不多说了我详细解释几个比较关键的字段。这些字段是我们理解副本管理器的重要基础。

1.logManager

这是日志管理器。它负责创建和管理分区的日志对象里面定义了很多操作日志对象的方法如getOrCreateLog等。

2.metadataCache

这是Broker端的元数据缓存保存集群上分区的Leader、ISR等信息。注意它和我们之前说的Controller端元数据缓存是有联系的。每台Broker上的元数据缓存是从Controller端的元数据缓存异步同步过来的。

3.logDirFailureChannel

这是失效日志路径的处理器类。Kafka 1.1版本新增了对于JBOD的支持。这也就是说Broker如果配置了多个日志路径当某个日志路径不可用之后比如该路径所在的磁盘已满Broker能够继续工作。那么这就需要一整套机制来保证在出现磁盘I/O故障时Broker的正常磁盘下的副本能够正常提供服务。

其中logDirFailureChannel是暂存失效日志路径的管理器类。我们不用具体学习这个特性的源码但你最起码要知道该功能算是Kafka提升服务器端高可用性的一个改进。有了它之后即使Broker上的单块磁盘坏掉了整个Broker的服务也不会中断。

4.四个Purgatory相关的字段

这4个字段是delayedProducePurgatory、delayedFetchPurgatory、delayedDeleteRecordsPurgatory和delayedElectLeaderPurgatory它们分别管理4类延时请求的。其中前两类我们应该不陌生就是处理延时生产者请求和延时消费者请求后面两类是处理延时消息删除请求和延时Leader选举请求属于比较高阶的用法可以暂时不用理会

在副本管理过程中状态的变更大多都会引发对延时请求的处理这时候这些Purgatory字段就派上用场了。

只要掌握了刚刚的这些字段就可以应对接下来的副本管理操作了。其中最重要的就是logManager。它是协助副本管理器操作集群副本对象的关键组件。

重要的自定义字段

学完了类定义我们看下ReplicaManager类中那些重要的自定义字段。这样的字段大约有20个我们不用花时间逐一学习它们。像isrExpandRate、isrShrinkRate这样的字段我们只看名字就能知道它们是衡量ISR变化的监控指标。下面我详细介绍几个对理解副本管理器至关重要的字段。我会结合代码具体讲解它们的含义同时还会说明它们的重要用途。

controllerEpoch

我们首先来看controllerEpoch字段。

这个字段的作用是隔离过期Controller发送的请求。这就是说老的Controller发送的请求不能再被继续处理了。至于如何区分是老Controller发送的请求还是新Controller发送的请求就是看请求携带的controllerEpoch值是否等于这个字段的值。以下是它的定义代码:

@volatile var controllerEpoch: Int = 
  KafkaController.InitialControllerEpoch

该字段表示最新一次变更分区Leader的Controller的Epoch值其默认值为0。Controller每发生一次变更该字段值都会+1。

在ReplicaManager的代码中很多地方都会用到它来判断Controller发送过来的控制类请求是否合法。如果请求中携带的controllerEpoch值小于该字段值就说明这个请求是由一个老的Controller发出的因此ReplicaManager直接拒绝该请求的处理。

值得注意的是它是一个var类型这就说明它的值是能够动态修改的。当ReplicaManager在处理控制类请求时会更新该字段。可以看下下面的代码

// becomeLeaderOrFollower方法
// 处理LeaderAndIsrRequest请求时
controllerEpoch = leaderAndIsrRequest.controllerEpoch
// stopReplicas方法
// 处理StopReplicaRequest请求时
this.controllerEpoch = controllerEpoch
// maybeUpdateMetadataCache方法
// 处理UpdateMetadataRequest请求时
controllerEpoch = updateMetadataRequest.controllerEpoch

Broker上接收的所有请求都是由Kafka I/O线程处理的而I/O线程可能有多个因此这里的controllerEpoch字段被声明为volatile型以保证其内存可见性。

allPartitions

下一个重要的字段是allPartitions。这节课刚开始时我说过Kafka没有所谓的分区管理器ReplicaManager类承担了一部分分区管理的工作。这里的allPartitions就承载了Broker上保存的所有分区对象数据。其定义代码如下

private val allPartitions = new Pool[TopicPartition, HostedPartition](
  valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this)))
)

从代码可见allPartitions是分区Partition对象实例的容器。这里的HostedPartition是代表分区状态的类。allPartitions会将所有分区对象初始化成Online状态。

值得注意的是这里的分区状态和我们之前讲到的分区状态机里面的状态完全隶属于两套“领导班子”。也许未来它们会有合并的可能。毕竟它们二者的功能是有重叠的地方的表示的含义也有相似之处。比如它们都定义了Online状态其实都是表示正常工作状态下的分区状态。当然这只是我根据源码功能做的一个大胆推测至于是否会合并我们拭目以待吧。

再多说一句Partition类是表征分区的对象。一个Partition实例定义和管理单个分区它主要是利用logManager帮助它完成对分区底层日志的操作。ReplicaManager类对于分区的管理都是通过Partition对象完成的。

replicaFetcherManager

第三个比较关键的字段是replicaFetcherManager。它的主要任务是创建ReplicaFetcherThread类实例。上节课我们学习了ReplicaFetcherThread类的源码它的主要职责是帮助Follower副本向Leader副本拉取消息并写入到本地日志中

下面展示了ReplicaFetcherManager类的主要方法createFetcherThread源码

override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
  val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
  // 创建ReplicaFetcherThread线程实例并返回
  new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, failedPartitions, replicaManager,
    metrics, time, quotaManager)
}

该方法的主要目的是创建ReplicaFetcherThread实例供Follower副本使用。线程的名字是根据fetcherId和Broker ID来确定的。ReplicaManager类利用replicaFetcherManager字段对所有Fetcher线程进行管理包括线程的创建、启动、添加、停止和移除。

总结

这节课我主要介绍了ReplicaManager类的定义以及重要字段。它们是理解后面ReplicaManager类管理功能的基础。

总的来说ReplicaManager类是Kafka Broker端管理分区和副本对象的重要组件。每个Broker在启动的时候都会创建ReplicaManager实例。该实例一旦被创建就会开始行使副本管理器的职责对其下辖的Leader副本或Follower副本进行管理。

我们再简单回顾一下这节课的重点。

  • ReplicaManager类副本管理器的具体实现代码里面定义了读写副本、删除副本消息的方法以及其他的一些管理方法。
  • allPartitions字段承载了Broker上保存的所有分区对象数据。ReplicaManager类通过它实现对分区下副本的管理。
  • replicaFetcherManager字段创建ReplicaFetcherThread类实例该线程类实现Follower副本向Leader副本实时拉取消息的逻辑。

今天我多次提到ReplicaManager是副本管理器这件事。实际上副本管理中的两个重要功能就是读取副本对象和写入副本对象。对于Leader副本而言Follower副本需要读取它的消息数据对于Follower副本而言它拿到Leader副本的消息后需要将消息写入到自己的底层日志上。那么读写副本的机制是怎么样的呢下节课我们深入地探究一下ReplicaManager类重要的副本读写方法。

课后讨论

在ReplicaManager类中有一个offlinePartitionCount方法它的作用是统计Offline状态的分区数你能写一个方法统计Online状态的分区数吗

欢迎在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。