***🎬慕斯主页***:***************************************************************修仙—别有洞天**************************************************************** ♈️*今日夜电波:Blues in the Closet—ずっと真夜中でいいのに。* 0:34━━━━━━️💟──────── 3:37 🔄 ◀️ ⏸ ▶️ ☰ 💗关注👍点赞🙌收藏*您的每一次鼓励都是对我莫大的支持*😍
内存池主要解决的问题的是效率问题(每次申请内存都要向系统进行申请,那么我们为什么不提前就申请好一大块内存,然后在按需分配呢?)跟内存碎片问题,如下:
在C/C++中申请和释放内存本质上都是调用malloc跟free,注意我们的new/delete本质上也是封装了operator malloc、operator free。malloc实际就是一个内存池。而tcmalloc在多线程的情况下会比malloc效率更高更快!
开胃菜--先设计⼀个定长的内存池
快速了解内存池的概念—申请与释放。固定大小的内存申请释放需求。特点:1、性能达到极致。2、不考虑内存碎片等问题。 大致的设计图示如下:
通过_memory来申请好内存,自由链表freelist来管理好要释放的内存。实际上我们这个定长内存池是没有释放内存的,只有在进程结束的时候才会被系统自动释放内存!
其中需要注意:(char*)解引用位1个字节,那么我们把他强转位(int *)解引用则会是4个字节。
那么如果我们想在一块一块的空间头部存地址,可以这样赋值*(int *)obj=xxx,注意obj位首地址。但是这也有个问题,就是我们不知道环境是32位下还是64位下。方法一:我们可以通过判断指针大小来指定对应大小的类型进行解引用赋值。方法二:*(void **)obj=xxx,直接通过解引用二级指针计算一级指针的大小来赋值,当然其他类型的二级指针也是可以的!!!后续则进行头插即可构建void*_free_list。
ConcurrentMemoryPool
#include <iostream>
#include <windows.h>
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage * (1 << 12), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); //windows下的内存申请接口VirtualAlloc
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
if (_freeList != nullptr) // 如果⾃由链表有对象,直接取⼀个
{
obj = (T*)_freeList;
_freeList = *((void**)_freeList);
}
else
{
if (_leftBytes < sizeof T)
{
_leftBytes = 128 * 1024;
//这里还是调用了malloc,实际上我们可以直接去堆上申请空间
//_memory = (char*)malloc(_leftBytes);
// 直接调用系统接口去堆上申请空间
_memory = (char*)SystemAlloc(_leftBytes);
if (_memory == nullptr)
{
std::bad_alloc();
}
}
obj = (T*)_memory;
size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objsize;
_leftBytes -= objsize;
}
// 使⽤定位new调⽤T的构造函数初始化
// 使用placement new操作符在obj指向的内存位置上构造一个类型为T的对象
new(obj)T;
return obj;
}
void Delete(T* obj)
{
// 显⽰调⽤的T的析构函数进⾏清理
obj->~T();
// 头插到freeList
*((void**)obj) = _freeList;
_freeList = obj;
}
private:
char* _memory = nullptr; // 指向内存块的指针
void* _freeList = nullptr; // 管理还回来的内存对象的⾃由链表
int _leftBytes = 0; // 内存块中剩余字节数
};
实现大体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本⾝其实已经很优秀,那么我们项⽬的原型tcmalloc就是在多线程⾼并发的场景下更胜⼀筹,所以这次我们实现的内存池需要考虑以下⼏⽅⾯的问题。
性能问题。
多线程环境下,锁竞争问题。
内存碎⽚问题。
大体的框架如下:
concurrent memory pool主要由以下3个部分构成: 1. thread cache:线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存不需要加锁,每个线程独享⼀个cache,这也就是这个并发线程池⾼效的地⽅。 2. central cache:中⼼缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免⼀个线程占⽤了太多的内存,⽽其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的⽬的。central cache是存在竞争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这⾥竞争不会很激烈。 3. page cache:⻚缓存是在central cache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分配的,central cache没有内存对象时,从page cache分配出⼀定数量的page,并切割成定⻓⼤⼩的⼩块内存,分配给central cache。当⼀个span的⼏个跨度⻚的对象都回收以后,page cache会回收central cache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问题
根据上面的大体框架详细介绍每一层
thread cache
thread cache是哈希桶结构,每个桶是⼀个按桶位置映射⼤⼩的内存块对象的⾃由链表。每个线程都会有⼀个thread cache对象,这样每个线程在这⾥获取对象和释放对象时是⽆锁的。 当我们再申请内存的时候,我们会根据要申请的内存大小来进行分配对应的哈希桶管理下的内存,比如:我要申请的是4Byte的空间那么我们给你分配8Byte的空间,如果要8Byte的空间那也是给你8Byte的空间,如果要10Byte的空间,那么此时8Byte<10Byte<16Byte,此时会分配16Byte的空间。但是这个是时候又会出现一个问题,我们过量的分配了内存空间,那么是不是有一部分内存永远都用不上了呢?答案是对的,这部分过量分配的空间就被称职位内存碎片中的内部碎片问题,而外部碎片问题前面有所提到,那是因为连续的内存空间,而其中内存的释放而导致的!
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶⾃ 由链表下标i。
- 如果⾃由链表_freeLists[i]中有对象,则直接Pop⼀个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取⼀定数量的对象,插⼊到⾃由链表 并返回⼀个对象。
释放内存:
- 当释放内存⼩于256k时将内存释放回thread cache,计算size映射⾃由链表桶位置i,将对象Push 到_freeLists[i]。
- 当链表的⻓度过⻓,则回收⼀部分内存对象到central cache。
哈希桶映射对齐规则
上面提到了,我们如果在分配内存方面都按照8kb、16kb、24kb这样逐渐增长且只有单一的对齐规则(既:按照8字节为单位作为一个_freeList),这样做会导致我们会有很多的_freeList,可以大致计算一下:256*1024/8=32768个的_freeList。也会造成很多的内部碎片的产生!对此,我们需要定义一套策略来减少内部碎片以及让_freeList的创建减少。
策略如下:
// [1,128] 8byte对⻬ freelist[0,16)
// [128+1,1024] 16byte对⻬ freelist[16,72)
// [1024+1,81024] 128byte对⻬ freelist[72,128)
// [8*1024+1,641024] 1024byte对⻬ freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)
这个策略经过验证整体控制在最多10%左右的内碎⽚浪费,并且我们需要控制以及管理的_freeList也减少到了只有208个。
如下为_freeList的功能代码(申请和释放内存)对齐策略的代码:注意其中 _RoundUp为对齐规则的制定 (比如:让16字节和15字节都分配16字节),_Index为_freeList的制定。
#pragma once
#include <iostream>
#include <vector>
#include <time.h>
#include <assert.h>
using std::cout;
using std::endl;
// 获取内存对象中存储的头4 or 8字节值,即链接的下⼀个对象的地址
inline void*& NextObj(void* obj)
{
return *((void**)obj);
}
// ⼩于等于MAX_BYTES,就找thread cache申请
// ⼤于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
// thread cache 和 central cache⾃由链表哈希桶的表⼤⼩
static const size_t NFREELISTS = 208;
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
}
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(obj);
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;
};
// 管理对⻬和映射等关系
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎⽚浪费
// [1,128] 8byte对⻬ freelist[0,16)
// [128+1,1024] 16byte对⻬ freelist[16,72)
// [1024+1,81024] 128byte对⻬ freelist[72,128)
// [8*1024+1,641024] 1024byte对⻬ freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)
// 如何理解?实际上就是获取要得到的内存块大小
// 如:bytes=16,align=8,则得到的结果是这样的:(16+(8-1))& ~(8-1) -> 0001 0111 & 1111 1000 -> 16
static inline size_t _RoundUp(size_t bytes, size_t align)
{
return (((bytes)+align - 1) & ~(align - 1));
}
// 对⻬⼤⼩计算
static inline size_t RoundUp(size_t bytes)
{
if (bytes<=128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024) {
return _RoundUp(bytes, 16);
}
else if (bytes <= 8*1024) {
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024) {
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024){
return _RoundUp(bytes, 8 * 1024);
}
assert(false);
return -1;
}
// 如何理解?用于找到对应的_freeList下标,注意align_shift为指数次幂
// 如:bytes=16,align_shift=3,则得到的结果是这样的:((16+(1<<3)-1)>>3)-1 -> ((24-1)>>3)-1 -> 1
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪⼀个⾃由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8*1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] +group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
assert(false);
return -1;
}
};
使用TLS+前面的自由由链表的哈希桶跟对象大小的映射关系实现thread cache
什么是TLS?
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
#include "Common.hpp"
class ThreadCache
{
public:
void* FetchFromCentralCache(size_t index, size_t size);
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
private:
//其实就是我们的哈希桶结构
FreeList _freeLists[NFREELISTS];
};
//__declspec(thread)是Microsoft Visual Studio编译器提供的一种实现线程局部存储(Thread Local Storage, TLS)的方式。它允许你声明一个变量,让每个线程都有这个变量的一个独立副本。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// ...
return nullptr;
}
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
return FetchFromCentralCache(index, alignSize);
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,对象插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
#pragma once
#include "Common.hpp"
#include "ThreadCache.hpp"
static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
return pTLSThreadCache->Allocate(size);
}
static void ConcurrentFree(void* ptr, size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
central cache
central cache也是⼀个哈希桶结构,他的哈希桶的映射关系跟thread cache是⼀样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下⾯的span中的⼤内存块被按映射关系切成了⼀个个⼩内存块对象挂在span的⾃由链表中。图示如下:
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请⼀些内存对象,这⾥的批量获取对象 的数量使⽤了类似⽹络tcp协议拥塞控制的慢开始算法;central cache也有⼀个哈希映射的 spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不 过这⾥使⽤的是⼀个桶锁,尽可能提⾼效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请⼀个新的 span对象,拿到span以后将span管理的内存按⼤⼩切好作为⾃由链表链接到⼀起。然后从span中 取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配⼀个对象给thread cache,就++use_count
释放内存:
- 当thread_cache过⻓或者线程销毁,则会将内存释放回central cache中的,释放回来时-- use_count。当use_count减到0时则表⽰所有对象都回到了span,则将span释放回page cache, page cache中会对前后相邻的空闲⻚进⾏合并。
预定义常量_WIN64和_WIN32
需要注意的是_WIN64和_WIN32预定义常量在X64环境下都被定义了,而X86环境下只定义了_WIN32。这样就会导致,如果我们想通过这两个预定义常量来控制某些东西时,先定义了_WIN32后通过#elif_WIN64则_WIN64始终都是不会起作用的!对此,如果我们想要通过条件编译来控制页的大小或者某些指针相关的变量可以像如下来进行条件编译:
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
定义Span以及管理Span的Spanlist
Span管理⼀个跨度的⼤块内存,而Spanlist管理以⻚为单位的⼤块内存、管理多个连续⻚⼤块内存跨度结构。需要特别注意:Span结构体中的_freeList,这个同threadcache中的_freeList是同样的概念,都是以首部的一个指针大小的内存存放下一个内存块的地址以此构建成链表的结构,也就是上面thread cache图示中的具体实现。如下为基本的实现:
struct Span
{
PAGE_ID _pageId = 0; // ⼤块内存起始⻚的⻚号
size_t _n = 0; // ⻚的数量
Span* _next = nullptr; // 双向链表的结构
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的⼩对象的⼤⼩
size_t _useCount = 0; // 切好⼩块内存,被分配给thread cache的计数
void* _freeList = nullptr; // 切好的⼩块内存的⾃由链表
bool _isUse = false; // 是否在被使⽤
};
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
// prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
对前面实现的一些组件进行更新
对于FreeList主要增加了PushRange和PopRange,都是批量化的申请以及释放内存。以及增加MaxSize用于对threadcache向centralcache申请内存的慢开始反馈调节算法,至少申请一内存块,配合后面的NumMoveSize来进行控制一次thread cache从中心缓存获取多少个内存块。
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
//直接操作头部和尾部即可
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
//这是一个输出型的操作,实际上就是将空闲的内存块,按照需要的n来用start取出来
void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
对于下面这个NumMoveSize函数主要是为了实现threadcache向centralcache进行内存申请时慢开始反馈调节算法,保证在申请时给threadcache对象不会太多也不会太少!
// 一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 对象越小一次批量上限高
// 对象越大一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
实现centralcache的大体框架和基本功能
采取单例模式中的懒汉模式,实际上所有的threadcache都是向这一个centralcache来申请想要的内存块的,centralcache向外提供GetInstance()接口用于获取唯一的对象,其中 SpanList _spanLists[NFREELISTS];实际上是同centralcache中的FreeList _freeLists[NFREELISTS];的哈希桶结构是一一对应的,都是208个哈希桶,但是_spanLists中每一个桶存放的是一个又一个的Span,Span中的_freeList存放着切好的⼩块内存的⾃由链表。centralcache还提供了几个接口,GetOneSpan获取⼀个⾮空的Span。FetchRangeObj中,GetOneSpan这个接口可以结合 SizeClass::Index,获取指定哈希桶中的一个Span,再将这个Span中的内存块经由FetchRangeObj将⼀定数量的对象给thread cache。ReleaseListToSpans则是将⼀定数量的对象释放到Span跨度。
需要注意的是,为了保证多线程环境下的线程安全,我们需要额外给桶结构加锁,注意:如果是访问不同的桶结构那么这是不冲突的!所以我们的锁也是定义在SpanList中。
#pragma once
#include "Common.hpp"
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取⼀个⾮空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 从中⼼缓存获取⼀定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将⼀定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELISTS];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// ...
return nullptr;
}
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
// 从span中获取batchNum个对象
// 如果不够batchNum个,有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
++actualNum;
}
// ?
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
将前面threadcache未实现的功能实现
按照需求的内存从从centralcache中获得内存,将这些内存块放到threadcache中进行管理。实际上很像网络里边的拥塞控制,一开始给很少,后面如果频繁的申请那么就一次给多一点。通过慢开始反馈调节算法,注意:_freeList中的MaxSize一开始是初始化为1的,也就是说所有的centralcache一开始只能获取一个内存对象,随着某个centralcache中的某个_freeList频繁的去获取内存对象,则会逐渐的增加给这个_freeList的内存对象数量。
// 实际上很像网络里边的拥塞控制,一开始给很少,后面如果频繁的申请那么就一次给多一点
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end, actualNum-1);
return start;
}
}
page cache
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则 向更⼤⻚寻找⼀个span,如果找到则分裂成两个。⽐如:申请的是4⻚page,4⻚page后⾯没有挂 span,则向后⾯寻找更⼤的span,假设在10⻚page位置找到⼀个span,则将10⻚page span分裂 为⼀个4⻚page span和⼀个6⻚page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使⽤mmap、brk或者是VirtualAlloc等⽅式 申请128⻚page span挂在⾃由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核⼼结构都是spanlist的哈希桶,但是他们是有本质区 别的,central cache中哈希桶,是按跟thread cache⼀样的⼤⼩对⻬关系映射的,他的spanlist中 挂的span中的内存都被按映射关系切好链接成⼩块内存的⾃由链表。⽽page cache 中的spanlist则 是按下标桶号映射的,也就是说第i号桶中挂的span都是i⻚内存。
释放内存:
- 如果central cache释放回⼀个span,则依次寻找span的前后page id的没有在使⽤的空闲span, 看是否可以合并,如果合并继续向前寻找。这样就可以将切⼩的内存合并收缩成⼤的span,减少内 存碎⽚。
需要特别注意的是:虽然我们的spancache同centralcache一样都是只是存在一个,也就是说他们都是存在线程安全问题的。那么我们同centralcache一样向spancache的每一个桶都加锁吗?其实是可以的,但是更推荐于直接整体加锁。给一个场景:如果我们是向桶加锁,此时有多个线程都在找span,并且所找的page中都无span,那么他们都需要往后寻找更⼤的span,此时就会就会造成很多的加锁、解锁的过程,会导致线程的频繁唤醒与睡眠,会对运行效率产生很大的影响。对此,我们倒不如直接整体加锁!
pagecache与centralcache结构的不同
pagecache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。
central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。
需要注意的是:pagecache中有几个桶,那么就会有最大挂几页的span,这里我们设计最大挂128页的span,为什么呢?因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。为了一一对应,我们这里将static const size_t NPAGES = 129。特别注意:PAGE_SHIFT这个我们定义的全局变量,我们将他定义为 size_t PAGE_SHIFT = 13;也就是说我们认为一页是2^13,也就是8KB的!
对前面实现的一些组件进行更新
在SizeClass类中增加一个NumMovePage函数用于计算要多少个页,实际上就是通过前面的NumMoveSize确定要多少个内存对象,再将这个值乘以单个对象的大小,再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。特别注意:PAGE_SHIFT这个我们定义的全局变量,我们将他定义为 size_t PAGE_SHIFT = 13;也就是说我们认为一页是2^13,也就是8KB的!
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
实现centralcache中GetOneSpan的函数
实现GetOneSpan这个向pagecache申请获取对应页大小的span的函数。
如何计算页的起始地址?
我们可以用某个页号,通过页号来计算页的起始地址,比如:一页定义为2^13,也就是8kb,我们知道了页号为100,想得到他的起始地址就用100*一页的大小即可,也就是100*8*1024,既100<<PAGE_SHIFT。既:页号<<PAGE_SHIFT。 这样我们就可以知道起始地址以及结束地址,接着根据所需对象的大小就可以切分这个Span为一个一个的内存块了!这里的做法是尾插,为什么呢?这样子可以使得内存快之间物理是连续的,当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。实现如下:
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
it = it->_next;
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
// 走到这里没有空闲span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
PageCache::GetInstance()->_pageMtx.unlock();
// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大块内存切成自由链表链接起来
// 先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
NextObj(tail) = nullptr;
// 切好span以后,需要把span挂到桶里面去的时候,再加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
基本实现pagecache的大体框架以及最重要的内存申请
这里采用单例模式来构建pagecache,因为前面也提到了centralcache一样都是只存在一个的!解释一下其中比较重要的NewSpan函数,他是用于centralcache向pagecache申请span用的,根据要申请多少页大小的span来返回对应大小的span,起始就是检查对应大小的哈希桶中有无span,如果没有那么就往后遍历比原来要申请大的哈希桶结构,然后取出进行划分。如果后续整个结构都没有span可以划分则需要向系统也就是堆进行申请内存,那么我们一次申请的内存大小也是自定义的128页也就是8*128KB,通过前面实现定长内存池中使用过并封装好的SystemAlloc来申请。需要注意的是:我们在申请成功后,会返回所申请内存的首地址,那么我们需要根据这个首地址来构建页号!由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。既:(PAGE_ID)ptr >> PAGE_SHIFT。
#pragma once
#include "Common.hpp"
#include "ObjectPool.hpp"
// 1.page cache是⼀个以⻚为单位的span⾃由链表
// 2.为了保证全局只有唯⼀的page cache,这个类被设计成了单例模式
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
PageCache PageCache::_sInst;
// 获取一个K页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
这块思考一个问题,也是对前面GetOneSpan解锁、加锁的解释:我们是否需要在centralcache向pagecache申请内存的时候将centralcache对应的哈希桶的锁给释放掉呢?答案是:建议释放的!虽然此时这个桶内已经不能申请内存了,但是centralcache除了内存申请外还有内存的释放啊!如果在向pagecache申请内存前将锁释放掉,那么其他线程在归还内存时就不会被阻塞,提升了效率!
内存的回收实现
前面所实现的实际上都是三层缓存机制中的申请内存部分的功能,实际上我们并没有将内存实际的释放,以及还给系统,上面实现到的效果如下:
实际上就只是在threadcache这一层将用不上的内存挂回对应的_freeLists上而已,这样做的结果就会导致threadcache上会有大量的内存块!对此,我们当一层一层的向上返回不用的内存!
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,对象插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
thread cache内存回收
我们这样定义:如果threadcache某个桶当中自由链表的长度超过它一次批量向centralcache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给centralcache。当然,实际的tcmalloc设计的会更加复杂,他会考虑内存碎片、实际挂载的长度、每个threadcache整体的大小等等。
threadcache新增模块如下:
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找对映射的自由链表桶,对象插入进去
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
需要注意的是:为了增加代码可修改性,我们在设计ListTooLong时,实际上是将threadcache中一次可得的最大链表数量的自由链表都还给centralcache了。但是,可以看到ReleaseListToSpans实际上还的还是指定大小的size。因为在某些情况下当自由链表过长时,我们可能并不一定想把链表中全部的对象都取出来还给centralcache。
后续需要调用centralcache中提供的ReleaseListToSpans接口进行从threadcache到centralcache的内存回收。
central cache内存回收
如果我们需要将threadcache中的内存块对象还给centralcache,这些对象会出现可能来自不同span的情况,因此当我们计算出还回来的对象应该还给centralcache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。 对此,我们可以使用unordered_map建立页号同span的映射关系,以此可以通过页号来找到对应的span,还记得之前我们是怎么获得页号的吗?通过右移PAGE_SHIFT位得到的!因此,我们当然也可以根据对象的地址反向操作找到该对象对应的span!
如下为在pagecache中新增加的unordered_map映射关系以及通过内存对象获取从对象到span的映射:
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
Span* NewSpan(size_t k);
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
当分配Span给centralcache时都应该进行映射!也就是主要在NewSpan这个模块里。对此,我们每当有新的span加入都需要进行映射操作;
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
如下为获取从对象到span的映射的实现:
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else {
assert(false);
return nullptr;
}
}
实现ReleaseListToSpans,依次遍历返回来的对象,然后让他们插入对应的span中,注意管理span中_useCount的值,然后当_useCount减到0了那么就说明span的切分出去的所有小块内存都回来了,此时就需要把这个span还给pagecache了!
实现如下:
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//头插
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 说明span的切分出去的所有小块内存都回来了
// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
// 释放span给page cache时,使用page cache的锁就可以了
// 这时把桶锁解掉
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
特别注意,这个满足条件的span需要在双链表中被移除,然后,后续有些锁的操作:就是桶锁在释放span给page cache时可以先解锁给其他线程申请内存,然后使用pagecache的锁进行内存回收操作,注意这里调用了pagecache的ReleaseSpanToPageCache模块,从centralcache中将span回收到pagecache中。在进行完成操作后需要记得锁上桶锁,因为后面还有一个锁释放的操作。
page cache内存回收
当我们在前面获得从centralcache回收得到的span后,是否就是直接挂载到对应大小的哈希桶上就可以了?实际上,为了缓解内存中的外碎片问题,我们还需要对返回的span同其他同样无人使用的span进行合并的操作! 那么如何合并会比较好呢?当然是连续的空间进行合并比较好,而我们可以向前合并也可以向后合并。假设还回来的span页号为pageid,拥有的页数为n。在向前合并时,我们需要判断是否有pageid-1,然后就可以进行合并操作。如果是向后,则需要判断pageid+n,然后就可以进行合并操作。这些操作都是可以重复进行的,也就是说我们可以一直合并到不能合并为止。 但是!需要注意的是:当我们通过页号找到span时,我们是不知道span是否挂载在centralcache上的,如果在centralcache上则不能进行合并操作。因此,我们需要根据是否可用来确定是否可以合并!在合并的过程中,如果我们发现合并出来的内存大小太大了,我们的pagecache装不下,那么这种操作也是不被允许的!
对此,前面在对于Span类的设计中,我们有定义如下字段:
bool _isUse = false; //是否在被使用
这个字段就是用于判断是否span是否被使用的!对此分别需要在centralcache申请得到span时,将这个span的_isUse改为true,在centralcache还span时,将这个span的_isUse改为false。
如下为申请(既:_isUse改为true)时的修改:
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
// 走到这里没有空闲span了,只能找page cache要
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
PageCache::GetInstance()->_pageMtx.unlock();
// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大块内存切成自由链表链接起来
// 先切一块下来去做头,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
// 切好span以后,需要把span挂到桶里面去的时候,再加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
我们在pagecache的合并过程中,实际上只需要知道一个span首页号以及尾页号即可得知是否可以合并了。如果向前合并,则只需要前一个span的尾页即可找到,向后也是同理通过后一个span的首页即可找到,在pagecache只需要span中首尾页之间的映射关系即可,不需要像centralcache一样每一个都要做到可以精确的查找。
如下为PageCache::NewSpan的修改,当将n页的span切成k页的span和n-k页的span时,除了为每个页与span建立映射外,还需要建立n-k页的span与其首尾页之间的映射。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
而释放(既:_isUse改为false)放在了pagecache后续要实现的ReleaseSpanToPageCache模块中,下面为基本的实现:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (1)
{
PAGE_ID prevId= span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号没有(还未向系统申请),停止向前合并
if (ret == _idSpanMap.end())
{
break;
}
//前面的页号对应的span正在被使用,停止向前合并
if (ret->second->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向前合并
if (ret->second->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = ret->second->_pageId;
span->_n += ret->second->_n;
_spanLists[ret->second->_n].Erase(ret->second);
delete ret->second;
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
//后面的页号没有(还未向系统申请),停止向后合并
if (ret == _idSpanMap.end())
{
break;
}
//后面的页号对应的span正在被使用,停止向后合并
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向后合并
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
//进行向后合并
span->_n += nextSpan->_n;
//将nextSpan从对应的双链表中移除
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
//合并的span挂载到对应的双链表
_spanLists[span->_n].PushFront(span);
//建立首尾映射
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
span->_isUse = false;
}
** 感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o!**
![](https://img-blog.csdnimg.cn/a2296f4aa7fd45e9b1a1c44f9b8432a6.gif)
** 给个三连再走嘛~ **
版权归原作者 慕斯( ˘▽˘)っ 所有, 如有侵权,请联系我们删除。