You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

12 KiB

08 | 答疑解惑(一) : 网关如何接收服务端的秒杀结果?

你好,我是李玥。

我们的“消息队列高手课”专栏自从上线到现在同学们的学习热情和参与度都非常高。每一节课都有很多同学留言评论这些留言里有总结知识分享收获的有提出精彩问题的还有给自己加油打气立Flag的竟然还有说老师长得像黄渤的。我又仔细去看了一下配图还是真挺像的。下次老师和极客时间的设计师小姐姐说一样让她们照着吴彦祖来P图。

同学们每一条的留言我都认真看过,大部分留言我都给出了回复。在基础篇的最后一节课,我来统一解答一下大家都比较关注的一些问题。

1. 网关如何接收服务端的秒杀结果?

在《01 | 为什么需要消息队列?》这节课里面我们举了一个秒杀的例子这个例子是用来说明消息队列是如何来实现异步处理的。课后很多同学留言提问网关在发送消息之后是如何来接收后端服务的秒杀结果又如何来给APP返回响应的呢

在解答这个问题之前,我需要先说一下,实际生产环境中的秒杀系统,远比我们举的这个例子复杂得多,实现方案也是多种多样的,不是说一定要按照我们这个例子的方式来实现。

在这个例子中,网关接收后端服务秒杀结果,实现的方式也不只一种,这里我给大家提供一个比较简单的方案。

比如说用Java语言来举例子

public class RequestHandler {
  
  // ID生成器
  @Inject
  private IdGenerator idGenerator;
  // 消息队列生产者
  @Inject
  private Producer producer;
  // 保存秒杀结果的Map
  @Inject
  private Map<Long, Result> results;

  // 保存mutex的Map
  private Map<Long, Object> mutexes = new ConcurrentHashMap<>();
  // 这个网关实例的ID
  @Inject
  private long myId;

  @Inject
  private long timeout;

  // 在这里处理APP的秒杀请求
  public Response onRequest(Request request) {
    // 获取一个进程内唯一的UUID作为请求id
    Long uuid = idGenerator.next();
    try {

      Message msg = composeMsg(request, uuid, myId);

      // 生成一个mutex用于等待和通知
      Object mutex = new Object();
      mutexes.put(uuid, mutex)

      // 发消息
      producer.send(msg);

      // 等待后端处理
      synchronized(mutex) {
        mutex.wait(timeout);
      }

      // 查询秒杀结果
      Result result = results.remove(uuid);

      // 检查秒杀结果并返回响应
      if(null != result && result.success()){
        return Response.success();
      }

    } catch (Throwable ignored) {}
    finally {
      mutexes.remove(uuid);
    }
    // 返回秒杀失败
    return Response.fail();
  }

  // 在这里处理后端服务返回的秒杀结果
  public void onResult(Result result) {

    Object mutex = mutexes.get(result.uuid());
    if(null != mutex) { // 如果查询不到说明已经超时了丢弃result即可。
      // 登记秒杀结果
      results.put(result.uuid(), result);
      // 唤醒处理APP请求的线程
      synchronized(mutex) {
        mutex.notify();
      }
    }
  }
}

在这个方案中网关在收到APP的秒杀请求后直接给消息队列发消息。至于消息的内容并不一定是APP请求的Request只要包含足够的字段就行了比如用户ID、设备ID、请求时间等等。另外还需要包含这个请求的ID和网关的ID这些后面我们会用到。

如果发送消息失败可以直接给APP返回秒杀失败结果成功发送消息之后线程就阻塞等待秒杀结果。这里面不可能无限等待下去需要设定一个等待的超时时间。

等待结束之后去存放秒杀结果的Map中查询是否有返回的秒杀结果如果有就构建Response给APP返回秒杀结果如果没有按秒杀失败处理。

这是处理APP请求的线程接下来我们来看一下网关如何来接收从后端秒杀服务返回的秒杀结果。

我们可以选择用RPC的方式来返回秒杀结果这里网关节点是RPC服务端后端服务为客户端。之前网关发出去的消息中包含了网关的ID后端服务可以通过这个网关ID来找到对应的网关实例秒杀结果中需要包含请求ID这个请求ID也是从消息中获取的。

网关收到后端服务的秒杀结果之后用请求ID为Key把这个结果保存到秒杀结果的Map中然后通知对应的处理APP请求的线程结束等待。我刚刚说过处理APP请求的线程在结束等待之后会去秒杀的结果Map中查询这个结果然后再给APP返回响应。

我把这个处理过程的流程图放在这里,便于你理解:


这个解决方案还不是一个性能最优的方案处理APP请求的线程需要同步等待秒杀结果。后面课程中我们会专门来讲如何使用异步方式来提升程序的性能。

2. 详解RocketMQ和Kafka的消息模型

我在看《03 | 消息模型:主题和队列有什么区别?》这节课的留言时发现不少同学对RocketMQ和kafka的消息模型理解的还不是很透彻这两个消息队列产品的消息模型是一样的我在这里再把这个模型相关的概念通过一个例子详细地说一说。

假设有一个主题MyTopic我们为主题创建5个队列分布到2个Broker中。

先说消息生产这一端假设我们有3个生产者实例Produer0Produer1和Producer2。

这3个生产者是如何对应到2个Broker的又是如何对应到5个队列的呢这个很简单不用对应,随便发。每个生产者可以在5个队列中轮询发送也可以随机选一个队列发送或者只往某个队列发送这些都可以。比如Producer0要发5条消息可以都发到队列Q0里面也可以5个队列每个队列发一条。

然后说消费端,很多同学没有搞清楚消费组、消费者和队列这几个概念的对应关系。

每个消费组就是一份订阅它要消费主题MyTopic下所有队列的全部消息。注意队列里的消息并不是消费掉就没有了这里的“消费”只是去队列里面读了消息并没有删除消费完这条消息还是在队列里面。

多个消费组在消费同一个主题时消费组之间是互不影响的。比如我们有2个消费组G0和G1。G0消费了哪些消息G1是不知道的也不用知道。G0消费过的消息G1还可以消费。即使G0积压了很多消息对G1来说也没有任何影响。

然后我们再说消费组的内部一个消费组中可以包含多个消费者的实例。比如说消费组G1包含了2个消费者C0和C1那这2个消费者又是怎么和主题MyTopic的5个队列对应的呢

由于消费确认机制的限制这里面有一个原则是在同一个消费组里面每个队列只能被一个消费者实例占用。至于如何分配这里面有很多策略我就不展开说了。总之保证每个队列分配一个消费者就行了。比如我们可以让消费者C0消费Q0Q1和Q2C1消费Q3和Q4如果C0宕机了会触发重新分配这时候C1同时消费全部5个队列。

再强调一下队列占用只是针对消费组内部来说的对于其他的消费组来说是没有影响的。比如队列Q2被消费组G1的消费者C1占用了对于消费组G2来说是完全没有影响的G2也可以分配它的消费者来占用和消费队列Q2。

最后说一下消费位置,每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在服务端保存,并且,消费位置和消费者是没有关系的。每个消费位置一般就是一个整数,记录这个消费组中,这个队列消费到哪个位置了,这个位置之前的消息都成功消费了,之后的消息都没有消费或者正在消费。

我把咱们这个例子的消费位置整理成下面的表格,便于你理解。

你可以看到,这个表格中并没有消费者这一列,也就是说消费者和消费位置是没有关系的。

3. 如何实现单个队列的并行消费?

下面说一下《03 | 消息模型:主题和队列有什么区别?》这节课的思考题如果不要求严格顺序如何实现单个队列的并行消费关于这个问题有很多的实现方式在JMQ京东自研的消息队列产品它实现的思路是这样的。

比如说队列中当前有10条消息对应的编号是0-9当前的消费位置是5。同时来了三个消费者来拉消息把编号为5、6、7的消息分别给三个消费者每人一条。过了一段时间三个消费成功的响应都回来了这时候就可以把消费位置更新为8了这样就实现并行消费。

这是理想的情况。还有可能编号为6、7的消息响应回来了编号5的消息响应一直回不来怎么办这个位置5就是一个消息空洞。为了避免位置5把这个队列卡住可以先把消费位置5这条消息复制到一个特殊重试队列中然后依然把消费位置更新为8继续消费。再有消费者来拉消息的时候优先把重试队列中的那条消息给消费者就可以了。

这是并行消费的一种实现方式。需要注意的是,并行消费开销还是很大的,不应该作为一个常规的,提升消费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

4. 如何保证消息的严格顺序?

很多同学在留言中问,怎么来保证消息的严格顺序?我们多次提到过,主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说你的业务必须要求全局严格顺序就只能把消息队列数配置成1生产者和消费者也只能是一个实例这样才能保证全局严格顺序。

大部分情况下,我们并不需要全局严格顺序,只要保证局部有序就可以满足要求了。比如,在传递账户流水记录的时候,只要保证每个账户的流水有序就可以了,不同账户之间的流水记录是不需要保证顺序的。

如果需要保证局部严格顺序可以这样来实现。在发送端我们使用账户ID作为Key采用一致性哈希算法计算出队列编号指定队列来发送消息。一致性哈希算法可以保证相同Key的消息总是发送到同一个队列上这样可以保证相同Key的消息是严格有序的。如果不考虑队列扩容也可以用队列数量取模的简单方法来计算队列编号。

写在最后

在留言中很多同学留言提出来能不能讲一讲某个消息队列的某个功能具体如何配置。我的建议是你先不要太关注功能、API和配置这些细节在学习如何使用消息队列的过程中要保持一定的高度来学习。

因为使用消息队列,大部分的难点在宏观架构层面,要解决这些难点,你需要掌握消息队列宏观层面上的实现原理和最佳实践,这样,无论你使用什么消息队列,都可以做到游刃有余。在选定了合适的消息队列产品,准备写代码之前,再去文档中查看这些细节都来得及。

所以,我们专栏的“基础篇”讲消息队列的使用,更多讲的是一些通用的原理。这节课是我们消息队列高手课“基础篇”的最后一节课,完整基础篇的学习后,意味着你已经是一个使用消息队列的小达人了。

在“进阶篇”中,我们将把学习重点从“如何使用”转为“如何实现”,在学习消息队列的实现技术时,你反而要专注到每一个技术点上,深入下去,把每个细节都要搞清楚、学透。课程的深度、难度也会逐步加强,当然你获得的经验值也会更多。

希望大家一如既往坚持学习,多思考,多练习,跟老师一起打怪升级,成为真正的高手。

感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给你的朋友。