gitbook/业务开发算法50讲/docs/473255.md
2022-09-03 22:05:03 +08:00

20 KiB
Raw Permalink Blame History

09二分如何高效查询Kafka中的消息

你好,我是微扰君。

今天我们来学习另一个常用的算法思想,二分法。这个算法思想相信即使你没有什么开发经验也不会感到陌生,而且之前讲红黑树的时候我们也简单聊过。

不知道你有没有玩过“猜数字”的游戏。大家规定一个范围一个人在心里想一个这个范围内的具体数字比如一个1-100的自然数然后另几个人来猜数字每次猜错这个人都会提示他们的猜测是大了还是小了看谁最快猜到数字。

如果你做这个游戏会怎么猜呢从1开始顺次猜吗我反正不会这么猜出于一个很简单的直觉如果1猜错了那么出题的同学给你的提示对可选范围的缩小非常有限也就是从1-100变成了2-100。

我想很多人第一反应也都会是从比较中间的位置比如50开始猜起。毕竟如果50猜错了因为要提示是大了还是小了范围就要么缩小到1-49要么缩小到51-100这样猜测范围就可以成倍的缩小。

所以,如果每一次我们都猜测可能范围内的中间值,那么即使猜错了也能成倍的缩小范围,这样的策略其实就是二分查找算法

有了二分查找算法即使更大的范围内进行游戏比如在1-1,000,000的范围内我们按照二分的策略最多也只需要20次即可完成任意数字的猜测这是遍历数字猜测所远远做不到的。可以看下图有一个直观的认知。

不过这样凭感觉的分析肯定是不行的,我们来严谨地讨论一下二分查找相比于线性查找,到底有多大的优势吧。

二分查找

在具体比较它俩的复杂度和实现之前,首先,我们要知道二分查找相比于线性查找更快是有先决条件的,就是查找的范围内的元素一定是有序排列的

比如在刚刚说的猜数字游戏里我们之所以每次能排除一半的搜索空间就是因为数字整体是有序排列的如果某次猜测的数x比目标值target大那么当然比x更大的数就没有必要猜测了。

那什么是无序的排列呢我们换个例子假设要从一个由字母构成的数组中寻找某个目标字母“G”是否存在。

如果字母数组本身是按照字母序排序的,那么显然可以用二分查找法,和翻字典的过程其实差不多,如果我们打开的当前页比目标字母要字母序更靠前,那么我们肯定会往后翻,反之则会往前翻。

但是如果字母数组并不是按照字母序排列的,而是随机排列没有规律可言,这样唯一的做法就只有遍历数组的每个元素逐一对比了,因为在乱序的数组中的任意位置和目标字母进行比较,不会有任何有用的信息可以告诉我们应该要往前找还是往后找

这也是为什么有序的结构在很多情况下是更受偏爱的,我们在许多场景下,会先对数组元素进行排序预处理,再进行后续的其他操作;哪怕我们知道,排序,即使是内排序,也会花费不菲的代价,但它带来的收益可能是更高的。

在算法面试中你可能会经常碰到这样要先进行排序预处理的题目比如力扣上经典的两数之和题有一种基于双指针的做法就是要先进行排序预处理的而采用快排这样的O(nlgn)时间复杂度的算法,即使多出了预处理的步骤,也可以让我们获得比暴力法更好的时间复杂度的算法。如果你感兴趣可以课后尝试解决一下相关的题目。

二分查找

所以我们也来看一下数组上的二分查找算法在维基百科上的严格定义:

二分查找是一种在有序数组中查找某一特定元素的搜索算法。

搜索过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜索过程结束;如果某一特定元素大于或者小于中间元素,则在数组大于或小于中间元素的那一半中查找,而且跟开始一样从中间元素开始比较。如果在某一步骤数组为空,则代表找不到。

这种搜索算法每一次比较都使搜索范围缩小一半。

核心就是查找元素需要可比较且有序的排列

我们来简单分析一下二分查找的时间复杂度。也很简单假设我们搜索的空间内一共有N个元素它们可以根据某种比较函数升序排列那基于二分查找的策略均摊下来需要比较多少次呢

比较理想的情况比如查找的元素正好在序列正中间搜索一次就可以返回了。但我们需要考虑最差的情况。由于每次检索至少能排除一半的空间假设一开始的搜索空间大小是N那么我们的搜索空间在最差情况下会构成一个等比数列下图是一个N=16时具体的例子

当搜索空间里只剩一个可能元素时也就是最后一次猜测我们要么猜到了答案要么就是答案不存在。这样最坏的搜索次数就是最大搜索空间折半多少次可以变成1。所以二分搜索的时间复杂度就是O(logn)了。

二分查找的实现

用代码实现的时候往往会用数组来存储有序排列的待搜索元素这里假设整个数组元素是升序排列的。我们用left和right两个整型作为数组的下标代表搜索空间的左右边界进行循环猜测。

每次猜测我们都会选择可选范围内最中间的元素去和目标值比较,最中间元素的数组下标为(left+right)/2。如果和目标值一样我们就猜中了答案如果比目标值大说明比当前元素大的元素都不可能了我们应该把可能范围的右边界移到当前位置之前反之就应该把左边界移到当前位置之后。

以猜数字为例力扣上的374题描述的就是这个游戏对应的二分查找代码可以这样写

// Forward declaration of guess API.
// @param num, your guess
// @return -1 if my number is lower, 1 if my number is higher, otherwise return 0
int guess(int num); // num比答案高返回-1 否则返回1

class Solution {
public:
    int guessNumber(int n) {
        int l = 1;
        int r = n;

        while(l < r) {
            int mid = l + (r-l)/2; // 每次用左右边界的中点作为猜测值

            if (guess(mid) == 0) return mid; //猜中直接返回

            if (guess(mid) < 0) { // 猜的数大了
                r = mid - 1;
            } else { // 猜的数小了
                l = mid+1;
            }
        }

        return l;
    }
};

你只要记住我们始终让l和r表示可能的范围再根据中间值比较的结果进行边界的缩放代码是很容易实现的。

这里我们也对比线性搜索的代码看一看:

// Forward declaration of guess API.
// @param num, your guess
// @return -1 if my number is lower, 1 if my number is higher, otherwise return 0
int guess(int num);

class Solution {
public:
    int guessNumber(int n) {
      for (int i = 1; i <= n; i++) {
        if (guess[i] == 0) return i;
      }
      return -1;
for        
    }
};

可以看到在线性搜索的代码里其实没有用到guess[i]为-1或者1的信息也就没有利用到数字本身是有序的特点。当目标值为n的时候我们需要比较n次才能得到答案所以均摊的整体时间复杂度为O(N)。在N很大的时候线性查找会比二分查找慢很多。

二分查找的应用

那二分查找在工程中常用吗? 可太常用了下面我们就以Kafka的索引查询为例学习一下二分查找在工程实战中可以发挥的巨大威力。

当然你可能会说,平时写业务代码的时候好像也没怎么写过二分查找。这其实也很正常,一是因为大部分时候,业务代码很少会在内存中存储大量的线性有序数据,在需要比较大量数据的检索时,我们往往会依赖底层的中间件;而数据量比较小时,线性查找和二分查找可能也差别不大了;另外我们也常常会用一些如红黑树这样的结构去存储有序集合,检索的时候也不会用到二分搜索这样在线性容器内的操作。

不过作为有追求的程序员这种非常基础的算法思想我们还是很有必要掌握的不止是能帮助你通过面试更能帮助你更好地理解像Kafka这样的中间件的部分底层实现原理。

Kafka

我们知道Kafka是一款性能强大且相当常用的分布式消息队列常常用于对流量进行消峰、解耦系统和异步处理部分逻辑以提高性能的场景不太了解的同学可以去看一下官网的介绍

在Kafka中所有的消息都是以“日志”的形式存储的。这里的“日志”不是说一般业务代码中用于debug的日志而是一种存储的范式这种范式只允许我们在文件尾部追加新数据而不允许修改文件之前的任何内容。

简单理解,你可以认为Kafka的海量消息就是按照写入的时间顺序依次追加在许多日志文件中。那在某个日志文件中每条消息自然会距离第一条消息有一个对应的offset不过这里的offset更像是一个消息的自增ID而不是一个消息在文件中的偏移量。

为什么说是许多日志文件,而不是一个巨型的日志文件呢?这也是一个常用的计算机思想:分片。在这里,分片可以让我们更快速、更方便地删除部分无用文件,提高磁盘的利用率。

Kafka日志文件具体的存储方式可以参考这张图。Kafka的每个topic会有多个partition每个partition下的日志都按照顺序分成一个个有序的日志段顺次排列

怎么找到消息

我们知道Kafka虽然不允许从尾部以外的地方插入或者修改数据但我们在Kafka中还是很可能需要从某个时间点开始读数据的这就意味着我们要通过一个offset快速查找到某条消息在日志文件的什么位置。这里再强调一下kafka中的offset实际上是类似于消息自增ID的存在并不是真的在磁盘上的偏移量。

但由于每条消息的消息体不同每条消息所占用的磁盘大小都是不同的只有offset没有办法直接定位到文件的位置。所以我们要么遍历日志文件进行查找要么我们为日志文件建立一套索引系统将消息offset和在文件中的position关联起来这样我们就可以利用消息offset的有序性通过二分法加速查找了

下面是一个典型的某个topic的某个partition下file日志文件的存储情况。

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex

  • .log文件就是存储了消息体本身的日志文件
  • .index文件就是用于帮我们快速检索消息在文件中位置的索引文件
  • 这里还有个.timeindex后缀的文件它和index其实差不多都是索引文件只不过在这个文件中关联position的变成了时间戳。

所有的文件名都是在这个file下的第一条消息距离Kafka整体的第一条消息的offset也就是绝对偏移量那么在一个index文件内我们就只需要用更小的空间存储相对偏移量即可

而index文件的内容也很简单就是用固定大小的记录来标记一对“消息offset”和“消息在log文件中的位置position”的关系当然我们会保证消息offset是递增的。下图是一个简单的示意。有了这个索引文件就可以快速地进行二分查找了。

当然这里还有个小细节不知道你有没有注意到在index文件中文件中的offset并不是连续存储的。这会导致我们拿着offset在index中查询时只能大致查找到一段可能的范围之后在.log文件中我们还需要在查找的最接近的消息的位置往后顺序遍历才可以找到真正的offset所在精确位置。

比如要查询offset为29的消息在索引表中只能定位到offset为26的位置在838然后我们还要从838的位置开始在.log文件中往后遍历查询直到找到offset为29的消息。

这其实是一个时空效率的权衡为了使用更少的内存空间Kafka采用的是稀疏不连续的索引,在实战中起到了非常好的效果。

好说到这相信让你基于Kafka的存储体系去实现指定offset消息的查询也可以轻松实现了吧。不过这里要稍微再多说一点很可能你所面对的不是一个可以在内存中放得下的简单索引文件而是一个比内存大得多的存放在磁盘上的东西。怎么办

Kafka的做法是基于mmap技术将硬盘上的文件和内存进行映射当然由于硬盘的空间可能比内存大很多所以并不能够直接将内存在物理层面上与磁盘进行一一映射这里我们需要引入虚拟内存的手段。这点我们会在操作系统篇讲解LRU缓存置换算法的时候进一步讨论。

那最后我们来看一下kafka中的代码到底是怎么写的吧。

kafka源码实现

你可以对比一下自己想的实现看看有没有什么差别。当然Kafka用的是scala语言你可能需要花一点时间理解一下基础的语法。

在整个Kafka中二分搜索的核心作用就是用于加速索引指定offset的消息所以相应的代码都在 core/src/main/scala/kafka/log/AbstractIndex.scala 中。 indexSlogRangeFor 就是用于检索目标值的函数,其返回值就代表可能范围的上下界,我们会不断的递归搜索,如果最终返回的下界和上界相等,就说明我们找到了目标值。

      /**
	   * Lookup lower and upper bounds for the given target.
	   */
	  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
	    // 检查index是否为空
	    if(_entries == 0)
	      return (-1, -1)
	
        // 二分搜索
	    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
	      var lo = begin
	      var hi = end
	      while(lo < hi) {
	        val mid = ceil(hi/2.0 + lo/2.0).toInt
	        val found = parseEntry(idx, mid)
	        val compareResult = compareIndexEntry(found, target, searchEntity)
	        if(compareResult > 0)
	          hi = mid - 1
	        else if(compareResult < 0)
	          lo = mid
	        else
	          return (mid, mid)
	      }
	      (lo, if (lo == _entries - 1) -1 else lo + 1)
	    }
	
	    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
	    // 查询的目标offset是否在热区
	    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
	      return binarySearch(firstHotEntry, _entries - 1)
	    }
	
	    // 查询的目标offset是否小于最小的offset
	    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
	      return (-1, 0)
	
	    return binarySearch(0, firstHotEntry)
	  }

看第9-25行代码和前面写的“猜数字”代码看起来是不是如出一辙呢就是简单的二分搜索相信你应该没有什么问题了。

我们稍微解释一下27行到37行的代码。你可能会很疑惑有了二分搜索函数我们直接检索 binarySearch(0, _entries - 1) 不就可以了吗?为什么还要分两段检索呢?

这其实也涉及到操作系统、内存和mmap的工作机制。

前面我们提到Kafka利用mmap将更大的磁盘文件映射到了一个虚拟内存空间但底层的内存存储其实是相对小的所以很多时候我们需要将一部分暂时不用的空间从内存中置换出去把需要访问但此时不在内存中的文件置换进来这个方法叫做内存置换,每次内存置换都会触发一次“缺页中断”之后我们会在LRU的章节里展开讲解现在你只需要知道这个操作显然是需要比较高昂的成本就可以了。

而Kafka消息队列的特性也决定了我们大部分的索引查询其实都是在日志比较靠近尾部的区域(数据比较新)。

那么,如果我们将索引中最后的 8KB 认定为“热区”,是大部分查询所会命中的区域,剩余的区域认定为是“冷区”,但每次查询的中间位置随着日志的增长就很容易变成冷区,就很容易触发缺页中断。一个典型查询在内存中产生的冷热页面的对比例子可以参见下图:

如果我们在查询的时候分成两段优先查询热区没有命中时再查询冷区是不是就能大大减少“缺页中断”的次数了呢是的对于查询常在尾部出现的情况下采用冷热分区的二分查询算法能很好地优化性能。具体issue可以参考这个

可以看到相比简单的内存中的二分查询Kafka中的二分查询考虑了更多的“现实”问题这也是我们在工程中遇到算法问题和平时做算法题的算法的一个很大的差异。所以如果我们想要真的把算法很好的应用于工程中除了对算法本身的掌握需要过硬也需要真正打好计算机基础知识对程序和操作系统的运行了如指掌才能真正写出高性能的代码。

总结

相信通过今天的学习你已经学会了如何给一个基于“日志”存储的消息队列建立消息的索引查询了吧。通过线性有序的索引文件我们其实可以为任何需要查询的系统进行基于二分法的查询以优化查询效率。二分查找的核心就在于相比于线性查找我们可以在每次查询中成倍地缩小可能的查询范围达到O(logN)的时间复杂度。

所以在工程中,建立索引文件,或者是对业务数据进行排序,都是常用的预处理手段。通过一次计算,就可以帮助我们在之后的查询操作中获得更好的效率,是典型的空间换时间的手段。希望你在工作生活中可以灵活运用今天学习的知识。

课后作业

那最后提一个小问题。 既然建立线性的索引文件就可以帮助我们加速查询的过程那为什么在许多情况下我们还需要使用诸如红黑树、B+树这样的复杂索引结构呢比如InnoDB的索引文件就采用了B+Tree它和Kafka所选择的顺序稀疏索引文件各有什么优劣呢

欢迎你在留言区和我一起讨论。如果有收获也欢迎你转发给身边的朋友,邀他一起学习。我们下节课见~