# 34|快速分配和释放内存:内存池 你好,我是吴咏炜。 在上一讲讲过了性能测试之后,我们终于可以回到内存池这个话题,来深入讨论一下了。 ## 一个测试用例 如果你想用内存池,那我的第一个问题就是,你到底是不是需要使用内存池? 下面是一些你可能想使用内存池的理由: * 希望减少内存分配和释放的时间开销——更快的分配和释放 * 希望减少内存分配和释放的空间开销——更少的总体内存使用 下面则是一些反对使用内存池的理由: * 你的通用内存分配器可能已经足够快了 * 使用内存池可能导致操作系统更难回收你已经不再需要的内存 * 使用内存池可能使得你的对象较难跟其他对象进行交互(参考[第 32 讲](https://time.geekbang.org/column/article/491227),在 PMR 之前分配器是容器类型的一部分) 当然,既然你看到这里了,你肯定是想要使用内存池的。不过,我们需要能够衡量使用内存池的效果,所以我们需要进行测试。 如果你想要进行某个操作的性能测试,你就需要某种“典型场景”。作为例子,我这儿拿一个掺入了随机操作的过程当作测试场景。具体来说,我做的事情是: 1. 产生随机数 2. 把这些随机数插入到一个 `unordered_set` 中,测量所需的时间 3. 把这些随机数从这个 `unordered_set` 里逐个删除,测量所需的时间 4. 再把这些随机数重新插入到 `unordered_set` 中,测量所需的时间 这虽然不是一个完美的例子,但确实可以让我们观察到内存池的作用。如果你有真实的场景,也可以借鉴这种方式来进行测试。 我们的待测对象和类型非常简单: ```cpp using TestType = unordered_set; TestType s; ``` 产生随机数的代码略复杂一点点: ```cpp mt19937 engine; uniform_int_distribution dist; array rand_nums{}; for (int& num : rand_nums) { num = dist(engine); } ``` 我们希望得到跨平台的稳定测试结果,因此指定了一个名声不错的伪随机数引擎 `mt19937`(否则默认的伪随机数引擎 `default_random_engine` 也没什么问题)。我们只需要一个简单的随机均匀分布,因而使用了默认构造的 `uniform_int_distribution`,不给出随机数的范围,来产生所有合法整数范围内的随机数。然后,我们在长度为 `LEN` 的数组中,每一项(注意此处必须使用引用的方式来范围遍历 `rand_nums`)都写入一个随机的整数。由于我们没有对随机数引擎使用真正随机的种子来初始化,这些随机数每次都是相同的,可以保证测试的稳定性。 初始插入操作就简单了,只是把数组 `rand_nums` 里的每一项插入到 `s` 里。由于 `s` 这个变量的操作实在有点复杂,不管它是全局变量还是本地变量,编译器都不太可能把这些操作优化掉了。我们这个测试可以简单地测量总体耗时: ```cpp t1 = rdtsc(); for (int num : rand_nums) { s.insert(num); } t2 = rdtsc(); ``` 删除操作也差不多,我们仍然使用 `rand_nums` 来删除 `s` 中的每一项: ```cpp t1 = rdtsc(); for (int num : rand_nums) { s.erase(num); } t2 = rdtsc(); ``` 最后,我们再重复一遍插入的过程,看看重新插入的性能有没有变化。完整的测试代码可以看一下[代码库](https://github.com/adah1972/geek_time_cpp)。 下面是某硬件环境下的初步测试结果。 Apple Clang 12.0,macOS Catalina 10.15: > `It took 449 cycles by average to insert a number` > `It took 492 cycles by average to erase a number` > `It took 305 cycles by average to insert a number again` MSVC 19.29,Windows 10: > `It took 366 cycles by average to insert a number` > `It took 185 cycles by average to erase a number` > `It took 300 cycles by average to insert a number again` GCC 10.3,Ubuntu Linux 20.04 LTS: > `It took 307 cycles by average to insert a number` > `It took 162 cycles by average to erase a number` > `It took 176 cycles by average to insert a number again` 可以看到,使用不同的平台和编译器,结果差异比较大。但我们确实可以看到,再次插入的性能比第一次要高,在 Linux 上尤其明显。 事实上,这还只是使用默认的内存分配器的结果。使用不同的内存分配器也能获得不同的效果。比如,在 Linux 上使用 tcmalloc \[1\] 来取代默认的分配器 \[2\],我们可以得到更好的测试结果: > `It took 250 cycles by average to insert a number` > `It took 116 cycles by average to erase a number` > `It took 117 cycles by average to insert a number again` 取决于你使用的平台的内存分配器的性能,也取决于你是否需要跨平台地得到更好的内存分配性能,内存池也许对你很有用,也许对你用处不大。目前,我就假设内存池会对你有用吧(既然你已经读到这里了😁)。 ## PMR 内存池 有了测试用例之后,我们可以验证一下多态分配器([第 32 讲](https://time.geekbang.org/column/article/491227))里提供的内存池的作用了。我们只需要对测试用例做一下小修改,把 `TestType` 相关的两行改成下面这样子就行: ```cpp using TestType = pmr::unordered_set; pmr::unsynchronized_pool_resource res; pmr::polymorphic_allocator a{&res}; TestType s(a); ``` 取决于平台,你可能会看到不同的性能结果。比如,在 Linux 上我得到了下面的测试结果: > `It took 272 cycles by average to insert a number` > `It took 210 cycles by average to erase a number` > `It took 169 cycles by average to insert a number again` 性能数据有得有失。而在 macOS 和 Windows 上,我看到了更大的、全方位的性能提升。对于跨平台的应用,这样的内存池确实会有效果。 注意,我上面使用的是无多线程同步的 `unsynchronized_pool_resource`。有多线程同步的内存池就是另外一个故事了。在 Linux 上,性能反而会有下降;而在其他平台上,性能提升也很不明显。——一般而言,对于多线程的处理,通用内存分配器已经做了充足的优化,性能上可能反而会超出一般简单实现的内存池。内存池通常应该在**单线程**或**线程本地**(thread\_local)的场景使用,至少从执行时间的角度来讲是如此。 ## 自定义内存池 在[第 31 讲](https://time.geekbang.org/column/article/489409)我提到过,利用同一类型的对象的大小完全相同这一特性,可以实现一个高度优化的内存池。只是利用类特定的分配和释放函数,使用场景会比较受限。下面我会描述利用这个思路实现的一个内存池,既可以用在类特定的分配和释放函数里,也可以用在容器的分配器里。 ### 基本策略 作为内存池,最基本的要求就是减少向系统的内存分配器请求内存的次数。因此,我们希望单次内存分配就获得大块的内存(chunk),然后分割开给各个对象使用。这样的内存块,通常是某个特定大小的整数倍。 下一步,我们有两种不同的做法: 1. 任何要求某个大小(或某个大小范围)的内存分配请求都到某一个内存池里分配和释放 2. 任何要求某个特定类型的对象的内存分配请求都到某一个内存池里分配和释放 第一种做法跟 SGI STL 差不多,而第二种做法是 C++ 的内存分配机制给我们的额外优化机会。两种做法各有一些优缺点,而我目前是采取了第二种做法,主要考虑下面这些因素: * 不同类型的对象使用不同的内存池,即使它们的大小相同。在很多场景下,把同一类型的对象放在一起,程序会有更好的局域性。 * 通过对象类型可以得出对象大小,但反过来则不可以。换句话说,按我目前的方式,你可以把方案退化成为只使用对象大小,因而讲解目前这种方式更具有通用性。 我做的另外一个选择是在大部分时间里**不**返回内存给内存分配器。原因是: * 返回内存给内存分配器反而更容易导致内存碎片,导致后续内存不足或消耗更大。 * 返回内存给内存分配器,通常内存分配器也没法返回给操作系统(因为内存碎片的原因),因此并不能减少程序的运行期内存开销。 * 不返回内存给内存分配器的话,实现简单,代码更小、更快。 我的一些实验表明,内存池也很难决定什么时候返回内存给内存分配器。如果某个内存块(chunk)全空就返回的话,程序向内存分配器请求内存的次数会明显增加。目前我能想到的唯一好处,是程序的对象数量会有明显的波动的时候:在某个时刻,程序会产生大量的 A 对象,然后释放掉;在另一时刻,又会产生大量的 B 对象,然后释放掉。仅在这种情况下,我的不返回选择会增加程序的最大内存开销。目前我就暂不考虑这种特殊场景了。 ### 对象内存池 根据上面的讨论,我们需要有一个内存块的数据结构,也需要决定一个内存块里放多少个对象。我们采用一个可特化的参数来决定后者: ```cpp template inline constexpr size_t memory_chunk_size = 64; ``` 也就是说,`memory_chunk_size` 默认大小是 64,但你可以针对某一特定类型来进行特化,改变其大小。比如,你想针对你的某一特定 `Obj` 类型把大小改成 32,你可以写: ```cpp template <> inline constexpr size_t memory_chunk_size = 32; ``` 当然,一般情况下你没必要这么做。在大部分需要内存池的场景,默认大小已经工作得挺好了。 然后,我们需要定义一个数据结构,可以存放某种对象,也可以把内存块串成一个链表。显然,我们可以使用一个 union: ```cpp union node { T data; node* next; }; ``` 直接使用 `T` 类型的好处是我们可以自然地使用 `T` 类型的对齐特征,而不需要使用 `alignas` 之类的麻烦方式。不过,我们也有一些小复杂性需要解决:当 `T` 是一个带有非平凡构造函数和析构函数的对象时,上面的代码编译会有问题,因为编译器不知道在构造和析构时到底该怎么办了。我们只用这个结点来管理内存,因此我们声明空的构造函数和析构函数就好(注意,此处不能使用 `= default`)。此外,这样的内存结点显然也不应该进行复制,因此,我们最好要禁用拷贝构造函数和拷贝赋值运算符。 ```cpp union node { T data; node* next; node() {} ~node() {} node(const node&) = delete; node& operator=(const node&) = delete; }; ``` 然后,我们就可以定义出内存块了: ```cpp template class memory_chunk { public: union node { … }; memory_chunk( memory_chunk* next_chunk); node* get_free_nodes() { return storage_.data(); } memory_chunk* get_next() const { return next_chunk_; } private: memory_chunk* next_chunk_{}; array> storage_; }; ``` 内存块就是结点的数组,加上指向下一个内存块的指针,来把内存块串成一个链表。我们通过构造函数来初始化内存块: ```cpp template memory_chunk::memory_chunk( memory_chunk* next_chunk) : next_chunk_(next_chunk) { for (size_t i = 0; i < storage_.size() - 1; ++i) { storage_[i].next = &storage_[i + 1]; } storage_[storage_.size() - 1] .next = nullptr; } ``` “下一个”内存块的指针由外部传入。对于结点的数组,我们使每个结点的 `next` 指针指向下一项;除了最后一项,其 `next` 指针为空。也就是说,我们把内存块串成了一个链表,供后面内存池来使用。 有了这些原料,我们的内存池就可以很方便地写出来了。类的定义如下: ```cpp template class memory_pool { public: using node = typename memory_chunk::node; memory_pool() = default; memory_pool(const memory_pool&) = delete; memory_pool& operator=( const memory_pool&) = delete; ~memory_pool(); T* allocate(); void deallocate(T* ptr); private: node* free_list_{}; memory_chunk* chunk_list_{}; }; ``` 可以看到,内存池对象只有两个成员变量,`free_list_` 和 `chunk_list_`,及三个成员函数,析构函数、`allocate` 和 `deallocate`。`free_list_` 是空闲结点的链表,`chunk_list_` 是所有内存块的链表。而在三个成员函数里,析构函数的意义是负责释放所有的内存块: ```cpp template memory_pool::~memory_pool() { while (chunk_list_) { memory_chunk* chunk = chunk_list_; chunk_list_ = chunk_list_->get_next(); delete chunk; } } ``` `allocate` 负责内存的分配: ```cpp template T* memory_pool::allocate() { if (free_list_ == nullptr) { chunk_list_ = new memory_chunk( chunk_list_); free_list_ = chunk_list_->get_free_nodes(); } T* result = &free_list_->data; free_list_ = free_list_->next; return result; } ``` 我们首先检查空闲列表 `free_list_` 是否为空,为空则说明内存池里已经没有内存供对象使用,因此我们需要新申请一个内存块,然后让 `chunk_list_` 指向这个新内存块,并让 `free_list` 指向其首项。随后,分配内存只是简单地从结点链表上摘下一项,并调整链表的首项指针。 `deallocate` 当然就是负责内存的释放: ```cpp template void memory_pool::deallocate(T* ptr) { auto free_item = reinterpret_cast(ptr); free_item->next = free_list_; free_list_ = free_item; } ``` 这就更简单了,就是把用户传进来的指针当成结点的指针,然后放回到空闲列表里而已。 顺便说一句,对于调整链表这样的操作,标准库提供的 `std::exchange` 工具可以让代码更加简洁。比如,`allocate` 的最后三条语句可以缩成一条:`return &exchange(free_list_, free_list_->next)->data;`。 ### 内存池应用:类特定的分配和释放函数 虽然类特定的分配和释放函数已经不那么经常使用,我们还是可以看一下如何把内存池用到这一最简单的应用场景中。这也可以让我们测一下这种极端情况下的内存池收益。 之前提过,对于某一个类 `Obj`,我们要使用类特定的分配和释放函数,只需在其中声明这样两个成员函数: ```cpp class Obj { public: … void* operator new(size_t size); void operator delete( void* ptr) noexcept; }; ``` 这里我省去了声明前的 `static`,这是允许的,效果相同(不管写不写 `static`,这两个成员函数都是静态的)。我们可以在这个类的实现文件(非头文件)里加入下面的内容即可使用内存池: ```cpp memory_pool obj_pool; void* Obj::operator new(size_t size) { assert(size == sizeof(Obj)); return obj_pool.allocate(); } void Obj::operator delete( void* ptr) noexcept { obj_pool.deallocate( static_cast(ptr)); } ``` 对于这样的对象,及没有类特定的分配和释放函数的对象,分别做大量的 `new` 和 `delete` 操作,我在 Linux(默认分配和释放性能最好的主流平台)上得到: > `107 cycles for each allocation and deallocations on normal Obj` > `8 cycles for each allocation and deallocations on pooled Obj`\\ 不使用内存池时平均每次分配和释放耗时 107 个时钟周期,使用内存池则降为 8 个时钟周期。 如果使用 tcmalloc,区别就小一点了: > `27 cycles for each allocation and deallocations on normal Obj` > `8 cycles for each allocation and deallocations on pooled Obj`\\ 不使用内存池也只要 27 个时钟周期。 ### 内存池应用:分配器 上面的测试可以让我们看到内存池带来的收益会有多大,但手工使用 `new` 和 `delete` 早就已经不是推荐的做法了。最常见的情况,我们需要把对象放在容器里面。因此,我们需要让分配器支持内存池。 除了我们定义分配器需要的那些必要定义外,我们需要定义的核心成员函数是 `allocate` 和 `deallocate`。实现的示意如下: ```cpp template memory_pool& get_memory_pool() { thread_local memory_pool pool; return pool; } template > struct pooled_allocator : private Base { … T* allocate(size_t n) { if (n == 1) { return get_memory_pool() .allocate(); } else { return Base::allocate(n); } } void deallocate(T* p, size_t n) { if (n == 1) { return get_memory_pool() .deallocate(p); } else { return Base::deallocate(p, n); } } }; ``` 也就是说,对于每一种特定类型 `T`,我们都有一个专属的线程本地内存池。这个内存池会在首次使用被创建,在线程退出时被销毁。 在 `allocate` 和 `deallocate` 函数里,我们首先检查需要分配或释放的对象个数。当前的实现不能处理超过单个对象大小的分配和释放,因此这样的请求会直接转到基类的内存分配器进行处理, 默认情况下是系统的 `std::allocator`,它会使用 `operator new` 和 `operator delete` 来进行分配和释放。我们仅针对单个对象的内存分配和释放使用线程本地内存池,因此这个分配器适合 `list`、`map`、`set` 这样的对元素单独分配内存的容器,而不适合 `vector`、`deque` 这样的批量分配内存的容器。——后者实际上也基本没有使用内存池的必要了。 使用这个内存池很简单,把容器的 `Allocator` 模板参数设成目前实现的 `pooled_allocator` 即可。使用之前的测试,我们需要把 `TestType` 定义成下面的形式: ```cpp using TestType = unordered_set< int, hash, equal_to, pooled_allocator>; ``` 由于 `Allocator` 是最后一个参数,我们必须把之前类模板的默认模板参数也手工补上,也就是 `hash` 和 `equal_to` 这两个。这样做一下简单修改之后,我们就能看到测试的性能提升。在 Linux 上我得到了: > `It took 199 cycles by average to insert a number` > `It took 112 cycles by average to erase a number` > `It took 110 cycles by average to insert a number again` 确实是迄今为止最好的性能测试结果! ### 生命周期陷阱 好吧,我撒了个小谎。如果你原封不动按我目前给出的代码来自己实现一遍的话,你很可能看到程序在退出时挂起或崩溃。问题是这样发生的: 1. 我们有一个全局对象,在构造时会把它的析构函数调用挂到程序退出时需要执行的代码中。 2. 在这个全局对象首次需要内存时,我们会初始化内存池的实例。同时,它的析构函数会挂到线程退出需要执行的代码中。注意这比第 1 步要晚。 3. 内存池析构会发生在全局对象析构之前(即使它们都是全局对象或者都是线程本地对象,也一定是后构造的先析构),它会释放所有的内存。 4. 在全局对象析构时,如果有任何读写之前分配的堆上内存的操作,都是未定义行为! 那么问题如何解决呢?我们可以选择以下几种方式: * 确保内存池的构造先于全局对象的构造。把全局对象改成 thread\_local 是一件简单的事(或者如果我们只需要单线程操作的话,可以把 `get_memory_pool` 里的 `thread_local` 改成 `static`),但问题是,内存池实例的类型是实现定义的,很难预料。对于我们的 `unordered_set`,真正需要实例化的内存池类型可能是 `pooled_allocator>`,并且会随编译器不同而不同! * 在线程退出时不释放内存。问题是,如果我们重复起停线程的话,就会有内存泄漏了。只有在我们起的线程数量固定的情况下,这种方法才可行。 * 不使用全局对象或线程本地对象,而是只使用本地对象。这当然对程序是一种限制…… 很遗憾,似乎真没有完美的解决方案!你只能根据你的实际使用场景,选择其中最合适的一种了。 [代码库](https://github.com/adah1972/geek_time_cpp)本讲的最后一个测试用例展示了预声明内存池的做法,如果你真有这样的需要,可以参考一下。 ## 内容小结 在本讲中我们完整讨论了内存池,包括它的测试和实现。你在学完这一讲之后,应该已经对内存池有了充分的了解,知道什么情况下、该如何去实现一个内存池。 ## 课后思考 请练习一下本讲的代码,复现内存池生命周期的问题,并尝试使用不同的解决方案。然后: 1. 尝试一下,在 `memory_pool` 中加入对象计数,并在析构时检查对象数是否为零,仅在为零时才释放内存。 2. 考虑一下,如何发现你的容器结点究竟是什么类型? 如果有任何问题,欢迎留言和我进行讨论。 ## 参考资料 \[1\] Google, “gperftools”. [https://github.com/gperftools/gperftools](https://github.com/gperftools/gperftools) \[2\] GNU, “The GNU allocator”. [https://www.gnu.org/software/libc/manual/html\_node/The-GNU-Allocator.html](https://www.gnu.org/software/libc/manual/html_node/The-GNU-Allocator.html)