You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

12 KiB

17 | 消费者组重平衡能避免吗?

你好,我是胡夕。今天我要和你分享的内容是:消费者组重平衡能避免吗?

其实在专栏第15期我们讲过重平衡也就是Rebalance现在先来回顾一下这个概念的原理和用途。Rebalance就是让一个Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程。在Rebalance过程中所有Consumer实例共同参与在协调者组件的帮助下完成订阅主题分区的分配。但是在整个过程中所有实例都不能消费任何消息因此它对Consumer的TPS影响很大。

你可能会对这里提到的“协调者”有些陌生我来简单介绍下。所谓协调者在Kafka中对应的术语是Coordinator它专门为Consumer Group服务负责为Group执行Rebalance以及提供位移管理和组成员管理等。

具体来讲Consumer端应用程序在提交位移时其实是向Coordinator所在的Broker提交位移。同样地当Consumer应用启动时也是向Coordinator所在的Broker发送各种请求然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有Broker在启动时都会创建和开启相应的Coordinator组件。也就是说所有Broker都有各自的Coordinator组件。那么Consumer Group如何确定为它服务的Coordinator在哪台Broker上呢答案就在我们之前说过的Kafka内部位移主题__consumer_offsets身上。

目前Kafka为某个Consumer Group确定Coordinator所在的Broker的算法有2个步骤。

第1步确定由位移主题的哪个分区来保存该Group数据partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。

第2步找出该分区Leader副本所在的Broker该Broker即为对应的Coordinator。

简单解释一下上面的算法。首先Kafka会计算该Group的group.id参数的哈希值。比如你有个Group的group.id设置成了“test-group”那么它的hashCode值就应该是627841412。其次Kafka会计算__consumer_offsets的分区数通常是50个分区之后将刚才那个哈希值对分区数进行取模加求绝对值计算即abs(627841412 % 50) = 12。此时我们就知道了位移主题的分区12负责保存这个Group的数据。有了分区号算法的第2步就变得很简单了我们只需要找出位移主题分区12的Leader副本在哪个Broker上就可以了。这个Broker就是我们要找的Coordinator。

在实际使用过程中Consumer应用程序特别是Java Consumer API能够自动发现并连接正确的Coordinator我们不用操心这个问题。知晓这个算法的最大意义在于它能够帮助我们解决定位问题。当Consumer Group出现问题需要快速排查Broker端日志时我们能够根据这个算法准确定位Coordinator对应的Broker不必一台Broker一台Broker地盲查。

好了我们说回Rebalance。既然我们今天要讨论的是如何避免Rebalance那就说明Rebalance这个东西不好或者说至少有一些弊端需要我们去规避。那么Rebalance的弊端是什么呢总结起来有以下3点

  1. Rebalance影响Consumer端TPS。这个之前也反复提到了这里就不再具体讲了。总之就是在Rebalance期间Consumer会停下手头的事情什么也干不了。

  2. Rebalance很慢。如果你的Group下成员很多就一定会有这样的痛点。还记得我曾经举过的那个国外用户的例子吧他的Group下有几百个Consumer实例Rebalance一次要几个小时。在那种场景下Consumer Group的Rebalance已经完全失控了。

  3. Rebalance效率不高。当前Kafka的设计机制决定了每次Rebalance时Group下的所有成员都要参与进来而且通常不会考虑局部性原理但局部性原理对提升系统性能是特别重要的。

关于第3点我们来举个简单的例子。比如一个Group下有10个成员每个成员平均消费5个分区。假设现在有一个成员退出了此时就需要开启新一轮的Rebalance把这个成员之前负责的5个分区“转移”给其他成员。显然比较好的做法是维持当前9个成员消费分区的方案不变然后将5个分区随机分配给这9个成员这样能最大限度地减少Rebalance对剩余Consumer成员的冲击。

遗憾的是目前Kafka并不是这样设计的。在默认情况下每次Rebalance时之前的分配方案都不会被保留。就拿刚刚这个例子来说当Rebalance开始时Group会打散这50个分区10个成员 * 5个分区由当前存活的9个成员重新分配它们。显然这不是效率很高的做法。基于这个原因社区于0.11.0.0版本推出了StickyAssignor即有粘性的分区分配策略。所谓的有粘性是指每次Rebalance时该策略会尽可能地保留之前的分配方案尽量实现分区分配的最小变动。不过有些遗憾的是这个策略目前还有一些bug而且需要升级到0.11.0.0才能使用,因此在实际生产环境中用得还不是很多。

总而言之Rebalance有以上这三个方面的弊端。你可能会问这些问题有解吗特别是针对Rebalance慢和影响TPS这两个弊端社区有解决办法吗针对这两点我可以很负责任地告诉你“无解”特别是Rebalance慢这个问题Kafka社区对此无能为力。“本事大不如不摊上”既然我们没办法解决Rebalance过程中的各种问题干脆就避免Rebalance吧特别是那些不必要的Rebalance。

就我个人经验而言,在真实的业务场景中很多Rebalance都是计划外的或者说是不必要的。我们应用的TPS大多是被这类Rebalance拖慢的因此避免这类Rebalance就显得很有必要了。下面我们就来说说如何避免Rebalance。

要避免Rebalance还是要从Rebalance发生的时机入手。我们在前面说过Rebalance发生的时机有三个

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作所以它们引发的Rebalance大都是不可避免的。接下来我们主要说说因为组成员数量变化而引发的Rebalance该如何避免。

如果Consumer Group下的Consumer实例数量发生变化就一定会引发Rebalance。这是Rebalance发生的最常见的原因。我碰到的99%的Rebalance都是这个原因导致的。

Consumer实例增加的情况很好理解当我们启动一个配置有相同group.id值的Consumer程序时实际上就向这个Group添加了一个新的Consumer实例。此时Coordinator会接纳这个新实例将其加入到组中并重新分配分区。通常来说增加Consumer实例的操作都是计划内的可能是出于增加TPS或提高伸缩性的需要。总之它不属于我们要规避的那类“不必要Rebalance”。

我们更在意的是Group下实例数减少这件事。如果你就是要停掉某些Consumer实例那自不必说关键是在某些情况下Consumer实例会被Coordinator错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的Rebalance我们就不能不管了。

Coordinator会在什么情况下认为某个Consumer实例已挂从而要退组呢这个绝对是需要好好讨论的话题我们来详细说说。

当Consumer Group完成Rebalance之后每个Consumer实例都会定期地向Coordinator发送心跳请求表明它还存活着。如果某个Consumer实例不能及时地发送这些心跳请求Coordinator就会认为该Consumer已经“死”了从而将其从Group中移除然后开启新一轮Rebalance。Consumer端有个参数叫session.timeout.ms就是被用来表征此事的。该参数的默认值是10秒即如果Coordinator在10秒之内没有收到Group下某Consumer实例的心跳它就会认为这个Consumer实例已经挂了。可以这么说session.timeout.ms决定了Consumer存活性的时间间隔。

除了这个参数Consumer还提供了一个允许你控制发送心跳请求频率的参数就是heartbeat.interval.ms。这个值设置得越小Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源但好处是能够更加快速地知晓当前是否开启Rebalance因为目前Coordinator通知各个Consumer实例开启Rebalance的方法就是将REBALANCE_NEEDED标志封装进心跳请求的响应体中。

除了以上两个参数Consumer端还有一个参数用于控制Consumer实际消费能力对Rebalance的影响即max.poll.interval.ms参数。它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。它的默认值是5分钟表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息那么Consumer会主动发起“离开组”的请求Coordinator也会开启新一轮Rebalance。

搞清楚了这些参数的含义接下来我们来明确一下到底哪些Rebalance是“不必要的”。

第一类非必要Rebalance是因为未能及时发送心跳导致Consumer被“踢出”Group而引发的。因此,你需要仔细地设置session.timeout.ms和heartbeat.interval.ms的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置session.timeout.ms = 6s。
  • 设置heartbeat.interval.ms = 2s。
  • 要保证Consumer实例在被判定为“dead”之前能够发送至少3轮的心跳请求即session.timeout.ms >= 3 * heartbeat.interval.ms。

将session.timeout.ms设置成6s主要是为了让Coordinator能够更快地定位已经挂掉的Consumer。毕竟我们还是希望能尽快揪出那些“尸位素餐”的Consumer早日把它们踢出Group。希望这份配置能够较好地帮助你规避第一类“不必要”的Rebalance。

第二类非必要Rebalance是Consumer消费时间过长导致的。我之前有一个客户在他们的场景中Consumer消费数据时需要将消息处理之后写入到MongoDB。显然这是一个很重的消费逻辑。MongoDB的一丁点不稳定都会导致Consumer程序消费时长的增加。此时max.poll.interval.ms参数值的设置显得尤为关键。如果要避免非预期的Rebalance你最好将该参数值设置得大一点比你的下游最大处理时间稍长一点。就拿MongoDB这个例子来说如果写MongoDB的最长时间是7分钟那么你可以将该参数设置为8分钟左右。

总之你要为你的业务处理逻辑留下充足的时间。这样Consumer就不会因为处理这些消息的时间太长而引发Rebalance了。

如果你按照上面的推荐数值恰当地设置了这几个参数却发现还是出现了Rebalance那么我建议你去排查一下Consumer端的GC表现比如是否出现了频繁的Full GC导致的长时间停顿从而引发了Rebalance。为什么特意说GC那是因为在实际场景中我见过太多因为GC设置不合理导致程序频发Full GC而引发的非预期Rebalance了。

小结

总而言之,我们一定要避免因为各种参数或逻辑不合理而导致的组成员意外离组或退出的情形,与之相关的主要参数有:

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC参数

按照我们今天所说的内容恰当地设置这些参数你一定能够大幅度地降低生产环境中的Rebalance数量从而整体提升Consumer端TPS。

开放讨论

说说在你的业务场景中Rebalance发生的频率、原因以及你是怎么应对的我们一起讨论下是否有更好的解决方案。

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。