You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
12 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 19 | 消息队列:如何降低消息队列系统中消息的延迟?
你好,我是唐扬。
学完前面两节课之后,相信你对在垂直电商项目中如何使用消息队列应对秒杀时的峰值流量已经有所了解。当然了,你也应该知道要如何做才能保证消息不会丢失,尽量避免消息重复带来的影响。**那么我想让你思考一下:**除了这些内容,你在使用消息队列时还需要关注哪些点呢?
**先来看一个场景:**在你的垂直电商项目中,你会在用户下单支付之后向消息队列里面发送一条消息,队列处理程序消费了消息后会增加用户的积分或者给用户发送优惠券。用户在下单之后,等待几分钟或者十几分钟拿到积分和优惠券是可以接受的,但是一旦消息队列出现大量堆积,用户消费完成后几小时还拿到优惠券,那就会有用户投诉了。
这时你要关注的就是消息队列中消息的延迟了,这其实是消费性能的问题,那么你要如何提升消费性能保证更短的消息延迟呢?**在我看来,**首先需要掌握如何来监控消息的延迟,因为有了数据之后你才可以知道目前的延迟数据是否满足要求,也可以评估优化之后的效果。然后你要掌握使用消息队列的正确姿势以及关注消息队列本身是如何保证消息尽快被存储和投递的。
接下来,我们先来看看第一点:如何监控消息延迟。
## 如何监控消息延迟
在我看来,监控消息的延迟有两种方式:
* 使用消息队列提供的工具,通过监控消息的堆积来完成;
* 通过生成监控消息的方式来监控消息的延迟情况。
接下来,我带你实际了解一下。
假设在开篇的场景之下电商系统中的消息队列已经堆积了大量的消息那么你要想监控消息的堆积情况首先需要从原理上了解在消息队列中消费者的消费进度是多少因为这样才方便计算当前的消费延迟是多少。比如生产者向队列中一共生产了1000条消息某一个消费者消费进度是900条那么这个消费者的消费延迟就是100条消息。
**在Kafka中消费者的消费进度在不同的版本上是不同的。**
在Kafka0.9之前的版本中消费进度是存储在ZooKeeper中的消费者在消费消息的时候先要从ZooKeeper中获取最新的消费进度再从这个进度的基础上消费后面的消息。
在Kafka0.9版本之后消费进度被迁入到Kakfa的一个专门的topic叫“\_\_consumer\_offsets”里面。所以如果你了解kafka的原理可以依据不同的版本从不同的位置获取到这个消费进度的信息。
当然作为一个成熟的组件Kafka也提供了一些工具来获取这个消费进度的信息帮助你实现自己的监控这个工具主要有两个
**首先Kafka提供了工具叫做“kafka-consumer-groups.sh”**它在Kafka安装包的bin目录下
为了帮助你理解我简单地搭建了一个Kafka节点并且写入和消费了一些信息然后我来使用命令看看消息累积情况具体的命令如下
```
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
```
结果如下:
![](https://static001.geekbang.org/resource/image/40/4b/404811b07db8edb4c1bb9f1cfc0bc94b.jpg)
* 图中的前两列是队列的基本信息,包括话题名和分区名;
* 第三列是当前消费者的消费进度;
* 第四列是当前生产消息的总数;
* 第五列就是消费消息的堆积数(也就是第四列与第三列的差值)。
通过这个命令你可以很方便地了解消费者的消费情况。
**第二个工具是JMX。**
Kafka通过JMX暴露了消息堆积的数据我在本地启动了一个console consumer然后使用jconsole连接consumer就可以看到consumer的堆积数据了就是下图中红框里的数据。这些数据你可以写代码来获取这样也可以方便地输出到监控系统中**我比较推荐这种方式。**
![](https://static001.geekbang.org/resource/image/33/2c/3384d3fcb52f98815fac667e5b543e2c.jpg)
除了使用消息队列提供的工具以外,你还可以通过生成监控消息的方式来监控消息的延迟。**具体怎么做呢?**
你先定义一种特殊的消息,然后启动一个监控程序将这个消息定时地循环写入到消息队列中,消息的内容可以是生成消息的时间戳并且也会作为队列的消费者消费数据。业务处理程序消费到这个消息时直接丢弃掉,而监控程序在消费到这个消息时就可以和这个消息的生成时间做比较,如果时间差达到某一个阈值就可以向我们报警。
![](https://static001.geekbang.org/resource/image/34/7f/34820c0b27e66af37fda116a1a98347f.jpg)
这两种方式都可以监控消息的消费延迟情况,**而从我的经验出发,我比较推荐两种方式结合来使用。**比如在实际项目中我会优先在监控程序中获取JMX中的队列堆积数据做到dashboard报表中同时也会启动探测进程确认消息的延迟情况是怎样的。
在我看来,消息的堆积是对于消息队列的基础监控,这是你无论如何都要做的。但是了解了消息的堆积情况并不能很直观地了解消息消费的延迟,你也只能利用经验来确定堆积的消息量到了多少才会影响到用户的体验;而第二种方式对于消费延迟的监控则更加直观,而且从时间的维度来做监控也比较容易确定报警阈值。
了解了消息延迟的监控方式之后,我们再来看看如何提升消息的写入和消费性能,这样才会让异步的消息得到尽快的处理。
## 减少消息延迟的正确姿势
想要减少消息的处理延迟,我们需要在**消费端和消息队列**两个层面来完成。
在消费端的目标是提升消费者的消息处理能力,你能做的是:
* 优化消费代码提升性能;
* 增加消费者的数量(这个方式比较简单)。
不过第二种方式会受限于消息队列的实现。如果消息队列使用的是Kafka就无法通过增加消费者数量的方式来提升消息处理能力。
因为在Kafka中一个Topic话题可以配置多个Partition分区数据会被平均或者按照生产者指定的方式写入到多个分区中那么在消费的时候Kafka约定一个分区只能被一个消费者消费为什么要这么设计呢在我看来如果有多个consumer消费者可以消费一个分区的数据那么在操作这个消费进度的时候就需要加锁可能会对性能有一定的影响。
所以说,话题的分区数量决定了消费的并行度,增加多余的消费者也是没有用处的,你可以通过增加分区来提高消费者的处理能力。
![](https://static001.geekbang.org/resource/image/cd/39/cdd960f49f982f8b96ab466d7e4b2739.jpg)
那么,如何在不增加分区的前提下提升消费能力呢?
虽然不能增加consumer但你可以在一个consumer中提升处理消息的并行度所以可以考虑使用多线程的方式来增加处理能力你可以预先创建一个或者多个线程池在接收到消息之后把消息丢到线程池中来异步地处理这样原本串行的消费消息的流程就变成了并行的消费可以提高消息消费的吞吐量在并行处理的前提下我们就可以在一次和消息队列的交互中多拉取几条数据然后分配给多个线程来处理。
![](https://static001.geekbang.org/resource/image/2c/79/2c0eefd526eed3a1fe4df89f068daf79.jpg)
另外,在消费队列中数据的时候还需要注意消费线程空转的问题。
**我是在测试自己写的一个消息中间件的时候发现的。**当时我发现运行消费客户端的进程会偶发地出现CPU跑满的情况于是打印了JVM线程堆栈找到了那个跑满CPU的线程。这个时候才发现原来是消息队列中有一段时间没有新的消息于是消费客户端拉取不到新的消息就会不间断地轮询拉取消息这个线程就把CPU跑满了。
所以你在写消费客户端的时候要考虑这种场景拉取不到消息可以等待一段时间再来拉取等待的时间不宜过长否则会增加消息的延迟。我一般建议固定的10ms~100ms也可以按照一定步长递增比如第一次拉取不到消息等待10ms第二次20ms最长可以到100ms直到拉取到消息再回到10ms。
说完了消费端的做法之后,**再来说说消息队列本身在读取性能优化方面做了哪些事情。**
我曾经也做过一个消息中间件,在最初设计中间件的时候我主要从两方面考虑读取性能问题:
* 消息的存储;
* 零拷贝技术。
**针对第一点,**我最初在设计的时候为了实现简单使用了普通的数据库来存储消息但是受限于数据库的性能瓶颈读取QPS只能到2000后面我重构了存储模块使用本地磁盘作为存储介质。Page Cache的存在就可以提升消息的读取速度即使要读取磁盘中的数据由于消息的读取是顺序的并且不需要跨网络读取数据所以读取消息的QPS提升了一个数量级。
**另外一个优化点是零拷贝技术,**说是零拷贝,其实我们不可能消灭数据的拷贝,只是尽量减少拷贝的次数。在读取消息队列的数据的时候,其实就是把磁盘中的数据通过网络发送给消费客户端,在实现上会有四次数据拷贝的步骤:
1.数据从磁盘拷贝到内核缓冲区;
2.系统调用将内核缓存区的数据拷贝到用户缓冲区;
3.用户缓冲区的数据被写入到Socket缓冲区中
4.操作系统再将Socket缓冲区的数据拷贝到网卡的缓冲区中。
![](https://static001.geekbang.org/resource/image/52/8f/52c74ecac57e7a437442860029476d8f.jpg)
操作系统提供了Sendfile函数可以减少数据被拷贝的次数。使用了Sendfile之后在内核缓冲区的数据不会被拷贝到用户缓冲区而是直接被拷贝到Socket缓冲区节省了一次拷贝的过程提升了消息发送的性能。高级语言中对于Sendfile函数有封装比如说在Java里面的java.nio.channels.FileChannel类就提供了transferTo方法提供了Sendfile的功能。
![](https://static001.geekbang.org/resource/image/e3/ed/e38d36c7f077c6ce5b0b276efb8d4eed.jpg)
## 课程小结
本节课我带你了解了如何提升消息队列的性能来降低消息消费的延迟,这里我想让你明确的重点是:
* 我们可以使用消息队列提供的工具,或者通过发送监控消息的方式来监控消息的延迟情况;
* 横向扩展消费者是提升消费处理能力的重要方式;
* 选择高性能的数据存储方式配合零拷贝技术,可以提升消息的消费性能。
其实队列是一种常用的组件,只要涉及到队列,任务的堆积就是一个不可忽视的问题,**我遇到过的很多故障都是源于此。**
比如前一段时间处理的一个故障前期只是因为数据库性能衰减有少量的慢请求结果这些慢请求占满了Tomcat线程池导致整体服务的不可用。如果我们能对Tomcat线程池的任务堆积情况有实时的监控或者说对线程池有一些保护策略比方说线程全部使用之后丢弃请求也许就会避免故障的发生。在此我希望你在实际的工作中能够引以为戒只要有队列就要监控它的堆积情况把问题消灭在萌芽之中。
## 一课一思
在实际的项目中,你可能对于消息队列的使用已经很熟练了,那么结合今天的内容,你可以和我分享一下在研发过程中,你在降低消息延迟方面做过哪些事情呢?欢迎在留言区和我一起讨论,或者将你的实战经验分享给更多的人。
最后,感谢你的阅读,如果这篇文章让你有所收获,也欢迎你将它分享给更多的朋友。