0


【项目实战】高并发内存池—调优以及性能测试

  •                                     ***🎬慕斯主页***:*******************************************************************************************************************************修仙—别有洞天********************************************************************************************************************************
    
                                     ♈️*今日夜电波:*Blues in the Closet—ずっと真夜中でいいのに。**
    
                                                         0:34━━━━━━️💟──────── 3:37
                                                               🔄   ◀️   ⏸   ▶️    ☰  
    
                               💗关注👍点赞🙌收藏*您的每一次鼓励都是对我莫大的支持*😍
    

目录


实现大于256KB的内存块申请

申请操作的修改
    前面我们实现的内存池实际上只支持申请256KB以下的内存,因为我们在RoundUp函数里面对超过256KB的内存申请操作设置了断言处理。当申请的内存小于256KB也就是32页时,我们可以向threadcache申请,如果申请的内存在32页-128页时,我们应该向pagecache申请,而如果大于128页了就需要向堆进行申请了。对此需要对RoundUp进行一定的修改:
    // 整体控制在最多10%左右的内碎⽚浪费
    // [1,128] 8byte对⻬ freelist[0,16)
    // [128+1,1024] 16byte对⻬ freelist[16,72)
    // [1024+1,81024] 128byte对⻬ freelist[72,128)
    // [8*1024+1,641024] 1024byte对⻬ freelist[128,184)
    // [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)

    // 如何理解?实际上就是获取要得到的内存块大小
    // 如:bytes=16,align=8,则得到的结果是这样的:(16+(8-1))& ~(8-1) -> 0001 0111 & 1111 1000 -> 16
    static inline size_t _RoundUp(size_t bytes, size_t align)
    {
        return (((bytes)+align - 1) & ~(align - 1));
    }

    // 对⻬⼤⼩计算
    static inline size_t RoundUp(size_t bytes)
    {
        if (bytes<=128)
        {
            return _RoundUp(bytes, 8);
        }
        else if (bytes <= 1024) {
            return _RoundUp(bytes, 16);
        }
        else if (bytes <= 8*1024) {
            return _RoundUp(bytes, 128);
        }
        else if (bytes <= 64 * 1024) {
            return _RoundUp(bytes, 1024);
        }
        else if (bytes <= 256 * 1024){
            return _RoundUp(bytes, 8 * 1024);
        }
        else
        {
            return _RoundUp(bytes, 1 << PAGE_SHIFT);
        }

    }
    
    我们需要对于内存申请模块进行一定的修改,当大于256KB时直接调用pagecache的申请接口即可,后续需要向堆进行申请的操作也是更改NewSPan这个接口即可:
static void* ConcurrentAlloc(size_t size)
{
    if (size > MAX_BYTES) //申请大于256KB的内存
    {
        //计算出对齐后需要申请的页数
        size_t alignSize = SizeClass::RoundUp(size);
        size_t kPage = alignSize >> PAGE_SHIFT;

        //向page cache申请kPage页的span
        PageCache::GetInstance()->_pageMtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kPage);
        PageCache::GetInstance()->_pageMtx.unlock();

        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        return ptr;
    }
    else
    {
        // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
        if (pTLSThreadCache == nullptr)
        {
            pTLSThreadCache = new ThreadCache;
        }

        cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

        return pTLSThreadCache->Allocate(size);
    }
}
    需要修改NewSPan,将之前的断言需要小于256KB的条件去掉,加入特别为大于128页需要找堆申请的选项给加上,当然前面ConcurrentAlloc申请超过128页时也是需要加锁的。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0);
    if (k > NPAGES - 1) //大于128页直接找堆申请
    {
        void* ptr = SystemAlloc(k);
        Span* span = new Span;
        span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = k;
        _idSpanMap[span->_pageId] = span;
        return span;
    }

    //先检查第k个桶里面有没有span
    if (!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();

        //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
        for (PAGE_ID i = 0; i < kSpan->_n; i++)
        {
            _idSpanMap[kSpan->_pageId + i] = kSpan;
        }

        return kSpan;
    }
    //检查一下后面的桶里面有没有span,如果有可以将其进行切分
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            Span* kSpan = new Span;
            //在nSpan的头部切k页下来
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;

            nSpan->_pageId += k;
            nSpan->_n -= k;
            //将剩下的挂到对应映射的位置
            _spanLists[nSpan->_n].PushFront(nSpan);

            //存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
            _idSpanMap[nSpan->_pageId] = nSpan;
            _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

            //建立页号与span的映射,方便central cache回收小块内存时查找对应的span
            for (PAGE_ID i = 0; i < kSpan->_n; i++)
            {
                _idSpanMap[kSpan->_pageId + i] = kSpan;
            }

            return kSpan;
        }
    }
    //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;

    _spanLists[bigSpan->_n].PushFront(bigSpan);

    //尽量避免代码重复,递归调用自己
    return NewSpan(k);
}
释放操作的修改
    前面我们再申请操作的时候将内存的申请分为了小于等于32页向pagecache申请,32页-128页向pagecache申请,大于128页向堆直接申请,那么我们对应的释放操作也应该这么分类去释放。
static void ConcurrentFree(void* ptr, size_t size)
{
    if (size > MAX_BYTES) //释放大于256KB的内存
    {
        Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);

        pTLSThreadCache->Deallocate(ptr, size);
    }
}
    如果要直接向堆释放,那么当然也需要对应的接口,在Windows环境下的接口为VirtualFree,对此进行封装一下,后续如果需要跨平台可进行操作:
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    // sbrk unmmap等
#endif
}
    后续同前面NewSpan一样,我们需要对从pagecache释放内存块的ReleaseSpanToPageCache接口进行一定的修改:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    if (span->_n > NPAGES - 1)//大于128页直接释放给堆
    {
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        SystemFree(ptr);
        delete span;
        return;
    }

    // 对span前后的页,尝试进行合并,缓解内存碎片问题
    while (1)
    {
        PAGE_ID prevId= span->_pageId - 1;
        auto ret = _idSpanMap.find(prevId);
        //前面的页号没有(还未向系统申请),停止向前合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //前面的页号对应的span正在被使用,停止向前合并
        if (ret->second->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向前合并
        if (ret->second->_n + span->_n > NPAGES - 1)
        {
            break;
        }

        span->_pageId = ret->second->_pageId;
        span->_n += ret->second->_n;

        _spanLists[ret->second->_n].Erase(ret->second);
        delete ret->second;
    }

    // 向后合并
    while (1)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        auto ret = _idSpanMap.find(nextId);
        //后面的页号没有(还未向系统申请),停止向后合并
        if (ret == _idSpanMap.end())
        {
            break;
        }
        //后面的页号对应的span正在被使用,停止向后合并
        Span* nextSpan = ret->second;
        if (nextSpan->_isUse == true)
        {
            break;
        }
        //合并出超过128页的span无法进行管理,停止向后合并
        if (nextSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        //进行向后合并
        span->_n += nextSpan->_n;

        //将nextSpan从对应的双链表中移除
        _spanLists[nextSpan->_n].Erase(nextSpan);

        delete nextSpan;
    }
    //将合并后的span挂到对应的双链表当中
    _spanLists[span->_n].PushFront(span);
    //建立该span与其首尾页的映射
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
    //将该span设置为未被使用的状态
    span->_isUse = false;
}

使用定长内存池解除对new与delete的使用

    tcmalloc实际上就是为了在高并发的场景下替代malloc的,如果我们使用了new那么不还是没摆脱malloc的使用吗?(因为new底层封装了malloc)对此,我们可以使用之前实现的定长内存池来替换new,需要在几个使用了new的地方进行修改:

pagecache类中:

class PageCache
{
Span* PageCache::NewSpan(size_t k)
{
//Span* span = new Span;
Span* span = _spanPool.New();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();

}
  private:
ObjectPool<Span> _spanPool;
};
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//delete span;
_spanPool.Delete(span);
//delete ret->second;
_spanPool.Delete(ret->second);

//delete nextSpan;
_spanPool.Delete(nextSpan);
}

ConcurrentAlloc中,每个线程第一次使用threadcache时都是new出来的所以需要进行处理:

        // 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
        if (pTLSThreadCache == nullptr)
        {
            //pTLSThreadCache = new ThreadCache;
            static std::mutex tcMtx;
            static ObjectPool<ThreadCache> tcPool;
            tcMtx.lock();
            pTLSThreadCache = tcPool.New();
            tcMtx.unlock();
        }
    SpanList中也用到了new,这里定义一个static类型的ObjectPool即可,注意两个头文件之间的互相包含,我们需要给前置声明。
template<class T>
class ObjectPool;
class SpanList
{
public:
SpanList()
    {
      _head = new Span;
        _head = _spanPool.New();
    }
  private:
static ObjectPool<Span> _spanPool;

};

解决释放对象需要传入对象大小的问题

    这里就要提前面的Span结构体中定义的 size_t _objSize =0;//切好的小对象的大小变量。这个就是用于解决ConcurrentFree接口需要传入对象大小的问题。我们只需要在获取一个新的Span的同时传入这个Span的大小即可。后续可直接按照这个大小进行释放。这里主要有两个地方:一个是申请大于256KB的内存的时候,另外一个是centralcache从pagecache申请Span的时候,如下:
static void* ConcurrentAlloc(size_t size)
{
        //向page cache申请kPage页的span
        PageCache::GetInstance()->_pageMtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kPage);
        span->_objSize = size;
}

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
    PageCache::GetInstance()->_pageMtx.lock();
    Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
    span->_objSize = size;
}

然后我们的释放即可不需要插入对象大小了:

static void ConcurrentFree(void* ptr)
{
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
    size_t size = span->_objSize;
    if (size > MAX_BYTES) //大于256KB的内存释放
    {
        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(ptr, size);
    }
}

解决获取对象与Span的线程安全问题

    _idSpanMap这个成员变量用于存储页号与span之间的映射关系,由于STL不是线程安全的,因此我们在访问_idSpanMap的时候会出现线程安全的问题,对此我们需要加锁:
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;

    std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁
    auto ret = _idSpanMap.find(id);
    if (ret != _idSpanMap.end())
    {
        return ret->second;
    }
    else {
        assert(false);
        return nullptr;
    }
}

与malloc在多线程环境下的性能测试

#include "ObjectPool.hpp"
#include "ConcurrentAlloc.hpp"
#inlcude <atomic>

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;
    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(malloc(16));
                    //v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();
                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                size_t end2 = clock();
                v.clear();
                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }
    for (auto& t : vthread)
    {
        t.join();
    }
    std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc " << ntimes << "次: 花费:" << malloc_costtime << " ms\n";
    std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次free " << ntimes << "次: 花费:" << free_costtime << " ms\n";
    std::cout << nworks << "个线程并发malloc&free " << nworks * rounds * ntimes << "次,总计花费:" << (malloc_costtime + free_costtime) << " ms\n";
    /*printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);
    printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);
    printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);*/
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;
    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(ConcurrentAlloc(16));
                    //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();
                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    ConcurrentFree(v[i]);
                }
                size_t end2 = clock();
                v.clear();
                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }
    for (auto& t : vthread)
    {
        t.join();
    }
    std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent alloc " << ntimes << "次: 花费:" << malloc_costtime << " ms\n";
    std::cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent dealloc " << ntimes << "次: 花费:" << free_costtime << " ms\n";
    std::cout << nworks << "个线程并发concurrent alloc&dealloc " << nworks * rounds * ntimes << "次,总计花费:" << (malloc_costtime + free_costtime) << " ms\n";
    //printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);
    //printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);
    //printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}

int main()
{
    size_t n = 10000;
    cout << "==========================================================" <<
        endl;
    BenchmarkConcurrentMalloc(n, 3, 10);
    cout << endl << endl;
    BenchmarkMalloc(n, 3, 10);
    cout << "==========================================================" <<
        endl;
    return 0;
}
    较为分散的情况下进行内存申请,可以看到我们在多线程环境下,我们的concurrent alloc申请内存确实是比malloc快,但是在释放内存的时候我们却比malloc还慢了许多???而整体花费的时间还更多?导致这个的原因实际上就是因为锁的竞争。

    当我们较为集中的申请内存时,可以发现我们的concurrent alloc申请内存比malloc快了10倍多,虽然释放空间速度还是很慢,但是整体是比malloc要快的:

利用基数树进行优化性能

    上面也提到了,导致我们性能欠缺的原因就是锁的竞争导致的,因此当前项目的瓶颈点就在锁竞争上面。我们需要解决如上的问题,那么最简单的方法就是读写分离,利用基数树可以做到如上的功能。

    基数树(Radix Tree),也称为紧凑前缀树(Compact Prefix Tree)或Patricia树(Patricia Tree),是一种在计算机科学中用于存储和检索字符串键值的高效数据结构。它是对Trie树(前缀树)的一种优化,通过压缩公共前缀来减少节点的数量,从而节省存储空间。实际上就是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等等以下是基数树的详细解释:
    对于单层基数树:采用的就是直接定址法,每个页号对应一个span的地址,也就是如下我们使用set接口传入的页号以及span的首地址来做到的,而根据页号则可以得到对应span的首地址,既如下的get接口,需要注意的是:如下的模版参数BITS我们应该传入什么呢?在32位下的话,假设一页为8KB,那么我们需要传入32-13也就是19,这个19既为2的指数次幂的上标,那么我们储存32位下的所有页号就最多只需要19位,既我们要传入的为存储页号的最大位数。又因为32为下指针大小为4位,那么需要2^9*4=2M。但是,如果是在64为下,则需要2^51*8=2^54=2^24G。显然一层是不够的,此时我们需要3层基数树。
//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:
    typedef uintptr_t Number;
    explicit TCMalloc_PageMap1()
    {
        size_t size = sizeof(void*) << BITS; //需要开辟数组的大小
        size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小
        array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间
        memset(array_, 0, size); //对申请到的内存进行清理
    }
    void* get(Number k) const
    {
        if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]
        {
            return NULL;
        }
        return array_[k]; //返回该页号对应的span
    }
    void set(Number k, void* v)
    {
        assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]
        array_[k] = v; //建立映射
    }
private:
    void** array_; //存储映射关系的数组
    static const int LENGTH = 1 << BITS; //页的数目
};
    对于二层基数树,以32位为例,我们可以让前面提到的需要的19个比特位分两次来进行映射,比如:取出前5个比特位作为第一层的映射,后14位作为第二层的映射。这样带上每个指针4位的条件,我们第一层占用4*2^5=128B,而第二层最多2^14*4*2^15=2M。使用二层基数树的好处就在于:一层基数树一开始就把2M空间开辟出来了,但是第二层只需要在要用的时候再开辟空间即可。这样不就大大的节省了空间吗?同理的,如果我们要使用64位下的基数树映射,那么使用三层基数树就会更加节省空间:
//二层基数树
template <int BITS>
class TCMalloc_PageMap2
{
private:
    static const int ROOT_BITS = 5;                //第一层对应页号的前5个比特位
    static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数
    static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位
    static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数
    //第一层数组中存储的元素类型
    struct Leaf
    {
        void* values[LEAF_LENGTH];
    };
    Leaf* root_[ROOT_LENGTH]; //第一层数组
public:
    typedef uintptr_t Number;
    explicit TCMalloc_PageMap2()
    {
        memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理
        PreallocateMoreMemory(); //直接将第二层全部开辟
    }
    void* get(Number k) const
    {
        const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
        const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
        if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射
        {
            return NULL;
        }
        return root_[i1]->values[i2]; //返回该页号对应span的指针
    }
    void set(Number k, void* v)
    {
        const Number i1 = k >> LEAF_BITS;        //第一层对应的下标
        const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
        assert(i1 < ROOT_LENGTH);
        root_[i1]->values[i2] = v; //建立该页号与对应span的映射
    }
    //确保映射[start,start_n-1]页号的空间是开辟好了的
    bool Ensure(Number start, size_t n)
    {
        for (Number key = start; key <= start + n - 1;)
        {
            const Number i1 = key >> LEAF_BITS;
            if (i1 >= ROOT_LENGTH) //页号超出范围
                return false;
            if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟
            {
                //开辟对应空间
                static ObjectPool<Leaf> leafPool;
                Leaf* leaf = (Leaf*)leafPool.New();
                memset(leaf, 0, sizeof(*leaf));
                root_[i1] = leaf;
            }
            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
        }
        return true;
    }
    void PreallocateMoreMemory()
    {
        Ensure(0, 1 << BITS); //将第二层的空间全部开辟好
    }
};
    对于三层基数树:层数的增加很大程度上节省了空间。
//三层基数树
template <int BITS>
class TCMalloc_PageMap3
{
private:
    static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二层对应页号的比特位个数
    static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数
    static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数
    static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三层存储元素的个数
    struct Node
    {
        Node* ptrs[INTERIOR_LENGTH];
    };
    struct Leaf
    {
        void* values[LEAF_LENGTH];
    };
    Node* NewNode()
    {
        static ObjectPool<Node> nodePool;
        Node* result = nodePool.New();
        if (result != NULL)
        {
            memset(result, 0, sizeof(*result));
        }
        return result;
    }
    Node* root_;
public:
    typedef uintptr_t Number;
    explicit TCMalloc_PageMap3()
    {
        root_ = NewNode();
    }
    void* get(Number k) const
    {
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
        const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
        //页号超出范围,或映射该页号的空间未开辟
        if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
        {
            return NULL;
        }
        return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针
    }
    void set(Number k, void* v)
    {
        assert(k >> BITS == 0);
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
        const Number i3 = k & (LEAF_LENGTH - 1);                    //第三层对应的下标
        Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的
        reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射
    }
    //确保映射[start,start+n-1]页号的空间是开辟好了的
    bool Ensure(Number start, size_t n)
    {
        for (Number key = start; key <= start + n - 1;)
        {
            const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一层对应的下标
            const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
            if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围
                return false;
            if (root_->ptrs[i1] == NULL) //第一层i1下标指向的空间未开辟
            {
                //开辟对应空间
                Node* n = NewNode();
                if (n == NULL) return false;
                root_->ptrs[i1] = n;
            }
            if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层i2下标指向的空间未开辟
            {
                //开辟对应空间
                static ObjectPool<Leaf> leafPool;
                Leaf* leaf = leafPool.New();
                if (leaf == NULL) return false;
                memset(leaf, 0, sizeof(*leaf));
                root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
            }
            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
        }
        return true;
    }
    void PreallocateMoreMemory()
    {}
};
    后续需要对于代码的优化:我们只需要将pagecache层中的std::unordered_map<PAGE_ID, Span*> _idSpanMap;给替换成基数树即可,如下为32位平台下的示例:随便使用几层基数树都可以的,如下:
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

需要建立映射的时候,我们就调用set函数:

_idSpanMap.set(span->_pageId, span);

需要读取映射的时候,就需要调用get函数:

(Span*)_idSpanMap.get(nextId);

pagecache的更改:

#pragma once
#include "Common.hpp"
#include "ObjectPool.hpp"
#include "PageMap.hpp"
// 1.page cache是⼀个以⻚为单位的span⾃由链表
// 2.为了保证全局只有唯⼀的page cache,这个类被设计成了单例模式
class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInst;
    }

    Span* NewSpan(size_t k);

    //获取从对象到span的映射
    Span* MapObjectToSpan(void* obj);

    std::mutex _pageMtx;

    void ReleaseSpanToPageCache(Span* span);
private:
    SpanList _spanLists[NPAGES];

    ObjectPool<Span> _spanPool;
    
    PageCache()
    {}

    PageCache(const PageCache&) = delete;

    static PageCache _sInst;

    //std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

};

PageCache PageCache::_sInst;

// 获取一个K页的span
//Span* PageCache::NewSpan(size_t k)
//{
//    assert(k > 0 && k < NPAGES);
//
//    // 先检查第k个桶里面有没有span
//    if (!_spanLists[k].Empty())
//    {
//        return _spanLists[k].PopFront();
//    }
//
//    // 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
//    for (size_t i = k + 1; i < NPAGES; ++i)
//    {
//        if (!_spanLists[i].Empty())
//        {
//            Span* nSpan = _spanLists[i].PopFront();
//            Span* kSpan = new Span;
//
//            // 在nSpan的头部切一个k页下来
//            // k页span返回
//            // nSpan再挂到对应映射的位置
//            kSpan->_pageId = nSpan->_pageId;
//            kSpan->_n = k;
//
//            nSpan->_pageId += k;
//            nSpan->_n -= k;
//
//            _spanLists[nSpan->_n].PushFront(nSpan);
//
//
//            return kSpan;
//        }
//    }
//
//    // 走到这个位置就说明后面没有大页的span了
//    // 这时就去找堆要一个128页的span
//    Span* bigSpan = new Span;
//    void* ptr = SystemAlloc(NPAGES - 1);
//    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
//    bigSpan->_n = NPAGES - 1;
//
//    _spanLists[bigSpan->_n].PushFront(bigSpan);
//
//    return NewSpan(k);
//}

// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0);

    // 大于128 page的直接向堆申请
    if (k > NPAGES-1)
    {
        void* ptr = SystemAlloc(k);
        //Span* span = new Span;
        Span* span = _spanPool.New();

        span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = k;

        //_idSpanMap[span->_pageId] = span;
        _idSpanMap.set(span->_pageId, span);

        return span;
    }

    // 先检查第k个桶里面有没有span
    if (!_spanLists[k].Empty())
    {
        Span* kSpan = _spanLists[k].PopFront();

        // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
        for (PAGE_ID i = 0; i < kSpan->_n; ++i)
        {
            //_idSpanMap[kSpan->_pageId + i] = kSpan;
            _idSpanMap.set(kSpan->_pageId + i, kSpan);
        }

        return kSpan;
    }

    // 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
    for (size_t i = k+1; i < NPAGES; ++i)
    {
        if (!_spanLists[i].Empty())
        {
            Span* nSpan = _spanLists[i].PopFront();
            //Span* kSpan = new Span;
            Span* kSpan = _spanPool.New();

            // 在nSpan的头部切一个k页下来
            // k页span返回
            // nSpan再挂到对应映射的位置
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;

            nSpan->_pageId += k;
            nSpan->_n -= k;

            _spanLists[nSpan->_n].PushFront(nSpan);
            // 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时
            // 进行的合并查找
            //_idSpanMap[nSpan->_pageId] = nSpan;
            //_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
            _idSpanMap.set(nSpan->_pageId, nSpan);
            _idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);

            // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
            for (PAGE_ID i = 0; i < kSpan->_n; ++i)
            {
                //_idSpanMap[kSpan->_pageId + i] = kSpan;
                _idSpanMap.set(kSpan->_pageId + i, kSpan);
            }

            return kSpan;
        }
    }

    // 走到这个位置就说明后面没有大页的span了
    // 这时就去找堆要一个128页的span
    //Span* bigSpan = new Span;
    Span* bigSpan = _spanPool.New();
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;

    _spanLists[bigSpan->_n].PushFront(bigSpan);

    return NewSpan(k);
}

Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);

    /*std::unique_lock<std::mutex> lock(_pageMtx);
    auto ret = _idSpanMap.find(id);
    if (ret != _idSpanMap.end())
    {
        return ret->second;
    }
    else
    {
        assert(false);
        return nullptr;
    }*/
    auto ret = (Span*)_idSpanMap.get(id);
    assert(ret != nullptr);
    return ret;
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // 大于128 page的直接还给堆
    if (span->_n > NPAGES-1)
    {
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        SystemFree(ptr);
        //delete span;
        _spanPool.Delete(span);

        return;
    }

    // 对span前后的页,尝试进行合并,缓解内存碎片问题
    while (1)
    {
        PAGE_ID prevId = span->_pageId - 1;
        //auto ret = _idSpanMap.find(prevId);
         前面的页号没有,不合并了
        //if (ret == _idSpanMap.end())
        //{
        //    break;
        //}

        auto ret = (Span*)_idSpanMap.get(prevId);
        if (ret == nullptr)
        {
            break;
        }

        // 前面相邻页的span在使用,不合并了
        Span* prevSpan = ret;
        if (prevSpan->_isUse == true)
        {
            break;
        }

        // 合并出超过128页的span没办法管理,不合并了
        if (prevSpan->_n + span->_n > NPAGES-1)
        {
            break;
        }

        span->_pageId = prevSpan->_pageId;
        span->_n += prevSpan->_n;

        _spanLists[prevSpan->_n].Erase(prevSpan);
        //delete prevSpan;
        _spanPool.Delete(prevSpan);
    }

    // 向后合并
    while (1)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        /*auto ret = _idSpanMap.find(nextId);
        if (ret == _idSpanMap.end())
        {
            break;
        }*/

        auto ret = (Span*)_idSpanMap.get(nextId);
        if (ret == nullptr)
        {
            break;
        }

        Span* nextSpan = ret;
        if (nextSpan->_isUse == true)
        {
            break;
        }

        if (nextSpan->_n + span->_n > NPAGES-1)
        {
            break;
        }

        span->_n += nextSpan->_n;

        _spanLists[nextSpan->_n].Erase(nextSpan);
        //delete nextSpan;
        _spanPool.Delete(nextSpan);
    }

    _spanLists[span->_n].PushFront(span);
    span->_isUse = false;
    //_idSpanMap[span->_pageId] = span;
    //_idSpanMap[span->_pageId+span->_n-1] = span;

    _idSpanMap.set(span->_pageId, span);
    _idSpanMap.set(span->_pageId + span->_n - 1, span);
}
    可以看到我们在读取映射关系的时候都是没有加锁的,为什么基数数对于其它的哈希表如:map、unordered_map不一样,不需要加锁呢?因为,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且对于同一个页,我们不会同时进行读取映射以及建立映射,为什么呢?因为只有在释放对象的时候才需要读取映射,而建立映射的操作都是在pagecache中进行的。什么意思呢?读取的都是对应span的_useCount不等于0的页,建立映射是都是建立span的_useCount等于0的页。

优化后的性能:可以看到已经全方面吊打malloc了。

随机申请的地址:

紧凑申请的地址:


        **             感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o!** 

                                   ![](https://img-blog.csdnimg.cn/a2296f4aa7fd45e9b1a1c44f9b8432a6.gif)​​

** 给个三连再走嘛~ **

标签: 开发语言 c++ 缓存

本文转载自: https://blog.csdn.net/weixin_64038246/article/details/141131239
版权归原作者 慕斯( ˘▽˘)っ 所有, 如有侵权,请联系我们删除。

“【项目实战】高并发内存池—调优以及性能测试”的评论:

还没有评论