0


高并发内存池(学会了拿捏面试官)

高并发内存池

项目介绍

当前项目是实现一个高并发的内存池,原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。

什么是内存池

1.池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率

2.内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

3.内存池主要解决的问题

这一点是我们最关心的,也是为什么会有这个东西的原因。

内存池主要解决的是效率的问题。其次站在系统的内存分配器的角度,还需要解决一下内存碎
片的问题。那什么是内存碎片?
在这里插入图片描述
内存碎片分为外碎片和内碎片。上面是外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,在后面项目中就会看到。

4.malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。

在这里插入图片描述

设计一个自己的定长内存池(自己开超市)

作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能

先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。

在这里插入图片描述
在这里插入图片描述

定长内存池说明:当需要一块内存空间的时候,我们先到定长内存池中取内存对象(开始freelist为空)
1、先向系统申请一块很大的内存空间,这里我们申请的是128k,
2、从定长内存池中取走一块T类型大小的内存对象
3、释放的时候,直接归还给自由链表,下一次取内存对象的话先从自由链表freelist中取,如果freelist为空,在去定长内存池中取。

需要注意的是,自由链表中指向下一个结点的地址,这里取类型的转换,32位下是没有问题的,能取到4个字节,但是64位下,就存在问题了。
在这里插入图片描述
所以我们这里这里可以将 int* 改为 void,32位和64位都是void*(指针,改成int一样的)大小,指针会变化,这样64位下也可以取到下一个结点地址。

具体代码如下:

template<classT>classObjectPool{public://开辟一块内存池
    T*New(){
        T* obj =nullptr;//如果自由链表上面有内存块,则先从自由链表上面取if(_freeList){
            obj =(T*)_freeList;
            _freeList =*((void**)_freeList);//指向下一个内存块//_freeList = *((int*)_freeList);}else//从定长内存池中取一块内存块{//如果定长内存池中剩余字节连一个类型都无法满足,此时需要重新向系统申请一块内存if(_leftBytes <sizeof(T)){
                _leftBytes =128*1024;//                _memory = (char*)malloc(_leftBytes);
                _memory =(char*)SystemAlloc(_leftBytes >>13);//有可能开辟失败if(_memory ==nullptr){throw std::bad_alloc();}}//取一块内存
            obj =(T*)_memory;//_memory向后移动一个T类型字节,确保能取到下一个结点地址
            size_t objSize =sizeof(T)<sizeof(void*)?sizeof(void*):sizeof(T);
            _memory += objSize;//向后移动一个内存块
            _leftBytes -= objSize;//内存池减少一个内存块}new(obj) T;//使用定位new调用T的构造函数初始化return obj;}//不需要的内存给自由链表voidDelete(T* obj){
        obj->~T();//显示调用T类型的析构函数进行资源清理//obj指向第一个结点,头指针freelist指向obj*((void**)obj)= _freeList;//*((int*)obj) = _freeList;
        _freeList = obj;}private:char* _memory =nullptr;//指向内存块的指针int _leftBytes =0;//内存块中剩余字节void* _freeList =nullptr;//自由链表};

高并发内存池框架整体设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题

  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

concurrent memory pool主要由以下3个部分构成:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁每个线程独享一个cache,这也就是这个并发线程池高效的地方
  2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈
  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题在这里插入图片描述

高并发内存池–Thread Cache(第一层)

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的
在这里插入图片描述
申请内存

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标index。
  2. 如果自由链表_freeLists[index]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[index]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

我们先处理自由链表,将映射关系建立好,自由链表的哈希桶跟对象大小的映射关系:

哈希桶

上面我们可以看到Thread Cache 从8bit、16bit…256KB增长的,这样的话我们就需要32768个桶,32768个桶是可以申请的,但是我们没有必要申请这么多,这样产生的自由链表太多,减少浪费。
在这里插入图片描述
我们来看看大佬的设计
内存对齐

 整体控制在最多10%左右的内碎片浪费    129+1515/144~10%
 为什么不8个位去对齐?这样产生的自由链表太多,减少浪费
 [1,128]8byte对齐 freelist[0,16)[128+1,1024]16byte对齐 freelist[16,72)   (1024-128+1)/16,增加56个链表,区间[16,72][1024+1,8*1024]128byte对齐 freelist[72,128)[8*1024+1,64*1024]1024byte对齐 freelist[128,184)[64*1024+1,256*1024]8*1024byte对齐 freelist[184,208)staticinline size_t _RoundUp(size_t bytes, size_t alignNum){
    第一种写法
    size_t alignSize =0;if(bytes % alignNum !=0){
        alignSize =(bytes / alignNum +1)/ alignNum;//+1向上提升,不足8个字节,需要对齐}else{
        alignSize = bytes;}return alignSize;//7 + 8 - 1//&//7//1110//1000//==8
    第二种写法
    return((bytes + alignNum -1)&~(alignNum -1));}//对齐数划分区间staticinline size_t RoundUp(size_t size){if(size <=128){return_RoundUp(size,8);}elseif(size <=1024){return_RoundUp(size,16);}elseif(size <=8*1024){return_RoundUp(size,128);}elseif(size <=64*1024){return_RoundUp(size,1024);}//256kb到thread cache的头elseif(size <=256*1024){return_RoundUp(size,8*1024);}else{return_RoundUp(size,1<< PAGE_SHIFT);//return -1;//超过,返回-1}}

对象大小

哈希映射位置,计算映射的那个一个自由链表桶
    1+7829...815staticinline size_t _Index(size_t bytes, size_t align_shift){
    第一种写法
    if(bytes % align_shift ==0){return bytes / align_shift -1;//16/8 - 1 = 1}else{return bytes / align_shift;//15 / 8 = 1}//16 + 1<<3 -1  = 23 >> 3 -1 = 2 - 1 = 1
    第二种写法
    return((bytes +(1<< align_shift)-1)>> align_shift)-1;}staticinline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);//不能超过桶大小staticint group_array[4]={16,56,56,56};if(bytes <=128){return_Index(bytes,3);}elseif(bytes <=1024){return_Index(bytes -128,4)+ group_array[0];//按照与算,需要-128,在加上一个区间有多少个自由链表。}elseif(bytes <=8*1024){return_Index(bytes -1024,7)+ group_array[0]+ group_array[1];}elseif(bytes <=64*1024){return_Index(bytes -8*1024,10)+ group_array[0]+ group_array[1]+ group_array[2];}elseif(bytes <=256*1024){return_Index(bytes -64*1024,13)+ group_array[0]+ group_array[1]+ group_array[2]+ group_array[3];}else{assert(false);return-1;}}

ThreadCache中的自由链表(FreeList)

FreeList这里是一个单链表,维护着申请和释放内存对象的关系。

//管理每个小对象的自由链表classFreeList{public://头插voidPush(void* obj){assert(obj);NextObj(obj)= _freelist;
        _freelist = obj;++_size;}//插入一段内存voidPushRange(void* start,void* end, size_t n){NextObj(end)= _freelist;//指向第一个结点
        _freelist = start;//指向新插入的内存块的第一个结点
        _size += n;//链表长度}//释放voidPopRange(void* start,void* end, size_t n){assert(n >= _size);
        start = _freelist;
        end = start;//释放n个内存块到Central Cache中for(size_t i =0; i < n -1;++i){
            end =NextObj(end);}
        _freelist =NextObj(end);NextObj(end)=nullptr;
        _size -= n;//链表长度-n}//借走一块内存块void*Pop(){assert(_freelist);//确保链表不为空void* obj = _freelist;
        _freelist =NextObj(obj);--_size;return obj;}boolEmpty(){return _freelist ==nullptr;}//需要修改,引用
    size_t&MaxSize(){return _maxsize;}//链表长度
    size_t Size(){return _size;}private:void* _freelist =nullptr;//头指针
    size_t _maxsize =1;//慢增长的大小
    size_t _size =0;//链表的长度};

其中的nextObj函数是取的指针obj的四个字节,指向一个结点,我们这里以头插举例:
在这里插入图片描述

//下一个结点地址staticvoid*&NextObj(void* obj){return*((void**)obj);}

ThreadCache框架

  1. thread cache本质是由一个哈希映射的对象自由链表构成。
//封装了一层classThreadCache{public:void*Allocate(size_t size);//申请对象voidDeallocate(void* ptr, size_t size);//释放对象// 从中心缓存获取对象void*FetchFromCentralCache(size_t index, size_t size);// 释放对象时,链表过长时,回收内存回到中心缓存voidListTooLong(FreeList& list, size_t size);private://自由链表
    FreeList _freeList[FREELISTNUM];//开辟208个};// TLS thread local storage
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
static_declspec(thread) ThreadCache* pTLSThreadCache =nullptr;

线程局部存储说明(TLS):

  1. 线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。所以threadcache中是不需要加锁的
  2. 而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。

threadcache中申请的是小于256kb的内存对象,且有208个桶,我们这里定义出来

staticconst size_t MAX_BYTES =256*1024;//256kb  thread cache这一层,每个线程独享staticconst size_t FREELISTNUM =208;//freelist的表大小

申请内存空间

void*ThreadCache::Allocate(size_t size){assert(size <= MAX_BYTES);//小于最大位数
    size_t alignSize =SizeClass::RoundUp(size);//获取对齐数
    size_t index =SizeClass::Index(size);//获取映射位置//如果自由链表不为空,直接弹出一个对象if(!_freeList[index].Empty()){return _freeList[index].Pop();}else{//没有对象,从central cache层获取一定数量对象,插入到自由链表中并返回一个对象。returnFetchFromCentralCache(index, alignSize);//这里的size注意是对齐后的}}

从中心缓存获取内存对象

慢增长方法,同网络的拥塞控制,开始需要1个给1个,后面1个给2个,2个给3个,慢慢增长,避免浪费内存。

void*ThreadCache::FetchFromCentralCache(size_t index, size_t size){//1、最开始不会向central catch要太多对象,要多了用不完,比如需要8个字节,结果给了500个对象,但实际只用了50个,就存在浪费的情况//2、如果不需要size这个大小的内存需求,batchNum就会一直增大//3、size越小,获取到的batchNum越大//4、size越大,获取到的batchNum越小
    size_t batchNum =min(_freeList[index].MaxSize(),SizeClass::NumMoveSize(size));//取最小值//保证最小取到一个对象    要1个,给2个,1个给3个,依次递增,同拥塞控制if(_freeList[index].MaxSize()== batchNum){
        _freeList[index].MaxSize()+=1;}//去central catch中获取batchNum个对象,有多少取多少void* start =nullptr;void* end =nullptr;//获取对象              不要忘记这里使用的是单例模式,注意调用方法
    size_t actualNum =CentralCatch::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >0);//确保申请成功if(actualNum ==1){assert(start == end);//一个的时候这两个相等return start;}else{
        _freeList[index].PushRange(NextObj(start), end, actualNum -1);//第一个对象返回,后面的链接回thread cache中return start;}}

将内存对象控制在[2,512]这个区间,一次thread cache从中心缓存获取多少个对象

static size_t NumMoveSize(size_t size){assert(size >0);//大于0//[2, 512] ,一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 大对象一次批量上限低int num = MAX_BYTES / size;if(num <2)
            num =2;if(num >512)
            num =512;//最小为2,最大为512,控制在这个区间return num;}

计算一次向系统获取几个页,后面PageCache中申请页的函数

// 计算一次向系统获取几个页// 单个对象 8byte// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){
    size_t num =NumMoveSize(size);
    size_t npage = num * size;//32  *  8   获取8个字节 1页,

    npage >>= PAGE_SHIFT;if(npage ==0)
        npage =1;return npage;}

高并发内存池–Central Cache(第二层)

central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
在这里插入图片描述
申请内存

  1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率
  2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
  3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count

CentralCache中的双向链表(SpanList)

SpanList是一个双向链表,每个结点又是一个span结构体,span结构体中又包含个多个成员变量,其中包含一个自由链表freelist。

在这里插入图片描述

structSpan{
    PAGE_ID _pageId =0;// 大块内存起始页的页号   表示每个桶起始位置
    size_t _n =0;// 页的数量

    Span* _next =nullptr;// 双向链表的结构
    Span* _prev =nullptr;

    size_t _objSize =0;//切好的小块对象的大小
    size_t _useCount =0;// 切好小块内存,被分配给thread cache的计数bool _isUse =false;//此块内存是否在被使用void* _freeList =nullptr;// 切好的小块内存的自由链表};//central catch的spanlist自由链表,双向链表,好方便查找翻页classSpanList{public://构造初始化SpanList(){
        _head =new Span;
        _head->_next = _head;
        _head->_prev = _head;}
    Span*Begin(){return _head->_next;}
    Span*End(){return _head;//就是head}boolEmpty(){return _head->_next == _head;//空,老毛病,注意== 和 = }
    Span*PopFront(){
        Span* front = _head->_next;//第一个结点Erase(front);return front;}voidPushFront(Span* span){Insert(Begin(), span);}//在pos之前插入newspan,跟顺序表插入逻辑一样voidInsert(Span* pos, Span* newSpan){//首先检查断言assert(pos);assert(newSpan);
        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;}voidErase(Span* pos){assert(pos);assert(pos != _head);//头指针
        Span* next = pos->_next;
        Span* prev = pos->_prev;
        prev->_next = next;
        next->_prev = prev;//这里为什么没有delete,不用了就归还给自由链表,需要从自由链表取即可,objectpool同理的}private:
    Span* _head;//spanlist自由链表public:
    std::mutex _mtx;//桶锁,防止一个进程在获取时,例外一个进程也进程竞争资源};

CentralCache框架

//单例模式,只能存在一个central catch对象classCentralCatch{public://只能生成一个对象,饿汉模式,开始就创建对象static CentralCatch*GetInstance(){return&_sInst;}// 获取一个非空的span
    Span*GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cache
    size_t FetchRangeObj(void*& start,void*& end, size_t batchNum, size_t size);// 将一定数量的对象释放到span跨度voidReleaseListToSpans(void* start, size_t byte_size);private:CentralCatch(){};CentralCatch(const CentralCatch&)=delete;//禁止生成static CentralCatch _sInst;//静态对象private:
    SpanList _spanLists[FREELISTNUM];//映射规则跟thread catch一样};

从中心缓存获取一定数量的对象给Thread Cache

1、先根据获取对应的映射位置
2、开启桶锁,保护临界资源,只让一个线程进行访问
3、从central cache中获取对应的span,需要几个batchNum就拿几个内存对象,如果不够batchNum有多少个拿多少个,对应SpanList中的usecount也要记录借了多少个内存对象出去
4、对拿到的batchNum个对象链接成自由链表,返回给threadcache
5、解锁,防止死锁

对拿到的span我们进行一个切割
在这里插入图片描述

// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCatch::FetchRangeObj(void*& start,void*& end, size_t batchNum, size_t size){
    size_t index =SizeClass::Index(size);
    _spanLists[index]._mtx.lock();//锁住//获取一个span
    Span* span =GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);//span要存在,并且下面挂有小块内存的自由链表// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少
    start = span->_freeList;//头
    end = start;//初始都指向头结点//然后往后走取batchNum个对象
    size_t i =0;
    size_t actualNum =1;//这里要保证取到这么多,并且下一个结点不为空,才能继续往下走while(i < batchNum -1&&NextObj(end)!=nullptr){
        end =NextObj(end);++i;++actualNum;}
    span->_freeList =NextObj(end);NextObj(end)=nullptr;//end的下一个结点就指向空  end->next = nullptr
    span->_useCount += actualNum;//记录分配了多少个对象出去
    _spanLists[index]._mtx.unlock();//解锁,该线程不使用时候解锁,避免死锁return actualNum;}

获取一个非空的span

1、先判断自己的SpanList中有没有一个span中还有内存对象,有就先返回
2、对应上一个FetchRangeObj函数,这里需要解锁,防止其他线程内存对象释放回来,存在阻塞的情况
3、到这一步,我们只能去向Page Cache中获取一个非空的span,拿到对应的起始页号
4、将大块内存span切割成自由链表,放入Central Cache中挂起(写入这时需要加锁)

将从Page Cache中拿到的span大块内存,进行一个切割放Central Cache中挂起
在这里插入图片描述

Span*CentralCatch::GetOneSpan(SpanList& list, size_t byte_size){//1、查看CentralCache中有没有内存对象,有就先取
    Span* it = list.Begin();while(it != list.End()){//当前it的小块内存自由链表中不为空,直接返回if(it->_freeList !=nullptr){return it;}else{
            it = it->_next;}}
    list._mtx.unlock();//解锁原因,如果其他线程释放内存对象回来,就不会存在阻塞了//2、走到这里,central catch中内存也空的,继续向page catch中去找内存PageCache::GetInstance()->_pageMtx.lock();//先要锁住,防止其他线程进入
    Span* span =PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));//获取几个页
    span->_isUse =true;//正在使用
    span->_objSize = byte_size;PageCache::GetInstance()->_pageMtx.unlock();// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span // 对获取到的span进行切分//起始地址和内存块大小char* start =(char*)(span->_pageId << PAGE_SHIFT);//计算该span的起始页号
    size_t bytes = span->_n << PAGE_SHIFT;//计算该span的字节大小char* end = start + bytes;//末尾地址//将大块内存span切割成自由链表,放入central catch中挂起//取个头结点下来   方便进行尾插
    span->_freeList = start;
    start += byte_size;void* tail = span->_freeList;while(start < end){NextObj(tail)= start;
        tail =NextObj(tail);//tail = start
        start += byte_size;}//tail记得置空NextObj(tail)=nullptr;

    list._mtx.lock();//span挂到桶里面的时候,需要加锁
    list.PushFront(span);return span;}

高并发内存池–Page Cache(第三层)

在这里插入图片描述
申请内存:

  1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
  2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
  3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存

PageCache代码框架

1、page cache是一个以页为单位的span自由链表。
2、为了保证全局只有唯一的page cache,这个类被设计成了单例模式。

//单例模式,同central catch一样classPageCache{public:static PageCache*GetInstance(){return&_sInst;}// 获取一个K页的span
    Span*NewSpan(size_t k);// 获取从对象到span的映射
    Span*MapObjectToSpan(void* obj);// 释放空闲span回到Pagecache,并合并相邻的spanvoidReleaseSpanToPageCache(Span* span);
    std::mutex _pageMtx;private:
    SpanList _spanLists[NPAGES];
    ObjectPool<Span> _spanPool;//用定长内存池替代new、delete//std::unordered_map<PAGE_ID, Span*> _isSpanMap;//内存对象跟span的映射关系
    std::map<PAGE_ID, Span*> _isSpanMap;//map更易观察//TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;//使用基数树优化PageCache(){}PageCache(const PageCache&)=delete;static PageCache _sInst;//静态对象};

页号跟span的映射关系(方便释放内存对象回来查找对应位置)

1、使用map建立页号跟span的映射关系

// 获取从对象到span的映射//start - end  都是span
Span*PageCache::MapObjectToSpan(void* obj){
    PAGE_ID id =((PAGE_ID)obj >> PAGE_SHIFT);//右移13位,找到对应的id
    std::unique_lock<std::mutex>lock(_pageMtx);//加锁,RAII,出了作用域,自己解锁auto ret = _isSpanMap.find(id);//查找对应的span//去查找对应的spanif(ret != _isSpanMap.end()){//找到返回,没找到返回nullptrreturn ret->second;}else{assert(false);returnnullptr;}}

获取一个K页的span

1、先检查对应映射位置的桶里面是否还有span,有则弹出span、并建立页号跟span的映射关系。
2、如果没有则继续往后面的桶里面寻找span,若有则切割该span。
3、如果Page Cache中都没有span,则向堆去申请,然后递归调用自己重复1、2步骤。

这里作图说明:

Span*PageCache::NewSpan(size_t k){assert(k >0);//检查//检查第k个桶里面有没有span,有就弹出if(!_spanLists[k].Empty()){
        Span* kSpan = _spanLists[k].PopFront();//切割第一个span返回for(PAGE_ID i =0; i < kSpan->_n;++i){
            _isSpanMap[kSpan->_pageId + i]= kSpan;//建立映射关系,方便central cache回收内存时进行查找。}return kSpan;}//该页,没有span,则继续往后面的页进行寻找
    size_t i = k +1;for(; i < NPAGES;++i){//如果此页不为空,进行切割,比如需要4页的,但是10页才有,切割成一个6页的和一个4页的if(!_spanLists[i].Empty()){
            Span* nSpan = _spanLists[i].PopFront();//弹出第一个span1、使用定长内存池的new
            Span* kSpan =new  Span;//在nspan的头部切下k页下来
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;//页数为k

            nSpan->_pageId += k;//往后走k页,起始地址往后移动
            nSpan->_n -= k;//减去k页

            _spanLists[nSpan->_n].PushFront(nSpan);//npages插入n号桶中// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时// 进行的合并查找//start 到 end  都是span
            _isSpanMap[nSpan->_pageId]= nSpan;
            _isSpanMap[nSpan->_pageId + nSpan->_n -1]= nSpan;// 建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor(PAGE_ID i =0; i < kSpan->_n;++i){
                _isSpanMap[kSpan->_pageId + i]= kSpan;}return kSpan;//返回central catch中使用}}//到这里,说明page catch中也没用多余的span了//这时就要去向堆中申请一个128页的span
    Span* bigSpan =new Span;void* ptr =SystemAlloc(NPAGES -1);
    bigSpan->_pageId =(PAGE_ID)ptr >> PAGE_SHIFT;//8 * 1024右移算页号,右移算id  8kb
    bigSpan->_n = NPAGES -1;

    _spanLists[bigSpan->_n].PushFront(bigSpan);
    递归重复找
    returnNewSpan(k);//可以迭代下去,递归更好,不存在效率问题,现在计算机很快}

在这里插入图片描述

windows和Linux下如何直接向堆申请和释放页为单位的大块内存

首先这里需要注意一下自己是在32位还是64位
在这里插入图片描述
在这里插入图片描述
选择机器位

#ifdef_WIN64typedefunsignedlonglong PAGE_ID;#elif_WIN32typedef size_t PAGE_ID;#else// linux#endif

向系统申请内存空间

// 匹配系统    向堆申请空间inlinestaticvoid*SystemAlloc(size_t kpage){#ifdef_WIN32void* ptr =VirtualAlloc(0, kpage <<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);//系统调用接口函数#else// linux下brk mmap等
    小于128k,_brk是将数据段(.data)的最高地址指针_edata往高地址推;
    大于128k,mmap是在进程的虚拟地址空间中(堆和栈中间,称为文件映射区域的地方)找一块空闲的虚拟内存。
    详细可参考对应链接:https://www.cnblogs.com/vinozly/p/5489138.html#endifif(ptr ==nullptr)throw std::bad_alloc();return ptr;}

释放内存空间

inlinestaticvoidSystemFree(void* ptr){#ifdef_WIN32VirtualFree(ptr,0, MEM_RELEASE);#else// sbrk unmmap等#endif}

释放内存整体框架

Threadcache释放内存

1、当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
2、当链表的长度过长,则回收一部分内存对象到central cache。

对第二条单独说明一下

  1. 每次申请完的内存对象释放回threadcache中,如果链表长度大于申请的内存时,说明此时用不到这么多内存对象,我们就需要还一部分给中心缓存centralcache

在这里插入图片描述

//1. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。//2. 当链表的长度过长,则回收一部分内存对象到central cache。voidThreadCache::Deallocate(void* ptr, size_t size){assert(ptr);//需要判断是否有效assert(size <= MAX_BYTES);
    size_t index =SizeClass::Index(size);//获取对应索引位置
    _freeList[index].Push(ptr);//释放h//每次申请,用了又反给central cache   直到链表长度大于一次批量申请的内存//比如申请100个,用了1个,还99个,申请100个,用了50个,还50个,申请50个,此时链表长度51大于50个,就应该回收一部分内存对象给central cacheif(_freeList[index].Size()>= _freeList[index].MaxSize()){ListTooLong(_freeList[index], size);}}

链表过长,回收内存到中心缓存

//链表过长,回收内存到中心缓存voidThreadCache::ListTooLong(FreeList& list, size_t size){void* start =nullptr;void* end =nullptr;//取下内存对象,准备放回中心缓存
    list.PopRange(start, end, list.MaxSize());//释放回span中CentralCatch::GetInstance()->ReleaseListToSpans(start, size);}

Centralcache释放内存

  1. 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并

在这里插入图片描述

// 将一定数量的对象释放到span跨度voidCentralCatch::ReleaseListToSpans(void* start, size_t byte_size){
    size_t index =SizeClass::Index(byte_size);//获取哈希桶对应位置
    _spanLists[index]._mtx.lock();//归还内存对象时,禁止其他线程进入,加锁while(start){void* next =NextObj(start);//准备进行递归//根据页号和span的映射关系找到对应的span
        Span* span =PageCache::GetInstance()->MapObjectToSpan(start);//头插回去NextObj(start)= span->_freeList;
        span->_freeList = start;
        span->_useCount--;//归还一个-1,直到为0,归还给central cache//如果为0,则所有的小块内存都回来了//此时这个span就回收给page cacheif(span->_useCount ==0){
            centralcache中的SpanList对应的位置要清空该span,并将指针指向修改为nullptr
            _spanLists[index].Erase(span);
            span->_freeList =nullptr;
            span->_next =nullptr;
            span->_prev =nullptr;//都清空//释放给Page cache,需要使用page cache锁,解除spanlist的桶锁
            _spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);//归还给page cache,一层套一层PageCache::GetInstance()->_pageMtx.unlock();//完了之后重新加上
            _spanLists[index]._mtx.lock();}
        start = next;}
    _spanLists[index]._mtx.unlock();//解锁}

Pagecache释放内存

  1. 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

三种不合并情况:
1、没找到不合并
2、找到了,但是正在被其他线程所使用,不合并
3、前后页的span的页数加起来超过128页,也不合并,因为无法管理
在这里插入图片描述

voidPageCache::ReleaseSpanToPageCache(Span* span){//向前合并while(1){
        PAGE_ID prevId = span->_pageId -1;//前一个span的地址auto ret = _isSpanMap.find(prevId);//查找前一个span存不存在//前面没有页,不合并if(ret == _isSpanMap.end()){break;}//前面相邻的span在使用,不合并
        Span* prevSpan = ret.second;if(prevSpan->_isUse ==true){break;}//前后页的span合并 超过128页,也不合并(无法管理)if(prevSpan->_n + span->_n > NPAGES -1){break;}//这里就要进行合并了
        span->_pageId = prevSpan->_pageId;
        span->_n += prevSpan->_n;//并将前面的span删掉
        _spanLists[prevSpan->_n].Erase(prevSpan);//delete prevSpan;
        _spanPool.Delete(prevSpan);//一直循环,就能合并所有的外碎片,减少内存碎片}//向后合并while(1){
        PAGE_ID nextId = span->_n + span->_pageId;//+上自己的页数就是下一个span的起始地址    auto ret = _isSpanMap.find(nextId);//后面没有相邻的spanif(ret == _isSpanMap.end()){break;}//找到
        
        Span* nextSpan = ret.second;if(nextSpan->_isUse ==true){break;}if(nextSpan->_n + span->_n > NPAGES -1){break;}//这里页数相加即可
        span->_n += nextSpan->_n;//删除后面的span
        _spanLists[nextSpan->_n].Erase(nextSpan);//delete nextSpan;
        _spanPool.Delete(nextSpan);}//插入进去
    _spanLists[span->_n].PushFront(span);
    span->_isUse =false;
    _isSpanMap[span->_pageId]= span;//建立映射关系
    _isSpanMap[span->_pageId + span->_n -1]= span;}

大于256KB的大块内存申请问题

1、<=256KB找三层缓存
2、大于256KB

  • a、32 * 8k < size <= 128 * 8k 找pagecache
  • b、size > 128 * 8k找系统堆

修改对应代码:

pagecache中的NewSpan函数,加在最前面

if(k > NPAGES -1){void* ptr =SystemAlloc(k);//Span* span = new Span;//创建一个新的span
    Span* span = _spanPool.New();
    span->_pageId =(PAGE_ID)ptr >> PAGE_SHIFT;//起始地址
    span->_n = k;//_isSpanMap[span->_pageId] = span;//建立映射关系
    _idSpanMap.set(span->_pageId, span);return span;}

pagecache中的ReleaseSpanToPageCache函数,如果超过128页直接释放回堆

//超过128页直接返给堆if(span->_n > NPAGES -1){//拿到内存地址void* ptr =(void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);//delete span;//释放span
    _spanPool.Delete(span);return;}

释放对象时优化为不传对象大小

直接给span增加一个成员变量 _objSize用来表示申请的内存块对象的大小

structSpan{
    PAGE_ID _pageId =0;// 大块内存起始页的页号   表示每个桶起始位置size_t _n =0;// 页的数量

    Span* _next = nullptr;// 双向链表的结构
    Span* _prev = nullptr;size_t _objSize =0;//切好的小块对象的大小size_t _useCount =0;// 切好小块内存,被分配给thread cache的计数
    bool _isUse = false;//此块内存是否在被使用void* _freeList = nullptr;// 切好的小块内存的自由链表};

申请内存和释放内存的总函数接口

申请内存的接口:

staticvoid*ConcurrentAlloc(size_t size){//如果申请的大小大于256kb,直接向page cache中去要if(size > MAX_BYTES){size_t alignSize = SizeClass::RoundUp(size);//对齐数size_t kPage = alignSize >> PAGE_SHIFT;//id//去申请内存块
        PageCache::GetInstance()->_pageMtx.lock();//锁住,防止其他进程进入,保证原子性
        Span* span = PageCache::GetInstance()->NewSpan(kPage);
        span->_objSize = size;
        PageCache::GetInstance()->_pageMtx.unlock();void* ptr =(void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if(pTLSThreadCache == nullptr){static ObjectPool<ThreadCache> tcPool;
            pTLSThreadCache = tcPool.New();//去自己写的定长内存池进行申请}return pTLSThreadCache->Allocate(size);}}

释放内存的接口:

staticvoidConcurrentFree(void* ptr){
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//找到对应的spansize_t size = span->_objSize;//大于32*8k直接返给pagecacheif(size > MAX_BYTES){
        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(ptr, size);}}

性能瓶颈分析

通过查看下面的运行结果,我们可以看到自己写的性能还是比较差的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JzX15X9P-1649085647995)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220404214024120.png)]

但是我们不知道具体的时间慢在哪里,接下来我们用VS的性能诊断工具查看一下具体的消耗时间在哪里?

查看步骤一:点击调试 ->性能探查器

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0uzggeYy-1649085647995)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220404214829089.png)]

查看步骤二:开始->点击项目->确定

在这里插入图片描述

查看步骤三:时间浪费最多的地方会显示出来,自己点击查看

在这里插入图片描述

使用tcmalloc源码中实现基数树进行优化

基数树实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。

单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bcUnJQ5Z-1649085647997)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220404221827408.png)]

单层基数树:

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:staticconstint LENGTH =1<< BITS;void** array_;
    public:typedefuintptr_t Number;
    explicit TCMalloc_PageMap1(void*(*allocator)(size_t)){
    array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*)<< BITS));memset(array_,0,sizeof(void*)<< BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void*get(Number k)const{if((k >> BITS)>0){returnNULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]". k的范围在[0,2^BITS-1]// REQUIRES "k" has been ensured before.//// 建立起始页号和span的映射关系voidset(Number k,void* v){
    array_[k]= v;}};

比如32位平台下,以一页大小为8K为例,此时页的数目就是2^32 / 2^13 = 2^19 ,因此存储页号最多需要19个比特位,此时传入模板参数传来的值是32 − 13 = 19 。由于32位平台下指针的大小是4字节,因此该数组的大小就是2^19 * 4 = 2^21 =2M,内存消耗不大,是可行的。但如果是在64位平台下,此时该数组的大小是2^51 × 8 = 2^54 = 2^24 G ,这显然是不可行的,实际上对于64位的平台,我们需要使用三层基数树。

二层基数树:

这里还是以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个比特位。而二层基数树实际上就是把这19个比特位分为两次进行映射。

比如用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针。

在这里插入图片描述

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.staticconstint ROOT_BITS =5;//第一层对应页号的前5个比特位staticconstint ROOT_LENGTH =1<< ROOT_BITS;//第一层存储元素的个数staticconstint LEAF_BITS = BITS - ROOT_BITS;//第二层对应页号的其余比特位staticconstint LEAF_LENGTH =1<< LEAF_BITS;//第二层存储元素的个数// Leaf nodestructLeaf{void* values[LEAF_LENGTH];};

    Leaf* root_[ROOT_LENGTH];// Pointers to 32 child nodesvoid*(*allocator_)(size_t);// Memory allocator

public:typedefuintptr_t Number;

    explicit TCMalloc_PageMap2(){//allocator_ = allocator;memset(root_,0,sizeof(root_));PreallocateMoreMemory();}void*get(Number k)const{const Number i1 = k >> LEAF_BITS;const Number i2 = k &(LEAF_LENGTH -1);if((k >> BITS)>0|| root_[i1]==NULL){returnNULL;}return root_[i1]->values[i2];}voidset(Number k,void* v){const Number i1 = k >> LEAF_BITS;const Number i2 = k &(LEAF_LENGTH -1);ASSERT(i1 < ROOT_LENGTH);
        root_[i1]->values[i2]= v;}
    确保映射[start,start_n-1]页号的空间是开辟好了的
    bool Ensure(Number start,size_t n){for(Number key = start; key <= start + n -1;){const Number i1 = key >> LEAF_BITS;// 栈溢出if(i1 >= ROOT_LENGTH)return false;// 第一层i1下标指向的空间未开辟if(root_[i1]==NULL){static ObjectPool<Leaf>    leafPool;
                Leaf* leaf =(Leaf*)leafPool.New();memset(leaf,0,sizeof(*leaf));
                root_[i1]= leaf;}
            key =((key >> LEAF_BITS)+1)<< LEAF_BITS;}return true;}voidPreallocateMoreMemory(){
        分配足够的资源以跟踪所有可能的页面
        Ensure(0,1<< BITS);}};

三层基数树:

三层基数树一般在64位下使用,三层基数树与二层基数树类似,三层基数树实际上就是把存储页号的若干比特位分为三次进行映射。当我们要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用代码中的Ensure函数,如果对应数组空间未开辟则会立马开辟对应的空间(同第二层)。

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstaticconstint INTERIOR_BITS =(BITS +2)/3;// Round-upstaticconstint INTERIOR_LENGTH =1<< INTERIOR_BITS;// How many bits should we consume at leaf levelstaticconstint LEAF_BITS = BITS -2* INTERIOR_BITS;staticconstint LEAF_LENGTH =1<< LEAF_BITS;// Interior nodestructNode{
        Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestructLeaf{void* values[LEAF_LENGTH];};

    Node* root_;// Root of radix treevoid*(*allocator_)(size_t);// Memory allocator

    Node*NewNode(){
        Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if(result !=NULL){memset(result,0,sizeof(*result));}return result;}

public:typedefuintptr_t Number;

    explicit TCMalloc_PageMap3(void*(*allocator)(size_t)){
        allocator_ = allocator;
        root_ =NewNode();}void*get(Number k)const{const Number i1 = k >>(LEAF_BITS + INTERIOR_BITS);const Number i2 =(k >> LEAF_BITS)&(INTERIOR_LENGTH -1);const Number i3 = k &(LEAF_LENGTH -1);if((k >> BITS)>0||
            root_->ptrs[i1]==NULL|| root_->ptrs[i1]->ptrs[i2]==NULL){returnNULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}voidset(Number k,void* v){ASSERT(k >> BITS ==0);const Number i1 = k >>(LEAF_BITS + INTERIOR_BITS);const Number i2 =(k >> LEAF_BITS)&(INTERIOR_LENGTH -1);const Number i3 = k &(LEAF_LENGTH -1);
        reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]= v;}

    bool Ensure(Number start,size_t n){for(Number key = start; key <= start + n -1;){const Number i1 = key >>(LEAF_BITS + INTERIOR_BITS);const Number i2 =(key >> LEAF_BITS)&(INTERIOR_LENGTH -1);// Check for overflowif(i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif(root_->ptrs[i1]==NULL){
                Node* n =NewNode();if(n ==NULL)return false;
                root_->ptrs[i1]= n;}// Make leaf node if necessaryif(root_->ptrs[i1]->ptrs[i2]==NULL){
                Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if(leaf ==NULL)return false;memset(leaf,0,sizeof(*leaf));
                root_->ptrs[i1]->ptrs[i2]= reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf node
            key =((key >> LEAF_BITS)+1)<< LEAF_BITS;}return true;}voidPreallocateMoreMemory(){}};

优化代码实现

pagecache.h头文件

//单例模式,同central catch一样
class PageCache
{
private://std::map<PAGE_ID, Span*> _isSpanMap;//map更易观察
    TCMalloc_PageMap1<32- PAGE_SHIFT> _idSpanMap;//使用基数树优化};

起始页号跟span映射关系修改

//_isSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);

查找起始页号对应的span

auto ret =(Span*)_idSpanMap.get(prevId);

此时,我们在来观察一下运行结果,可以看到自己写的比系统的快了2倍左右,

在这里插入图片描述

为什么基数树不需要加锁?

在前面,我们需要加锁,是因为底层我们用到了map对应红黑树,unordered_map对应哈希表,我们在插入删除的时候,其结构会发生变化,所以需要加锁保证其线程安全。

对于基数树:

1、我们只在pagecache中的这两个函数中建立id和span的映射关系。

2、在基数树中,写之前又会提取开辟好空间,在我们写的过程中,是不会去改变结果的。

3、我们不会对同一个位置进行读写,线程1在对一个位置进行读写的时候,例外一个线程是被禁止访问这个区域的,这一块区域也就是线程1此时独享,线程2只能例外找一个位置进行读写。

在这里插入图片描述

扩展学习及当前项目实现的不足

实际中我们测试了,当前实现的并发内存池比malloc/free是更加高效的,那么我们能否替换到系统调用malloc呢?实际上是可以的。

  • 不同平台替换方式不同。 基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:
void*malloc(size_t size) THROW attribute__((alias(tc_malloc)))

因此所有malloc的调用都跳转到了tc_malloc的实现 。

源码:https://gitee.com/deng_yuniubi/high-concurrency-memory-pool

标签: c++ 开发语言 后端

本文转载自: https://blog.csdn.net/weixin_46873777/article/details/123962914
版权归原作者 雨轩(爵丶迹) 所有, 如有侵权,请联系我们删除。

“高并发内存池(学会了拿捏面试官)”的评论:

还没有评论