0


【项目日记】仿mudou的高并发服务器 --- 实现缓冲区模块,通用类型Any模块,套接字模块

在这里插入图片描述

一个人知道自己为什么而活,
就可以忍受任何一种生活。
--- 尼采 ---


✨✨✨项目地址在这里 ✨✨✨

✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨


仿mudou的高并发服务器

1 主从Reactor模型

这个项目的目标是实现一个可以高效处理请求的服务器,那么对于这样的一个服务器要如何实现呢?

这里采取主从Reactor模型的方案:

  1. 主Reactor模型负责对监听套接字进行管理,进行获取连接操作。
  2. 从属Reactor负责对连接的数据进行处理,并且为了保证线程安全,每一个从属Reactor都要绑定一个线程,这个连接的任务都在这个线程中完成。

简单来说就是这样的一个模型:
在这里插入图片描述
其中管理连接的对象是Connection对象,这个类是高并发服务器的核心部分,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!

主要的工作就是搭建起主从Reactor模型,但是这个模型不是一下子就可以写出来的。为了实现主从Reactor模型,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模型

2 基础功能封装

2.1 缓冲区 Buffer模块

Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:

  1. 需要支持字符串读取/写入
  2. 需要支持char*缓冲区读取/写入
  3. 需要正常按行读取 — 方便解析http请求

Buffer模块成员变量很简单:

  • vector<char>容器_buffer: 对内存空间进行管理
  • uint64_t _reader_idx 读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。
  • uint64_t _writer_idx 写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。

接下来实现一下基础功能:

  1. 构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个大小 BUFFER_DEFAULT_SIZE。
  2. 获取当前写入起始地址:_buffer空间的起始地址加入写偏移量即写入起始地址。
  3. 获取当前读取起始地址: _buffer空间的起始地址加入读偏移量即读取起始地址。
  4. 获取缓冲区末尾空闲空间大小:写偏移之后的空闲空间 ,总体空间大小减去写偏移就是写偏移之后的空间大小。
  5. 获取缓冲区起始空闲空间大小:读偏移之前的空闲空间,其实就是读偏移的大小。
  6. 获取可读数据大小:写偏移减去读偏移就就之间可读空间的大小!
  7. 读/写偏移向后移动: * 先根据len判断是否小于可读数据大小 len必须小于可读数据大小,然后移动读偏移。 * 向后移动必须小于当前后边的空闲空间大小,写入数据必须小于缓冲区剩余空间大小,不足就进行扩容!
  8. 确保可写空间足够 : * 末尾空闲空间足够 直接返回。 * 末尾空间不足,但算上起始位置的空闲空间大小足够 ,将数据移动到起始位置。 * 如果总空间不足 ,进行扩容,扩容到足够空间即可
  9. 写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续设计出针对string的写入、针对Buffer的写入以及写入 + 压入数据
  10. 读取数据:首先读取大小len必须小于可读取数据大小,然后拷贝数据出来。同样可以设计出针对string的读取、针对Buffer的读取以及读取+弹出数据
  11. 清空缓冲区:将偏移量归零即可!
  12. 读取一行数据:先找到换行符,然后进行读取。
// 缓冲区Buffer类classBuffer{private:
    std::vector<char> _buffer;uint64_t _reader_idx;uint64_t _writer_idx;private:char*Begin(){return&*_buffer.begin();}public:Buffer():_buffer(BUFFER_DEFAULT_SIZE),_reader_idx(0),_writer_idx(0){}// 获取当前写入起始地址char*WritePos(){returnBegin()+ _writer_idx;}// 获取当前读取起始地址char*ReadPos(){returnBegin()+ _reader_idx;}// 获取读取位置之前的空闲空间uint64_tHeadIdleSize(){return _reader_idx;}// 获取写入位置之后的空闲空间uint64_tTailIdleSize(){return _buffer.size()- _writer_idx;}// 获取可读数据大小uint64_tReadAbleSize(){return _writer_idx - _reader_idx;}// 读/写偏移移动voidMoveReadOffset(uint64_t len){if(len <=0)return;assert(len <=ReadAbleSize());
        _reader_idx += len;}voidMoveWriteOffset(uint64_t len){if(len <=0)return;assert(len <=TailIdleSize());
        _writer_idx += len;}// 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容voidEnsureWriteSpace(uint64_t len){// 当len小于写入位置之后的空闲空间 直接进行写入if(len <=TailIdleSize()){return;}// len 小于总共的空闲空间elseif(len <=TailIdleSize()+HeadIdleSize()){// 记录总共的数据uint64_t sz =ReadAbleSize();// 移动数据
            std::copy(ReadPos(),ReadPos()+ sz,Begin());// 更新写入读取位置
            _reader_idx =0;
            _writer_idx = sz;}// 需要扩容else{// 在写入位置之后扩充len个大小
            _buffer.resize(_writer_idx + len);}}// ----------写入数据------------voidWrite(constvoid*data,uint64_t len){// 写入的数据不能超过可写的总空间// 确保可以正常写入EnsureWriteSpace(len);// 进行拷贝constchar*d =reinterpret_cast<constchar*>(data);
        std::copy(d, d + len,WritePos());}// 写入BuffervoidWriteBuffer(Buffer &buffer){// 直接调用WritereturnWrite(buffer.ReadPos(), buffer.ReadAbleSize());}// 写入字符串voidWriteString(const std::string &str){// 直接调用WritereturnWrite(&str[0], str.size());}// 写入 + 偏移voidWriteAndPush(constvoid*data,uint64_t len){if(len <=0)return;// 进行写入Write(data, len);// 更新写入偏移量MoveWriteOffset(len);}voidWriteStringAndPush(const std::string &str){// 直接调用WriteWriteString(str);LOG(DEBUG,"%s\n",WritePos());// 更新偏移量MoveWriteOffset(str.size());LOG(DEBUG,"当前可读数据大小:%ld\n",ReadAbleSize());}voidWriteBufferAndPush(Buffer &buffer){// 直接调用WriteWrite(buffer.ReadPos(), buffer.ReadAbleSize());MoveWriteOffset(buffer.ReadAbleSize());}//---------读取数据----------voidRead(void*buf,uint64_t len){// len 必须小于可读数据大小assert(len <=ReadAbleSize());// 进行拷贝
        std::copy(ReadPos(),ReadPos()+ len,(char*)buf);}// 读取字符串
    std::string ReadAsString(uint64_t len){// len 必须小于可读数据大小assert(len <=ReadAbleSize());
        std::string str;
        str.resize(len);// 直接调用ReadRead(&str[0], len);return str;}// 读取 + PopvoidReadAndPop(void*buf,uint64_t len){// 进行读取Read(buf, len);// 更新读取偏移量MoveReadOffset(len);}
    std::string ReadAsStringAndPop(uint64_t len){// len 必须小于可读数据大小assert(len <=ReadAbleSize());// 进行读取
        std::string str =ReadAsString(len);// 更新偏移量MoveReadOffset(len);return str;}//-----------读取一行数据--------------char*FindCRLF(){return(char*)memchr(ReadPos(),'\n',ReadAbleSize());}
    std::string GetLine(){// 先寻找换行符char*pos =FindCRLF();if(pos ==nullptr)return"";// 读取要带走'\n'
        std::string str =ReadAsString(pos -ReadPos()+1);return str;}
    std::string GetLineAndPop(){
        std::string str =GetLine();MoveReadOffset(str.size());return str;}// 清除数据voidClear(){// 偏移量归零即可
        _reader_idx =0;
        _writer_idx =0;}};

2.2 通用类型 Any类

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有明显的协议倾向,它可以是任意协议的上下文信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构

  1. 一个连接必须拥有一个请求接收与解析的上下文!
  2. 上下文的类型或者结构不能固定!因为服务器的协议支持的协议很多,不同的协议,可能都有不同的上下文结构!

所以必须拥有一个容器,能够保存各种不同的类型!那么就要实现一个any类

  1. 假如使用模版类方法,那么实例化对象的时候一定要指明容器保存的数据类型!而我们需要的是any可以接收任意类型Any a ; a = 10 ; a = "abc"!
  2. 但是可以嵌套一下,在Any类中设计一个类,专门用于保存各种类型的数据,而Any类保存的是固定类的对象。
  3. 对于这个固定类依旧不能使用模版。但这里这里可以采用多态,设计一个子类,这是一个模版类。 这样可以通过父类指针读取子类数据!
  4. Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个 特定类型的对象出来,让子类对象保存数据!
// 通用类型 Any类classAny{private:classholder{public:holder(){}virtual~holder(){};virtualconst std::type_info &type()=0;virtual holder *clone()=0;};template<classT>classplaceholder:publicholder{public:placeholder(const T &val):_val(val){}virtualconst std::type_info &type() override
        {returntypeid(T);}virtual holder *clone() override
        {returnnewplaceholder(_val);}

        T _val;};// 通过这个父类指针访问子类的成员
    holder *_content =nullptr;public:Any(){}template<classT>// 拷贝构造Any(const T &val):_content(new placeholder<T>(val)){}// 拷贝构造Any(const Any &other):_content(other._content ? other._content->clone():nullptr){}// 交换数据
    Any &swap(Any &other){
        std::swap(_content, other._content);return*this;}// 赋值重载
    Any &operator=(Any &other){// 根据 other建立一个临时对象 ,进行资源的交换Any(other).swap(*this);return*this;}template<classT>
    Any &operator=(const T &val){// 根据 val 建立一个临时对象,进行资源的交换Any(val).swap(*this);return*this;}template<classT>
    T *Get(){if(typeid(T)!= _content->type())returnnullptr;return&((reinterpret_cast<placeholder<T>*>(_content))->_val);}~Any(){delete _content;}};

2.3 套接字 Socket模块

Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:

  1. 构造函数 析构函数
  2. 创建套接字 bool Create()
  3. 绑定地址信息
  4. 开始监听
  5. 向服务器发起连接
  6. 获取新连接
  7. 接收数据
  8. 发送数据
  9. 关闭套接字
  10. 创建一个服务端连接:首先创建套接字,然后将进程绑定地址信息,开启监听状态,注意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,一切等待都由多路转接负责);同时启动地址重用 ,保护客户端
  11. 创建一个客户端连接:创建套接字,连接服务器。
  12. 设置套接字选项 — 开启地址端口重用
  13. 设置套接字阻塞属性 — 设置为非阻塞读取
// 套接字Socket类classSocket{private:int _sockfd;// 套接字文件描述符private:// 创建套接字boolCreate(){// int socket(int domain, int type, int protocol);// IPV4     数据流IO
        _sockfd =::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if(_sockfd <0){LOG(ERROR,"create socket failed!\n");returnfalse;}LOG(INFO,"Sockfd:%d create success \n", _sockfd);returntrue;}// 绑定地址信息boolbind(const std::string &ip,uint16_t port){structsockaddr_in addr;
        addr.sin_family = AF_INET;// 使用IPv4版本地址
        addr.sin_port =htons(port);// 主机端口转网络端口
        addr.sin_addr.s_addr =inet_addr(ip.c_str());// 将ip字符串转化为网络ip// 进行绑定
        socklen_t len =sizeof(structsockaddr_in);int n =::bind(_sockfd,(structsockaddr*)&addr, len);if(n <0){LOG(ERROR,"bind failed!\n");returnfalse;}returntrue;}// 建立监听套接字boolListen(int backlog = MAX_LIETENSIZE){//                        全连接队列的大小int n =::listen(_sockfd, backlog);if(n <0){LOG(ERROR,"listen failed!\n");returnfalse;}returntrue;}// 向服务器发起连接boolConnect(const std::string &ip,uint16_t port){structsockaddr_in addr;
        addr.sin_family = AF_INET;// 使用IPv4版本地址
        addr.sin_port =htons(port);// 主机端口转网络端口
        addr.sin_addr.s_addr =inet_addr(ip.c_str());// 将ip字符串转化为网络ip// 进行绑定
        socklen_t len =sizeof(structsockaddr_in);int n =::connect(_sockfd,(structsockaddr*)&addr, len);if(n <0){LOG(ERROR,"connect failed!\n");returnfalse;}returntrue;}// 非阻塞启动地址重用voidReuseAddress(uint16_t port,const std::string &ip){int val =1;
        socklen_t len =sizeof(int);// 将端口号设置为可重用if(setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT,(void*)&val, len)==-1){throw std::runtime_error("Failed to set SO_REUSEPORT");}// 将IP地址设置为可重用if(setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,(void*)&val, len)==-1){throw std::runtime_error("Failed to set SO_REUSEADDR");}}voidNonBlock(){int flag =::fcntl(_sockfd, F_GETFL,0);::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}public:// 构造函数Socket():_sockfd(-1){};Socket(int sockfd):_sockfd(sockfd){}// 析构函数~Socket(){Close();}// 返回套接字文件描述符intSockfd(){return _sockfd;}// 接收数据
    ssize_t Recv(void*buf, size_t len,int flag =0){
        ssize_t n =::recv(_sockfd, buf, len, flag);if(n <=0){// 被信号中断         没有数据了- 非阻塞读取完毕if(errno == EINTR || errno == EAGAIN)return0;// 没有读取到数据LOG(ERROR,"recv failed!\n");return-1;}return n;// 实际发送的长度}
    ssize_t NonBlockRecv(void*buf, size_t len){//       MSG_DONTWAIT  非阻塞式读取if(len <=0)return0;returnRecv(buf, len, MSG_DONTWAIT);}// 发送数据
    ssize_t Send(void*buf, size_t len,int flag =0){if(len <=0)return0;
        ssize_t n =::send(_sockfd, buf, len, flag);if(n <=0){// 被信号中断         没有数据了- 非阻塞读取完毕if(errno == EINTR || errno == EAGAIN)return0;// 没有读取到数据LOG(ERROR,"send failed!\n");return-1;}LOG(DEBUG,"Send :%s\n",(char*)buf);return n;// 实际发送的数据大小}
    ssize_t NonBlockSend(void*buf, size_t len){//       MSG_DONTWAIT  非阻塞式发送if(len <=0)return0;returnSend(buf, len, MSG_DONTWAIT);}// 创建服务端套接字boolCreateServer(uint16_t port,const std::string &ip ="0.0.0.0",bool flag =0){// 创建套接字if(Create()==false)returnfalse;// 设置为非阻塞if(flag)NonBlock();// 将地址与端口设置为可重用ReuseAddress(port, ip);// 绑定地址信息if(bind(ip, port)==false)returnfalse;// 进行监听if(Listen()==false)returnfalse;returntrue;}// 创建客户端套接字boolCreateClient(uint16_t port,const std::string &ip){// 创建套接字if(Create()==false)returnfalse;// 连接服务器if(Connect(ip, port)==false)returnfalse;returntrue;}// 服务器获取连接intAccept(){int newfd =::accept(_sockfd,nullptr,nullptr);if(newfd <0){LOG(ERROR,"accept failed!\n");return-1;}return newfd;}//  关闭套接字voidClose(){LOG(INFO,"Close sockfd: %d\n", _sockfd);if(_sockfd !=-1){::close(_sockfd);}}};
标签: 服务器 运维 网络

本文转载自: https://blog.csdn.net/JLX_1/article/details/143987150
版权归原作者 叫我龙翔 所有, 如有侵权,请联系我们删除。

“【项目日记】仿mudou的高并发服务器 --- 实现缓冲区模块,通用类型Any模块,套接字模块”的评论:

还没有评论