一个人知道自己为什么而活,
就可以忍受任何一种生活。
--- 尼采 ---
✨✨✨项目地址在这里 ✨✨✨
仿mudou的高并发服务器
1 高并发服务器
实现高并发服务器的基础是实现基于事件触发的Reactor模型,通过Reactor模型对事件进行统一管理。对此我们需要设计:
- 事件管理模块:只负责事件的管理,及时更新对应描述符的监控事件集,以及获取就绪事件集
- 多路转接模块:这个模块是对Epoll多路转接模型的封装,封装必要接口,方便使用。通过Channel获取监控事件集,放入epoll模型中进行监控。最终通过事件循环Poll 方法实时获取就绪活跃Channel。
- 反应堆模块:这个模块是对基于事件触发的Reactor模型,对所有描述符进行事件监控。内部封装Poller和定时器时间轮。根据就绪事件集执行任务池方法。同时监控定时器任务,定期执行定时任务!
- 连接模块:这是该项目中最重要的一个模块!该模块就是对连接进行全方位的管理,对上面的模块进行整合,通信连接的所有操作都是通过这个模块进行!对该连接描述符的回调函数都在这里进行设置,同时也负责如何处理连接数据。
- 监听模块:专门对监听套接字进行管理的类,本质就是简易的Connection,其中负责连接套接字的数据处理!
2 事件管理 Channel模块
- 成员变量: - 当前需要监控的事件集- 当前连接触发的事件集- 绑定的EventLoop对象,这是管理当前连接的对象。Channel只是对事件进行管理- 可读事件触发的回调函数- 可写事件触发的回调函数- 错误事件触发的回调函数- 断开事件触发的回调函数- 任意事件触发的回调函数
- 成员函数: - 设置Revents函数- 检查当前是否可读- 检查当前是否可写- 启动/关闭可写监控- 启动/关闭可读监控- 关闭所有事件的监控- 移除监控- 更新事件监控:将新的事件集更新到EventLoop中,进行监控- 设置回调函数- HandleEvent用来进行事件处理!
// Channel类管理事件集classPoller;classEventLoop;classChannel{private:int _fd;// Poller *_poller;
EventLoop *_loop;uint32_t _events;// 需要监控的事件集uint32_t _revents;// 当前连接就绪的事件集using EventCallBack = std::function<void()>;// 五个回调函数
EventCallBack _read_cb;// 可读事件回调函数
EventCallBack _write_cb;// 可写事件回调函数
EventCallBack _close_cb;// 连接断开事件回调函数
EventCallBack _error_cb;// 错误事件回调函数
EventCallBack _event_cb;// 任意事件回调函数public:Channel(EventLoop *loop,int fd):_fd(fd),_loop(loop),_events(0),_revents(0){}// 设置回调函数voidSetReadCallBack(const EventCallBack &cb){ _read_cb = cb;}voidSetWriteCallBack(const EventCallBack &cb){ _write_cb = cb;}voidSetCloseCallBack(const EventCallBack &cb){ _close_cb = cb;}voidSetErrorCallBack(const EventCallBack &cb){ _error_cb = cb;}voidSetEventCallBack(const EventCallBack &cb){ _event_cb = cb;}// 设置Revents函数voidSetRevents(uint32_t events){// LOG(DEBUG, "事件更新revents:%d\n", events);
_revents = events;}// 返回需要监控的事件集uint32_tEvents(){return _events;}intFd(){return _fd;}// 检查当前是否可读boolReadable(){return(_events & EPOLLIN);}// 检查当前是否可写boolWriteable(){return(_events & EPOLLOUT);}// 启动/关闭可写监控voidEnableRead(){LOG(DEBUG,"fd:%d 加入EPOLLIN监控\n", _fd);
_events |= EPOLLIN;Update();}voidDisableRead(){
_events &=~EPOLLIN;Update();}// 启动/关闭可读监控voidEnableWrite(){
_events |= EPOLLOUT;Update();}voidDisableWrite(){
_events &=~EPOLLOUT;Update();}// 关闭所有事件的监控voidDisableAll(){
_events =0;Update();}// 移除监控 --- 涉及poller 要在poller之后进行实现voidRemove();// 更新Channel的事件监控voidUpdate();// HandleEvent用来进行事件处理!voidHandleEvent(){// 根据revents判断需要执行哪些回调函数// 可读事件 半关闭连接 带外数据 紧急数据if((_revents & EPOLLIN)||(_revents & EPOLLRDHUP)||(_revents & EPOLLPRI)){if(_read_cb)_read_cb();// 不管任何事件都要调用的回调函数!if(_event_cb)_event_cb();// 放到事件处理完之后调用 刷新活跃度}// --- 有可能释放连接的操作事件 , 一次只处理一个!---// 可写事件if(_revents & EPOLLOUT){if(_write_cb)_write_cb();// 不管任何事件都要调用的回调函数!if(_event_cb)_event_cb();// 放到事件处理完之后调用 刷新活跃度}// 错误事件elseif(_revents & EPOLLERR){// 不管任何事件都要调用的回调函数!if(_event_cb)_event_cb();// _event_cb必须放在前面 ,因为出错就会释放连接if(_error_cb)_error_cb();}// 连接关闭elseif(_revents & EPOLLHUP){if(_event_cb)_event_cb();// _event_cb必须放在前面 ,因为出错就会释放连接if(_close_cb)_close_cb();}}};//...//...voidChannel::Remove(){return _loop->RemoveEvent(this);}// 更新Channel的事件监控voidChannel::Update(){return _loop->UpdateEvent(this);}
3 多路转接 Poller模块
Poller模块是对描述符IO进行监控的模块。
其中对多路转接的接口进行封装:
- 添加/修改描述符的事件监控,存在就修改,不存在就添加!
- 移除描述符的事件监控
所以:
- 必须要有一个epoll模型
- 一个
struct epoll_event
数组,监控时获取所有的活跃事件 - 使用哈希表管理描述符与事件管理的Channel对象。
运行逻辑为:
- 对描述符进行监控,通过Channel才能知道描述符需要监控什么事件
- 当描述符就绪了,通过描述符在hash表中找到对应的Channel,只有找到了Channel才知道要调用什么回调函数!
这里最核心的部分:事件循环Poll 方法。
Poll 方法是事件循环的核心,它调用
epoll_wait
阻塞等待事件发生。当有事件发生时,
epoll_wait
返回,
Poll
方法遍历返回的事件列表
_evs
,根据事件对应的文件描述符在
_event_channels
中找到对应的
Channel
对象,设置事件类型,并将其加入到活跃事件列表
active
中,以供上层进行后续处理。
// 多路转接方法Poller#defineMAX_POLLERSIZE1024classPoller{private:int _epfd;// Epoll模型structepoll_event _evs[MAX_POLLERSIZE];
std::unordered_map<int, Channel *> _event_channels;// fd与Channel的映射表private:voidUpdate(Channel *channel,int op){// 根据channel初始化structepoll_event ev;
ev.data.fd = channel->Fd();
ev.events = channel->Events();int ret =::epoll_ctl(_epfd, op, channel->Fd(),&ev);if(ret <0){LOG(ERROR,"epoll_ctl failed!\n");::abort();// 直接退出程序}return;}boolHasChannel(Channel *channel){auto it = _event_channels.find(channel->Fd());if(it == _event_channels.end()){returnfalse;}returntrue;}public:// 构造函数Poller(){
_epfd =::epoll_create(MAX_POLLERSIZE);if(_epfd <0){LOG(FATAL,"epoll_create failed!\n");::abort();}}// 更新EventvoidUpdateEvent(Channel *channel){// 先判断channel是否在channels中if(HasChannel(channel)==false){// 先建立托管//_event_channels[channel->Fd()] = channel;LOG(INFO,"Poller 加入新的fd事件托管 fd:%d\n", channel->Fd());
_event_channels.insert(std::make_pair(channel->Fd(), channel));// 进行添加returnUpdate(channel, EPOLL_CTL_ADD);}else{// 进行更新LOG(INFO,"Poller 加入事件托管 fd:%d\n", channel->Fd());returnUpdate(channel, EPOLL_CTL_MOD);}}// 移除EventvoidRemoveEvent(Channel *channel){// 先判断channel是否在channels中if(HasChannel(channel)==false){// 不在_event_channel里直接返回!return;}// 进行移除
_event_channels.erase(channel->Fd());Update(channel, EPOLL_CTL_DEL);}// 开始监控事件voidPoll(std::vector<Channel *>*active){// 进行监控// 阻塞式等待int nfds =::epoll_wait(_epfd, _evs, MAX_POLLERSIZE,-1);if(nfds <0){if(errno == EINTR)return;LOG(ERROR,"epoll_wait error:%s",strerror(errno));::abort();}// 对evs中的事件进行处理// LOG(DEBUG, "Poll 获取到新事件 n:%d\n", nfds);for(int i =0; i < nfds; i++){auto it = _event_channels.find(_evs[i].data.fd);// 判断是否存在 不存在直接返回assert(it != _event_channels.end());// LOG(DEBUG, "channel : %d _evs[i].events:%d\n", _evs[i].data.fd, _evs[i].events);// 进行调用
it->second->SetRevents(_evs[i].events);// 进行处理// it->second->HandleEvent();// 处理结束放入活跃队列
active->push_back(it->second);}}};
4 反应堆 EventLoop模块
EventLoop模块是管理事件监控管理的模块,就是Reactor反应堆模型。该模块与线程一一对应关联!
监控一个连接,这个连接一旦就绪,就要进行处理!如果这个连接描述符在多个线程中都触发了事件,就会存在线程安全问题!因此我们需要将一个连接的事件监控,以及连接事件处理和其他操作都放在同一个线程中进行处理!
后续如果接入了线程池,那么如何保证一个连接的所有操作都在EventLoop所在线程中?在EventLoop()中,添加一个任务队列。对连接的所有操作,都进行一次封装,对连接的操作要当做任务放入任务队列
事件监控 -> 事件处理(放入队列) -> 执行任务
这样可以保证对于连接的所有操作都是在一个线程中执行的,不涉及线程安全问题,是对于任务队列的操作有线程安全问题!只需要给task的操作加一把锁即可!
EventLoop处理流程
- 在线程中对描述符进行事件监控
- 有描述符就绪则对描述符进行事件处理,要保证处理回调函数的操作都在同一个线程中
- 所以的就绪事件处理完了,这时候再去将任务队列中的的任务进行执行
成员变量
- 线程ID :当事件就绪时,需要处理时,处理过程中,如果连接要进行某种操作,这些操作必须在
EventLoop
对应线程中执行,保证对连接的各项操作都是线程安全的,可以根据ID
判断所执行的操作是不是在所属线程中,如果在当前线程中可以直接执行;如果执行的操作不再线程中,才需要加入到任务池中,等待事件处理完然后执行任务 - Poller模型:对连接描述符进行监控
- 任务队列:执行任务
- 事件通知eventfd:本质是一个计算器,每写入一次就就绪了一次。有可能等待描述符IO事件就绪,执行流阻塞,这时候任务队列将得不到执行,所以需要eventfd来唤醒事件监控的阻塞
- _eventfd的管理Channel:这里使用智能指针!
- 线程互斥锁:保证线程安全
- TimeQueue时间轮模块:添加定时任务,负责对超时非活跃连接进行销毁
成员函数:
- 构造函数: 初始化成员变量 ,设置Channel的回调函数ReadEvent,启动可读事件监控
- IsInLoop函数:判断当前线程是否是EventLoop对应线程
- 执行任务RunInLoop函数:判断将要执行的任务是否处于当前线程,如果是直接执行回调;否则压入任务池中!这个接口是提供给外部的!
- 将操作压入任务池 QueueInLoop函数:将操作压入任务池,注意要进行上锁!
- 唤醒函数WakeUpEventfd:有可能因为没有事件就绪导致的epoll阻塞!进行唤醒
- 添加/修改监控: 更新对应Channel的事件监控。
- 移除监控:移除对应Channel的事件监控。
- 启动函数:进行事件监控,通过Poller 获取活跃连接;根据活跃连接进行事件处理, 遍历活跃连接,进行事件回调。然后执行所有任务。
- 执行所有任务RunAllTask:将任务池的任务置换出来,然后进行执行,注意加锁!
- 针对_eventfd的处理 : - 创建 int CreateEventfd () 标志位 EFD_CLOEXEC | EFD_NONBLOCK- 读取 void ReadEventfd 获取eventfd读取次- 唤醒 WakeUpEventfd 向Event中写入一个数据
timewheel 与 EventLoop 模块整合操作:
- 通过timefd设置定时器,内核会定时向文件描述符写入,触发超时事件
- 时间轮timewheel:实现每次执行RunTimeTask,都可以执行一波到期的任务!
要实现一个完整的秒级定时器,就需要将这两个功能整合到一起:
- timefd设置为每秒触发一次定时事件,当事件触发,则运行一次timewheel的runtimertask,执行一下所有的过期任务
- timefd的事件的监控与触发,可以融合EventLoop来进行实现!
修改时间轮timewheel模块:
- 增加定时器 _timerfd:设置读回调函数 并放入EvnetLoop中启动监控
- 对_timerfd进行管理的 _timer_channel:使用智能指针进行管理!
- 增添EventLoop:绑定对应的EventLoop!
- 创建Createtimefd接口 :设置为超时时间为 1 S
- ReadTimerfd接口:读取timerfd的数据
- Ontime接口:触发超时事件,进行读取timerfd , 进行超时任务的处理RunTimetask
- 添加/刷新/取消定时任务 :这三个接口设计了_timers的操作,会有线程安全问题!必须在EventLoop所在线程执行,都要通过EventLoop进行 RunInLoop操作!
/ EventLoop类 --- Reactor反应堆模型,管理监控连接事件
using Functor = std::function<void()>;classEventLoop{private:
std::thread::id _event_id;// 线程IDint _eventfd;// eventfd 用于通知事件
std::unique_ptr<Channel> _event_channel;// 管理Event事件的Channel对象
Poller _poller;// epoll模型
std::vector<Functor> _tasks;// 任务池
std::mutex _mtx;// 互斥锁保护线程// 时间轮
TimeWheel _timer_wheel;private:intCreateEventfd(){// 禁止进程复制 启动非阻塞读取int efd =::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd <0){LOG(ERROR,"Eventfd Create failed!\n");return-1;}return efd;}voidReadEvent(){// 从eventfd中读取数据// 注意每次读取的都是一个8字节数据uint64_t ret;int n =::read(_eventfd,&ret,sizeof(uint64_t));if(n <=0){// 被信号打断 表示没有数据if(errno == EINTR || errno == EAGAIN)return;LOG(ERROR,"eventfd recv failed!\n");::abort();}// 读取数据执行任务return;}voidWeakUpEventfd(){uint64_t val =1;int n =::write(_eventfd,&val,sizeof(uint64_t));if(n <=0){if(errno == EINTR)return;LOG(ERROR,"eventfd send failed!\n");::abort();}// 读取数据执行任务return;}public:EventLoop():_event_id(std::this_thread::get_id()),_eventfd(CreateEventfd()),_event_channel(newChannel(this, _eventfd)),_timer_wheel(this){//_poller = Poller();// 设置Eventfd的读回调函数
_event_channel->SetReadCallBack(std::bind(&EventLoop::ReadEvent,this));// 设置读事件监控
_event_channel->EnableRead();LOG(DEBUG,"EventLoop 构造完成\n");}boolIsInLoop(){return(_event_id == std::this_thread::get_id());}voidAssertInLoop(){returnassert(_event_id == std::this_thread::get_id());}voidRunInLoop(const Functor &cb){if(IsInLoop())returncb();// 否则压入任务池returnQueueLoop(cb);}voidQueueLoop(const Functor &cb){{
std::unique_lock<std::mutex>lock(_mtx);
_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪导致的epoll阻塞!WeakUpEventfd();}// 添加/修改监控voidUpdateEvent(Channel *channel){return _poller.UpdateEvent(channel);}// 移除监控voidRemoveEvent(Channel *channel){return _poller.RemoveEvent(channel);}voidRunAllTask(){
std::vector<Functor> tasks;{// 进行上锁
std::unique_lock<std::mutex>lock(_mtx);
_tasks.swap(tasks);}// 进行执行函数for(auto&f : tasks){f();}return;}// 开始监控函数voidStart(){while(1){
std::vector<Channel *> actives;
_poller.Poll(&actives);// 事件处理(放入队列) 遍历活跃连接,进行事件回调for(auto&channel : actives){
channel->HandleEvent();}// 执行任务 执行任务RunAllTaskRunAllTask();}}// 增加时间轮系列接口voidTimerAdd(uint64_t id,int delay, Task_t cb){ _timer_wheel.TimerAdd(id, delay, cb);}voidTimerRefresh(uint64_t id){ _timer_wheel.TimerRefresh(id);}voidTimerCancel(uint64_t id){ _timer_wheel.TimerCancel(id);}boolHasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}};
5 核心模块 - 连接Connection模块
这是该项目中最重要的一个模块!
该模块就是对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块进行!
需要管理:
- 套接字的管理,可以进行套接字操作
- 连接事件的管理,可读,可写,错误,挂断,任意事件
- 缓冲区的管理,从Socket读取/发送数据 需要经过缓冲区,便于Socket数据的接收与发送
- 协议上下文的管理,记录请求数据的处理过程
- 因为连接接收到数据之后要如何处理,需要用户决定,因此必须需要业务处理回调函数!
- 一个连接建立成功之后,该如何处理,由用户决定!因此必须有连接建立成功的回调函数!
- 同样关闭前,需要如何处理,也由用户决定,因此必须由关闭连接回调函数。
- 任意事件的产生,需不需要某种处理,由用户决定,因此必须由任意事件的回调函数!
提供功能:
- 发送数据 — 给用户提供的发送数据接口,只是将数据拷贝到发送缓冲区,然后启动写事件监控
- 关闭连接 — 给用户提供的关闭连接接口,应该在实际释放连接之前,查看输入输出缓冲区是否有数据待处理‘
- 超时管理 — 通过给用户接口,用来 启动/取消 超时销毁功能
- 协议切换 — 一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理函数
Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的!
当这样的一个场景中:对连接进行操作的时候,但是连接已经被释放了,导致内存访问错误,程序崩溃!
对于这个场景的解决方案:使智能指针share_ptr对Connection进行管理,只有计数为0时才会真正释放!
Connection要继承
enable_shared_from_this<Connection>
,这样可以方便的快速获取当前对象的
shared_ptr
指针。
成员变量:
- 连接ID :uint64_t _conn_id :便于连接的管理和查找
- 套接字描述符:int sockfd :连接关联的文件描述符
- 套接字对象:Socket _socket:套接字操作管理模块
- 事件管理: Channel _channel:连接事件管理模块
- 输入输出缓冲区:- Buffer _in_buffer; 存放Socket中读取的数据- Buffer _out_buffer; 存放要发送给对端的数据
- 通用数据类型上下文: Any _context;
- 连接状态:ConnStatu _statu; DISCONNECTED 未连接 ; CONNECTING 连接建立成功 待处理状态 ;CONNECTED 连接建立完成 可以通信; DISCONNETCING 待关闭。
- 5种回调函数 :- 连接回调函数类型为 std::function<void(const PtrConn&)>;- 处理时回调函数 std::function<void(const PtrConn& , Buffer*)>;- 关闭阶段的回调 std::function<void(const PtrConn&)>;- 还需要组件内的连接关闭回调 因为服务器组件内会把所有的连接管理起来 一旦某个连接关闭 就应该从管理的地方移除自己的信息!- 任意事件触发的回调 std::function<void(const PtrConn&)>;
- 超时销毁标志位 bool _enable_inactive_release: 判断是否启动非活跃销毁的标志,默认是false
- 定时器ID :可以直接简化为使用_conn_id。
- Reactor模型 EventLoop _loop*: 连接所关联的EventLoop。
成员函数:
- 线程内发送数据接口: void SendInLoop(char* data , size_t len)
- 线程内关闭连接接口:void ShutdownInLoop(),这个关闭操作不是真正的关闭函数 而是判断是否还有时间要进行处理
- 线程内开启超时销毁接口:void EnableInactiveReleaseInLoop(int sec)
- 线程内取消超时销毁接口:void CancelInactiveReleaseInLoop()
- 线程内设置回调函数接口:void UpgradeInLoop(Any Context , const ConnectionCallback …)
- 实际的释放接口:void ReleaseInLoop()
- 初始化设置接口:EstablishedInLoop() 连接获取之后所处的状态下要进行各种设置 EstablishedInLoop() 给Channel设置事件回调 启动读监控
- 4个Channel回调函数 : - HandleRead :接收Socket数据放到接收缓冲区中 使用非阻塞读取,读取出错调用ShutdownInLoop 检查缓冲区再进行关闭,将读取到的数据写入输入缓冲区,然后根据输入缓冲区数据大小调用_message_callback回调进行业务处理- HandleSend: 描述符可写事件就绪后要调用的函数,将发送缓冲区的数据发送,_out_buffer中保存的就是要发送的数据,根据返回值进行处理,发送错误就要关闭连接了 关闭之前如果还有数据需要进行处理 ,千万不能忘记将读偏移向后移动!如果现在的状态是待关闭状态,则有数据发送完之后释放连接 ,没有数据就直接关闭,输出缓冲区没有数据了就不用再监控写事件了!
- HandleClose:关闭连接
- HandleError:出现错误,直接关闭连接
- HandleEvent:刷新定时销毁任务活跃度
用户接口:
- 构造函数、析构函数
- 获取Id
- 获取描述符fd
- 是否处于连接状态 bool ConnStatu()
- 设置上下文 void SetContext(const Any & context)
- 获取上下文 Any* GetContext()
- 设置4个回调函数 SetConnectCB(const ConnectionCallback& cb)…
- 发送数据 void Send(char* data , size_t len) 将数据放到发送缓冲区,启动写事件监控
- 关闭连接 void Shutdown() 提供给用户的关闭,不是真正的关闭连接 , 需要判断有没有数据待处理
- 取消超时销毁 void CancelInactiveRelease()
- 协议切换 Upgrade(Any Context , const ConnectionCallback …) 切换协议 — 重置上下文以及阶段性处理函数
- 进行channel回调设置 Established() 启动读监控
classConnection;// 核心模块 --- Connection类typedefenum{
DISCONNECTED =0,
CONNECTING,
CONNECTED,
DISCONNETCING
} ConnStatu;using PtrConn = std::shared_ptr<Connection>;classConnection:public std::enable_shared_from_this<Connection>{private:uint64_t _conn_id;// connection连接ID
Socket _socket;// 管理的套接字int _sockfd;// 套接字fd
EventLoop *_loop;// connection连接关联的EventLoop对象
Any _context;// 上下文数据
Channel _channel;// 管理连接事件
Buffer _in_buffer;// 输入缓冲区 存放Socket中读取的数据
Buffer _out_buffer;// 输出缓冲区 存放要发送给对端的数据bool _enable_active_release;// 是否开启超时销毁 默认是false
ConnStatu _statu;// Connection连接状态// 5 个 回调函数 --- 注意使用智能指针 防止在执行任务之前Connection销毁using ConnectedCallBack = std::function<void(const PtrConn &)>;// 连接时进行的回调函数using MessageCallBack = std::function<void(const PtrConn &, Buffer *)>;// 处理数据时的回调函数using ClosedCallBack = std::function<void(const PtrConn &)>;// 关闭连接时的回调函数using AnyEventCallBack = std::function<void(const PtrConn &)>;// 处理任意事件时的回调函数
ConnectedCallBack _conn_cb;// 连接回调函数类型
MessageCallBack _message_cb;// 处理时回调函数
ClosedCallBack _closed_cb;// 关闭阶段的回调
AnyEventCallBack _event_cb;// 任意事件触发的回调// 还需要组件内的连接关闭回调 因为服务器组件内会把所有的连接管理起来 一旦某个连接关闭 就应该从管理的地方移除自己的信息!
ClosedCallBack _event_closed_cb;private:// 读事件触发的函数voidHandleRead(){// 接收Socket数据放到接收缓冲区中// LOG(DEBUG, "HandleRead\n");char buf[65536]={0};int ret = _socket.NonBlockRecv(buf,65536);// 返回值 为 - 1说明读取错误if(ret <0){returnShutdownInLoop();}// 返回值为0说明没读取到数据// 将数据写入到缓冲区
_in_buffer.WriteAndPush(buf, ret);// 然后调用_message_callback回调if(_in_buffer.ReadAbleSize()>0){return_message_cb(shared_from_this(),&_in_buffer);}}// 写事件触发的函数voidHandleWrite(){// 将输出缓冲区的数据向Socket描述符中进行非阻塞写入int ret = _socket.NonBlockSend(_out_buffer.ReadPos(), _out_buffer.ReadAbleSize());// 发送错误就关闭连接if(ret <0){// 如果输入缓冲区还有数据,就要进行处理之后再关闭连接if(_in_buffer.ReadAbleSize()>0)_message_cb(shared_from_this(),&_in_buffer);returnReleaseInLoop();}// 缓冲区读偏移向后移动
_out_buffer.MoveReadOffset(ret);if(_out_buffer.ReadAbleSize()==0){// 关闭写事件监控
_channel.DisableWrite();// 如果是待关闭状态就进行关闭if(_statu == DISCONNETCING){returnRelease();}}return;}// 连接关闭触发的函数voidHandleClose(){// 如果还有数据,就进行一次处理if(_in_buffer.ReadAbleSize()>0){_message_cb(shared_from_this(),&_in_buffer);}// 释放连接returnRelease();}// 错误事件触发的函数voidHandleError(){returnHandleClose();// 直接调用关闭函数}voidHandleEvent(){// 刷新活跃度if(_enable_active_release ==true){
_loop->TimerRefresh(_conn_id);}if(_event_cb)_event_cb(shared_from_this());}// 发送函数 不是直接进行发送 而是将数据拷贝到输出缓冲区 然后启动写事件监控voidSendInLoop(constchar*data, size_t len){if(_statu == DISCONNECTED)return;
_out_buffer.WriteAndPush(data, len);if(_channel.Writeable()==false){
_channel.EnableWrite();}}// 这个关闭操作不是真正的关闭函数 而是腰判断是否还有时间要进行处理voidShutdownInLoop(){
_statu = DISCONNETCING;// 状态设置为待关闭状态// 如果输入缓冲区有数据 ,要进行一次处理if(_in_buffer.ReadAbleSize()>0){if(_message_cb)_message_cb(shared_from_this(),&_in_buffer);}// 如果输出缓存区有数据,就要启动写事件监控if(_out_buffer.ReadAbleSize()>0){if(_channel.Writeable()==false)
_channel.EnableWrite();}// 如果输出缓冲区数据没有 直接进行关闭if(_out_buffer.ReadAbleSize()==0){Release();}}voidEnableInactiveReleaseInLoop(int sec){// LOG(DEBUG, "EnableInactiveReleaseInLoop %d s\n", sec);// 将超时销毁标志位设置为true
_enable_active_release =true;// 如果已经有超时任务,那么就进行一次刷新if(_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}// 没有就进行添加
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release,this));}voidCancelInactiveReleaseInLoop(){// 将超时销毁标志位设置为false
_enable_active_release =false;// 如果有超时任务才进行取消if(_loop->HasTimer(_conn_id)){
_loop->TimerCancel(_conn_id);}}voidUpgradeInLoop(const Any &context,const ConnectedCallBack &conn_cb,const MessageCallBack &mess_cb,const ClosedCallBack &closed_cb,const AnyEventCallBack &event_cb){
_context = context;
_conn_cb = conn_cb;
_message_cb = mess_cb;
_closed_cb = closed_cb;
_event_cb = event_cb;}// 真正的关闭函数voidReleaseInLoop(){LOG(INFO,"Realse Connection:%p\n",shared_from_this());// 1. 修改连接状态
_statu = DISCONNECTED;// 2. 移除所有事件监控
_channel.Remove();// 3. 关闭描述符
_socket.Close();// 4. 取消定时任务if(_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();// 5. 执行用户设置的关闭回调if(_closed_cb)_closed_cb(shared_from_this());// 6. 执行组件内的关闭回调if(_event_closed_cb)_event_closed_cb(shared_from_this());}// 连接获取之后所处的状态下要进行各种设置 启动读事件监控 调用回调函数voidEstablishedInLoop(){// LOG(DEBUG, "EstablishedInLoop()\n");// 必须是连接中状态才执行assert(_statu == CONNECTING);
_statu = CONNECTED;// 1.执行完 修改为已连接状态
_channel.EnableRead();// 2.启动读事件监控if(_conn_cb)_conn_cb(shared_from_this());// 3. 调用连接回调函数}public:Connection(EventLoop *loop,uint64_t conn_id,int sockfd):_conn_id(_conn_id),_socket(sockfd),_sockfd(sockfd),_loop(loop),_channel(_loop, _sockfd),_enable_active_release(false),_statu(CONNECTING){// 设置channel回调函数
_channel.SetCloseCallBack(std::bind(&Connection::HandleClose,this));
_channel.SetErrorCallBack(std::bind(&Connection::HandleError,this));
_channel.SetWriteCallBack(std::bind(&Connection::HandleWrite,this));
_channel.SetEventCallBack(std::bind(&Connection::HandleEvent,this));
_channel.SetReadCallBack(std::bind(&Connection::HandleRead,this));}~Connection(){LOG(INFO,"Release Connection :%p",this);}// 基础接口uint64_tId(){return _conn_id;}// 返回Connection的idintFd(){return _sockfd;}// 返回套接字描述符voidSetContext(const Any &context){ _context = context;}// 设置上下文
Any *GetContext(){return&_context;}// 获取上下文boolConnected(){return(_statu == CONNECTED);}// 判断是否处于连接状态!// 设置回调函数voidSetConnectCB(const ConnectedCallBack &cb){ _conn_cb = cb;}voidSetMessageCB(const MessageCallBack &cb){ _message_cb = cb;}voidSetClosedCB(const ClosedCallBack &cb){ _closed_cb = cb;}voidSetAnyEventCB(const AnyEventCallBack &cb){ _event_cb = cb;}voidSetSvrClosedCB(const ClosedCallBack &cb){ _event_closed_cb = cb;}// 发送数据voidSend(constchar*data, size_t len){ _loop->RunInLoop(std::bind(&Connection::SendInLoop,this, data, len));}// 关闭连接 --- 提供给用户的关闭,_不是真正的关闭连接 , 需要判断有没有数据待处理voidShutdown(){ _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop,this));}// 启动超时销毁voidEnableInactiveRelease(int sec){ _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this, sec));}// 取消超时销毁voidCancelInactiveRelease(){ _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop,this));}// 进行channel回调设置voidEstablished(){ _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));}voidRelease(){// LOG(DEBUG, "Release()\n");
_loop->QueueLoop(std::bind(&Connection::ReleaseInLoop,this));}// 切换协议voidUpgrade(const Any &context,const ConnectedCallBack &conn_cb,const MessageCallBack &mess_cb,const ClosedCallBack &closed_cb,const AnyEventCallBack &event_cb){// 切换协议 --- 重置上下文以及阶段性回调处理函数 这个函数必须在EventLoop中立刻执行// 预防新事件触发后 ,处理时还是原先的协议! --- 导致数据处理异常
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this, context, conn_cb, mess_cb, closed_cb, event_cb));}};
6 监听套接字 Acceptor模块
专门对监听套接字进行管理的类
- 创建一个监听套接字
- 启动读事件监控,获取新连接
- 事件触发后,获取新连接
- 为新连接创建Connection进行管理(这是服务器模块进行的)
该模块只进行监听连接的管理,因此获取到新连接的描述符之后,对于新连接描述符如何处理其实并不关心!
成员变量
- 套接字对象:Socket 用于创建监听套接字
- EventLoop* _loop :对监听套接字进行事件管理
- Channel _channel :用于对监听套接字进行事件管理
- 新连接获取之后的回调函数 AcceptCallBack AcceptCallBack _accept_callback;
成员函数
- 构造函数
- 监听套接字读事件回调函数 — 调用 _accept_callback,进行新连接处理
- 创建套接字 返回描述符。
classAcceptor{private:
Socket _socket;// 套接字对象
EventLoop *_loop;// 对监听套接字进行事件监控
Channel _channel;// 用于对今天套接的事件管理using AcceptCallBack = std::function<void(int)>;
AcceptCallBack _accept_callback;intCreateSocket(int port){bool ret = _socket.CreateServer(port);assert(ret ==true);return _socket.Sockfd();}// 读事件回调函数voidHandleRead(){// 获取新连接int newfd = _socket.Accept();if(newfd <0){LOG(ERROR,"Accept failed\n");return;}if(_accept_callback)_accept_callback(newfd);}public:Acceptor(EventLoop *loop,int port):_socket(CreateSocket(port)),_loop(loop),_channel(_loop, _socket.Sockfd()){
_channel.SetReadCallBack(std::bind(&Acceptor::HandleRead,this));// 开启读事件监控!//_channel.EnableRead();}~Acceptor(){
_socket.Close();}voidSetAcceptCallBack(const AcceptCallBack &cb){
_accept_callback = cb;}voidListen(){// 开启读事件监控!
_channel.EnableRead();}};
现在,服务器模块基本实现!
版权归原作者 叫我龙翔 所有, 如有侵权,请联系我们删除。