You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

19 KiB

30 | GroupMetadataManager位移主题保存的只是位移吗

你好,我是胡夕。今天,我们学习位移主题管理的源码。

位移主题即__consumer_offsets是Kafka的两大内部主题之一另一个内部主题是管理Kafka事务的名字是__transaction_state用于保存Kafka事务的状态信息

Kafka创建位移主题的目的保存消费者组的注册消息和提交位移消息。前者保存能够标识消费者组的身份信息后者保存消费者组消费的进度信息。在Kafka源码中GroupMetadataManager类定义了操作位移主题消息类型以及操作位移主题的方法。该主题下都有哪些消息类型是我们今天学习的重点。

说到位移主题你是否对它里面的消息内容感到很好奇呢我见过很多人直接使用kafka-console-consumer命令消费该主题想要知道里面保存的内容可输出的结果却是一堆二进制乱码。其实如果你不阅读今天的源码是无法知晓如何通过命令行工具查询该主题消息的内容的。因为这些知识只包含在源码中官方文档并没有涉及到。

好了我不卖关子了。简单来说你在运行kafka-console-consumer命令时必须指定--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"才能查看提交的位移消息数据。类似地你必须指定GroupMetadataMessageFormatter才能读取消费者组的注册消息数据。

今天我们就来学习位移主题下的这两大消息类型。除此之外我还会给你介绍消费者组是如何寻找自己的Coordinator的。毕竟对位移主题进行读写的前提就是要能找到正确的Coordinator所在。

消息类型

位移主题有两类消息:消费者组注册消息Group Metadata消费者组的已提交位移消息Offset Commit。很多人以为位移主题里面只保存消费者组位移这是错误的它还保存了消费者组的注册信息或者说是消费者组的元数据。这里的元数据主要是指消费者组名称以及成员分区消费分配方案。

在分别介绍这两类消息的实现代码之前我们先看下Kafka为它们定义的公共服务代码。毕竟它们是这两类消息都会用到的代码组件。这些公共代码主要由两部分组成GroupTopicPartition类和BaseKey接口。

我们首先来看POJO类GroupTopicPartition。它的作用是封装<消费者组名,主题,分区号>的三元组,代码如下:

case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
  def this(group: String, topic: String, partition: Int) =
    this(group, new TopicPartition(topic, partition))
  // toString方法......
}

显然,这个类就是一个数据容器类。我们后面在学习已提交位移消息时,还会看到它的身影。

其次是**BaseKey接口**它表示位移主题的两类消息的Key类型。强调一下无论是该主题下的哪类消息都必须定义Key。这里的BaseKey接口定义的就是这两类消息的Key类型。我们看下它的代码

trait BaseKey{
  def version: Short  // 消息格式版本
  def key: Any        // 消息key
}

这里的version是Short型的消息格式版本。随着Kafka代码的不断演进位移主题的消息格式也在不断迭代因此这里出现了版本号的概念。至于key字段它保存的是实际的Key值。在Scala中Any类型类似于Java中的Object类表示该值可以是任意类型。稍后讲到具体的消息类型时你就会发现这两类消息的Key类型其实是不同的数据类型。

好了基础知识铺垫完了有了对GroupTopicPartition和BaseKey的理解你就能明白位移主题的具体消息类型是如何构造Key的。

接下来我们开始学习具体消息类型的实现代码包括注册消息、提交位移消息和Tombstone消息。由于消费者组必须要先向Coordinator组件注册然后才能提交位移所以我们先阅读注册消息的代码。

注册消息

所谓的注册消息,就是指消费者组向位移主题写入注册类的消息。该类消息的写入时机有两个。

  • 所有成员都加入组后Coordinator向位移主题写入注册消息只是该消息不含分区消费分配方案
  • Leader成员发送方案给Coordinator后当Leader成员将分区消费分配方案发给Coordinator后Coordinator写入携带分配方案的注册消息。

我们首先要知道注册消息的Key是如何定义以及如何被封装到消息里的。

Key的定义在GroupMetadataKey类代码中

case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
  override def toString: String = key
}

该类的key字段是一个字符串类型保存的是消费者组的名称。可见注册消息的Key就是消费者组名

GroupMetadataManager对象有个groupMetadataKey方法负责将注册消息的Key转换成字节数组用于后面构造注册消息。这个方法的代码如下

def groupMetadataKey(group: String): Array[Byte] = {
  val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
  key.set(GROUP_KEY_GROUP_FIELD, group)
  // 构造一个ByteBuffer对象容纳version和key数据
  val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
  byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
  key.writeTo(byteBuffer)
  byteBuffer.array()
}

该方法首先会接收消费者组名构造ByteBuffer对象然后依次向Buffer写入Short型的消息格式版本以及消费者组名最后返回该Buffer底层的字节数组。

你不用关心这里的格式版本变量以及Struct类型都是怎么实现的因为它们不是我们理解位移主题内部原理的关键。你需要掌握的注册消息的Key和Value都是怎么定义的

接下来我们就来了解下消息体Value的代码实现。既然有groupMetadataKey方法那么源码也提供了相应的groupMetadataValue方法。它的目的是将消费者组重要的元数据写入到字节数组。我们看下它的代码实现:

def groupMetadataValue(
  groupMetadata: GroupMetadata,  // 消费者组元数据对象
  assignment: Map[String, Array[Byte]], // 分区消费分配方案
  apiVersion: ApiVersion // Kafka API版本号
): Array[Byte] = {
  // 确定消息格式版本以及格式结构
  val (version, value) = {
    if (apiVersion < KAFKA_0_10_1_IV0)
      (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
    else if (apiVersion < KAFKA_2_1_IV0)
      (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
    else if (apiVersion < KAFKA_2_3_IV0)
      (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
    else
      (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
  }
  // 依次写入消费者组主要的元数据信息
  // 包括协议类型、Generation ID、分区分配策略和Leader成员ID
  value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
  value.set(GENERATION_KEY, groupMetadata.generationId)
  value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
  value.set(LEADER_KEY, groupMetadata.leaderOrNull)
  // 写入最近一次状态变更时间戳
  if (version >= 2)
    value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
  // 写入各个成员的元数据信息
  // 包括成员ID、client.id、主机名以及会话超时时间
  val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
    val memberStruct = value.instance(MEMBERS_KEY)
    memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
    memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
    memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
    memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
    // 写入Rebalance超时时间
    if (version > 0)
      memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
    // 写入用于静态消费者组管理的Group Instance ID
    if (version >= 3)
      memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
    // 必须定义分区分配策略,否则抛出异常
    val protocol = groupMetadata.protocolName.orNull
    if (protocol == null)
      throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
    // 写入成员消费订阅信息
    val metadata = memberMetadata.metadata(protocol)
    memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
    val memberAssignment = assignment(memberMetadata.memberId)
    assert(memberAssignment != null)
    // 写入成员消费分配信息
    memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
    memberStruct
  }
  value.set(MEMBERS_KEY, memberArray.toArray)
  // 向Buffer依次写入版本信息和以上写入的元数据信息
  val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
  byteBuffer.putShort(version)
  value.writeTo(byteBuffer)
  // 返回Buffer底层的字节数组
  byteBuffer.array()
}

代码比较长,我结合一张图来帮助你理解这个方法的执行逻辑。

第1步代码根据传入的apiVersion字段确定要使用哪个格式版本并创建对应版本的结构体Struct来保存这些元数据。apiVersion的取值是Broker端参数inter.broker.protocol.version的值。你打开Kafka官网的话就可以看到这个参数的值永远指向当前最新的Kafka版本。

第2步代码依次向结构体写入消费者组的协议类型Protocol Type、Generation ID、分区分配策略Protocol Name和Leader成员ID。在学习GroupMetadata时我说过对于普通的消费者组而言协议类型就是"consumer"字符串,分区分配策略可能是"range""round-robin"等。之后代码还会为格式版本≥2的结构体写入消费者组状态最近一次变更的时间戳。

第3步遍历消费者组的所有成员为每个成员构建专属的结构体对象并依次向结构体写入成员的ID、Client ID、主机名以及会话超时时间信息。对于格式版本≥0的结构体代码要写入成员配置的Rebalance超时时间而对于格式版本≥3的结构体代码还要写入用于静态消费者组管理的Group Instance ID。待这些都做完之后groupMetadataValue方法必须要确保消费者组选出了分区分配策略否则就抛出异常。再之后方法依次写入成员消费订阅信息和成员消费分配信息。

第4步代码向Buffer依次写入版本信息和刚刚说到的写入的元数据信息并返回Buffer底层的字节数组。至此方法逻辑结束。

关于注册消息Key和Value的内容我就介绍完了。为了帮助你更直观地理解注册消息到底包含了什么数据我再用一张图向你展示一下它们的构成。

这张图完整地总结了groupMetadataKey和groupMetadataValue方法要生成的注册消息内容。灰色矩形中的字段表示可选字段有可能不会包含在Value中。

已提交位移消息

接下来我们再学习一下提交位移消息的Key和Value构成。

OffsetKey类定义了提交位移消息的Key值代码如下

case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
  override def toString: String = key.toString
}

可见这类消息的Key是一个GroupTopicPartition类型也就是<消费者组名,主题,分区号>三元组。

offsetCommitKey方法负责将这个三元组转换成字节数组用于后续构造提交位移消息。

def offsetCommitKey(
  group: String,  // 消费者组名
  topicPartition: TopicPartition // 主题 + 分区号
): Array[Byte] = {
  // 创建结构体,依次写入消费者组名、主题和分区号
  val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
  key.set(OFFSET_KEY_GROUP_FIELD, group)
  key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
  key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
  // 构造ByteBuffer写入格式版本和结构体
  val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
  byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
  key.writeTo(byteBuffer)
  // 返回字节数组
  byteBuffer.array()
}


该方法接收三元组中的数据然后创建一个结构体对象依次写入消费者组名、主题和分区号。接下来构造ByteBuffer写入格式版本和结构体最后返回它底层的字节数组。

说完了Key我们看下Value的定义。

offsetCommitValue方法决定了Value中都有哪些元素我们一起看下它的代码。这里我只列出了最新版本对应的结构体对象其他版本要写入的元素大同小异课下你可以阅读下其他版本的结构体内容也就是我省略的if分支下的代码。

def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
                      apiVersion: ApiVersion): Array[Byte] = {
  // 确定消息格式版本以及创建对应的结构体对象
  val (version, value) = {
    if (......) {
      ......
    } else {
      val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
      // 依次写入位移值、Leader Epoch值、自定义元数据以及时间戳
      value.set(
        OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
      value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
 offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
      value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
(3, value)
    }
  }
  // 构建ByteBuffer写入消息格式版本和结构体
  val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
  byteBuffer.putShort(version.toShort)
  value.writeTo(byteBuffer)
  // 返回ByteBuffer底层字节数组
  byteBuffer.array()
}

offsetCommitValue方法首先确定消息格式版本以及创建对应的结构体对象。对于当前最新版本V3而言结构体的元素包括位移值、Leader Epoch值、自定义元数据和时间戳。如果我们使用Java Consumer API的话那么在提交位移时这个自定义元数据一般是空。

接下来构建ByteBuffer写入消息格式版本和结构体。

最后返回ByteBuffer底层字节数组。

与注册消息的消息体相比提交位移消息的Value要简单得多。我再用一张图展示一下提交位移消息的Key、Value构成。

Tombstone消息

关于位移主题Kafka源码中还存在一类消息那就是Tombstone消息。其实它并没有任何稀奇之处就是Value为null的消息。因此注册消息和提交位移消息都有对应的Tombstone消息。这个消息的主要作用是让Kafka识别哪些Key对应的消息是可以被删除的有了它Kafka就能保证内部位移主题不会持续增加磁盘占用空间。

你可以看下下面两行代码它们分别表示两类消息对应的Tombstone消息。

// 提交位移消息对应的Tombstone消息 
tombstones += new SimpleRecord(timestamp, commitKey, null)
// 注册消息对应的Tombstone消息 
tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)

无论是哪类消息,它们的Value字段都是null。一旦注册消息中出现了Tombstone消息就表示Kafka可以将该消费者组元数据从位移主题中删除一旦提交位移消息中出现了Tombstone就表示Kafka能够将该消费者组在某主题分区上的位移提交数据删除。

如何确定Coordinator

接下来我们要再学习一下位移主题和消费者组Coordinator之间的关系。Coordinator组件是操作位移主题的唯一组件它在内部对位移主题进行读写操作

每个Broker在启动时都会启动Coordinator组件但是一个消费者组只能被一个Coordinator组件所管理。Kafka是如何确定哪台Broker上的Coordinator组件为消费者组服务呢答案是位移主题某个特定分区Leader副本所在的Broker被选定为指定消费者组的Coordinator。

那么这个特定分区是怎么计算出来的呢我们来看GroupMetadataManager类的partitionFor方法代码

def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

看到了吧,消费者组名哈希值与位移主题分区数求模的绝对值结果,就是该消费者组要写入位移主题的目标分区。

假设位移主题默认是50个分区我们的消费者组名是“testgroup”因此Math.abs(“testgroup”.hashCode % 50)的结果是27那么目标分区号就是27。也就是说这个消费者组的注册消息和提交位移消息都会写入到位移主题的分区27中而分区27的Leader副本所在的Broker就成为该消费者组的Coordinator。

总结

Kafka内部位移主题是Coordinator端用来保存和记录消费者组信息的重要工具。具体而言消费者组信息包括消费者组元数据以及已提交位移它们分别对应于我们今天讲的位移主题中的注册消息和已提交位移消息。前者定义了消费者组的元数据信息包括组名、成员列表和分区消费分配方案后者则是消费者组各个成员提交的位移值。这两部分信息共同构成了位移主题的消息类型。

除了消息类型我还介绍了消费者组确定Coordinator端的代码。明白了这一点下次你的消费者组成员出现问题的时候你就会知道要去哪台Broker上去查找相应的日志了。

我们来回顾一下这节课的重点。

  • 位移主题即__consumer_offsets。该主题是内部主题默认有50个分区Kafka负责将其创建出来因此你不需要亲自执行创建主题操作。
  • 消息类型:位移主题分为注册消息和已提交位移消息。
  • Tombstone消息Value为null的位移主题消息用于清除消费者组已提交的位移值和注册信息。
  • Coordinator确认原则消费者组名的哈希值与位移主题分区数求模的绝对值即为目标分区目标分区Leader副本所在的Broker即为Coordinator。

定义了消息格式明确了Coordinator下一步就是Coordinator对位移主题进行读写操作了。具体来说就是构建今天我们所学的两类消息并将其序列化成字节数组写入到位移主题以及从位移主题中读取出字节数组并反序列化成对应的消息类型。下节课我们一起研究下这个问题。

课后讨论

请你根据今天的内容用kafka-console-consumer脚本去读取一下你线上环境中位移主题的已提交位移消息并结合readOffsetMessageValue方法的源码说一下输出中的每个字段都是什么含义。

欢迎在留言区写下你的思考和答案,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。