You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

14 KiB

18 | Kafka中位移提交那些事儿

你好我是胡夕。今天我们来聊聊Kafka中位移提交的那些事儿。

之前我们说过Consumer端有个位移的概念它和消息在分区中的位移不是一回事儿虽然它们的英文都是Offset。今天我们要聊的位移是Consumer的消费位移它记录了Consumer要消费的下一条消息的位移。这可能和你以前了解的有些出入不过切记是下一条消息的位移而不是目前最新消费消息的位移。

我来举个例子说明一下。假设一个分区中有10条消息位移分别是0到9。某个Consumer应用已消费了5条消息这就说明该Consumer消费了位移为0到4的5条消息此时Consumer的位移是5指向了下一条消息的位移。

Consumer需要向Kafka汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为Consumer能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的Consumer需要为分配给它的每个分区提交各自的位移数据

提交位移主要是为了表征Consumer的消费进度这样当Consumer发生故障重启之后就能够从Kafka中读取之前提交的位移值然后从相应的位移处继续消费从而避免整个消费过程重来一遍。换句话说位移提交是Kafka提供给你的一个工具或语义保障你负责维持这个语义保障即如果你提交了位移X那么Kafka会认为所有位移值小于X的消息你都已经成功消费了。

这一点特别关键。因为位移提交非常灵活你完全可以提交任何位移值但由此产生的后果你也要一并承担。假设你的Consumer消费了10条消息你提交的位移值却是20那么从理论上讲位移介于1119之间的消息是有可能丢失的相反地如果你提交的位移值是5那么位移介于59之间的消息就有可能被重复消费。所以我想再强调一下位移提交的语义保障是由你来负责的Kafka只会“无脑”地接受你提交的位移。你对位移提交的管理直接影响了你的Consumer所能提供的消息语义保障。

鉴于位移提交甚至是位移管理对Consumer端的巨大影响Kafka特别是KafkaConsumer API提供了多种提交位移的方法。从用户的角度来说位移提交分为自动提交和手动提交从Consumer端的角度来说位移提交分为同步提交和异步提交

我们先来说说自动提交和手动提交。所谓自动提交就是指Kafka Consumer在后台默默地为你提交位移作为用户的你完全不必操心这些事而手动提交则是指你要自己提交位移Kafka Consumer压根不管。

开启自动提交位移的方法很简单。Consumer端有个参数enable.auto.commit把它设置为true或者压根不设置它就可以了。因为它的默认值就是true即Java Consumer默认就是自动提交位移的。如果启用了自动提交Consumer端还有个参数就派上用场了auto.commit.interval.ms。它的默认值是5秒表明Kafka每5秒会为你自动提交一次位移。

为了把这个问题说清楚我给出了完整的Java代码。这段代码展示了设置自动提交位移的方法。有了这段代码做基础今天后面的讲解我就不再展示完整的代码了。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "2000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

上面的第3、第4行代码就是开启自动提交位移的方法。总体来说还是很简单的吧。

和自动提交相反的就是手动提交了。开启手动提交位移的方法就是设置enable.auto.commit为false。但是仅仅设置它为false还不够因为你只是告诉Kafka Consumer不要自动提交位移而已你还需要调用相应的API手动提交位移。

最简单的API就是KafkaConsumer#commitSync()。该方法会提交KafkaConsumer#poll()返回的最新位移。从名字上来看它是一个同步操作即该方法会一直等待直到位移被成功提交才会返回。如果提交过程中出现异常该方法会将异常信息抛出。下面这段代码展示了commitSync()的使用方法:

while (true) {
            ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            try {
                        consumer.commitSync();
            } catch (CommitFailedException e) {
                        handle(e); // 处理提交失败异常
            }
}

可见调用consumer.commitSync()方法的时机是在你处理完了poll()方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。

一旦设置了enable.auto.commit为trueKafka会保证在开始调用poll方法时提交上次poll返回的所有消息。从顺序上来说poll方法的逻辑是先提交上一批消息的位移再处理下一批消息因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于它可能会出现重复消费

在默认情况下Consumer每5秒自动提交一次位移。现在我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然你能够通过减少auto.commit.interval.ms的值来提高提交频率但这么做只能缩小重复消费的时间窗口不可能完全消除它。这是自动提交机制的一个缺陷。

反观手动提交位移它的好处就在于更加灵活你完全能够把控位移提交的时机和频率。但是它也有一个缺陷就是在调用commitSync()时Consumer程序会处于阻塞状态直到远端的Broker返回提交结果这个状态才会结束。在任何系统中因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈会影响整个应用程序的TPS。当然你可以选择拉长提交间隔但这样做的后果是Consumer的提交频率下降在下次Consumer重启回来后会有更多的消息被重新消费。

鉴于这个问题Kafka社区为手动提交位移提供了另一个API方法KafkaConsumer#commitAsync()。从名字上来看它就不是同步的而是一个异步操作。调用commitAsync()之后它会立即返回不会阻塞因此不会影响Consumer应用的TPS。由于它是异步的Kafka提供了回调函数callback供你实现提交之后的逻辑比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法:

while (true) {
            ConsumerRecords<String, String> records = 
	consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            consumer.commitAsync((offsets, exception) -> {
	if (exception != null)
	handle(exception);
	});
}

commitAsync是否能够替代commitSync呢答案是不能。commitAsync的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此异步提交的重试其实没有意义所以commitAsync是不会重试的。

显然如果是手动提交我们需要将commitSync和commitAsync组合使用才能达到最理想的效果原因有两个

  1. 我们可以利用commitSync的自动重试来规避那些瞬时错误比如网络的瞬时抖动Broker端GC等。因为这些问题都是短暂的自动重试通常都会成功因此我们不想自己重试而是希望Kafka Consumer帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态影响TPS。

我们来看一下下面这段代码它展示的是如何将两个API方法结合使用进行手动提交。

   try {
           while(true) {
                        ConsumerRecords<String, String> records = 
                                    consumer.poll(Duration.ofSeconds(1));
                        process(records); // 处理消息
                        commitAysnc(); // 使用异步提交规避阻塞
            }
} catch(Exception e) {
            handle(e); // 处理异常
} finally {
            try {
                        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
	} finally {
	     consumer.close();
}
}

这段代码同时使用了commitSync()和commitAsync()。对于常规性、阶段性的手动提交我们调用commitAsync()避免程序阻塞而在Consumer要关闭前我们调用commitSync()方法执行同步阻塞式的位移提交以确保Consumer关闭前能够保存正确的位移数据。将两者结合后我们既实现了异步无阻塞式的位移管理也确保了Consumer位移的正确性所以如果你需要自行编写代码开发一套Kafka Consumer应用那么我推荐你使用上面的代码范例来实现手动的位移提交。

我们说了自动提交和手动提交也说了同步提交和异步提交这些就是Kafka位移提交的全部了吗其实我们还差一部分。

实际上Kafka Consumer API还提供了一组更为方便的方法可以帮助你实现更精细化的位移管理功能。刚刚我们聊到的所有位移提交都是提交poll方法返回的所有消息的位移比如poll方法一次返回了500条消息当你处理完这500条消息之后前面我们提到的各种方法会一次性地将这500条消息的位移一并处理。简单来说就是直接提交最新一条消息的位移。但如果我想更加细粒度化地提交位移,该怎么办呢?

设想这样一个场景你的poll方法返回的不是500条消息而是5000条。那么你肯定不想把这5000条消息都处理完之后再提交位移因为一旦中间出现差错之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候我们希望将一个大事务分割成若干个小事务分别提交这能够有效减少错误恢复的时间。

在Kafka中也是相同的道理。对于一次要处理很多消息的Consumer而言它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个5000条消息的例子你可能希望每处理完100条消息就提交一次位移这样能够避免大批量的消息重新消费。

庆幸的是Kafka Consumer API为手动提交提供了这样的方法commitSync(Map<TopicPartition, OffsetAndMetadata>)和commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个Map对象键就是TopicPartition即消费的分区而值是一个OffsetAndMetadata对象保存的主要是位移数据。

就拿刚刚提过的那个例子来说如何每处理100条消息就提交一次位移呢在这里我以commitAsync为例展示一段代码实际上commitSync的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
            ConsumerRecords<String, String> records = 
	consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record: records) {
                        process(record);  // 处理消息
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                   new OffsetAndMetadata(record.offset() + 1)
                       ifcount % 100 == 0
                                    consumer.commitAsync(offsets, null); // 回调处理逻辑是null
                        count++;
	}
}

简单解释一下这段代码。程序先是创建了一个Map对象用于保存Consumer消费处理过程中要提交的分区位移之后开始逐条处理消息并构造要提交的位移值。还记得之前我说过要提交下一条消息的位移吗这就是这里构造OffsetAndMetadata对象时使用当前消息位移加1的原因。代码的最后部分是做位移的提交。我在这里设置了一个计数器每累计100条消息就统一提交一次位移。与调用无参的commitAsync不同这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。这样这段代码就能够实现每处理100条消息就提交一次位移不用再受poll方法返回的消息总数的限制了。

小结

好了我们来总结一下今天的内容。Kafka Consumer的位移提交是实现Consumer端语义保障的重要手段。位移提交分为自动提交和手动提交而手动提交又分为同步提交和异步提交。在实际使用过程中推荐你使用手动提交机制因为它更加可控也更加灵活。另外建议你同时采用同步提交和异步提交两种方式这样既不影响TPS又支持自动重试改善Consumer应用的高可用性。总之Kafka Consumer API提供了多种灵活的提交方法方便你根据自己的业务场景定制你的提交策略。

开放讨论

实际上手动提交也不能避免消息重复消费。假设Consumer在处理完消息和提交位移前出现故障下次重启后依然会出现消息重复消费的情况。请你思考一下如何实现你的业务场景中的去重逻辑呢

欢迎写下你的思考和答案,我们一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。