多路转接
基本概念
①阻塞与非阻塞
阻塞与非阻塞讨论的是在等待调用结果时的状态
阻塞调用是指在等待时,当前线程会被挂起. 调用线程只有在得到结果之后才会返回
非阻塞调用指在等待时,该线程可以执行其他任务,不被OS挂起
②同步通信 vs 异步通信
同步与异步讨论的是调用者是否会主动等待调用结果
同步:调用者发出调用时,没有得到结果不会返回,阻塞等待,调用者主动等待该调用结果
异步:与同步相反,发出调用后立即返回,调用内的工作由别人完成,自己并不参与,等待被调用者的通知,直接使用
③非阻塞IO
文件描述符, 默认都是阻塞IO
int fcntl(int fd, int cmd, … /* arg */ )
对于cmd参数:
复制一个现有的描述符(cmd=F_DUPFD).
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
通过fcntl,实现一个非阻塞的文件描述符
voidSetNoBlock(int fd){int fl =fcntl(fd, F_GETFL);//将当前的文件描述符的属性取出来if(fl <0){perror("fcntl");return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);//使用F_SETFL将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK参数}
五种IO模型
①阻塞IO:
在内核将数据准备好之前, 系统调用会一直阻塞式等待(被OS挂起)
比如套接字:
②非阻塞IO
如果内核还未将数据准备好, 系统调用仍然会直接返回
并且返回EWOULDBLOCK错误码,定期检测
非阻塞IO可能会反复检测该文件描述符数据是否就绪, 这个过程称为轮询. 这对CPU来说是较大的浪费, 一般只有特定场景下才使用
对于阻塞IO,OS需要唤醒阻塞的进程,由OS发起并执行
而非阻塞轮询是由用户自己发起检测,OS执行
③信号驱动IO
内核将数据准备好的时候, 使用SIGIO信号通知应用程序进行IO操作
④IO多路转接:
虽然看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
IO分两步,一个是等待,一个是拷贝,高效IO往往指等待时间少
IO多路转接负责同时等待多个文件描述符,当数据就绪时返回,再次recvfrom后就直接拷贝数据,提高了IO效率
⑤异步IO
由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)
异步IO只需要发出系统调用请求,然后等待OS递交信号,此时用户缓冲区已经拷贝完成,直接取用就行,该动作由OS完成,而信号驱动由用户完成拷贝数据到用户缓冲区
特点:没有参与等待,没有参与拷贝,不会等待OS的信号再去调用recvfrom
IO多路转接
①select
select的主要工作是负责等待数据就绪,并且通知上层
特点:只要底层缓冲区有数据(有空间),select就认为读事件(写事件)就绪
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数:
nfds:最大文件描述符加1(select遍历文件描述符数组)
fd_set:文件描述符的位图,输入输出型参数,输入代表需要OS检测的文件描述符,输出为就绪的文件描述符,只能最多同时监控1024个
readfds:读事件位图
writefds:写事件位图
exceptfds:异常位图
timeout:
如果为NULL,则一直阻塞式等待,
0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回
返回值:
执行成功则返回所有就绪文件描述符的个数
0表示时间超过timeout
-1代表等待出错
作为输入输出型参数,select每次调用后,原来的参数数据就被覆盖了,所以需要通过数组保存原来的数据,每次都需要对传入的参数如readfds进行重新设置
连接事件到来,select也认为是读事件就绪
简易的select服务器:
1 #pragma once
2 #include"sock.hpp"3 #define DFL_FD -14 #define BACK_LOG 55 #define NUM 10246namespace ns_select
7{8classSelectServer9{10private:11int listen_sock;12unsignedshort port;13public:14SelectServer(unsignedshort _port):port(_port)15{1617}18voidInitSelectServer(){19 listen_sock=tzc::Sock::Socket();20 tzc::Sock::Bind(listen_sock,port);21 tzc::Sock::Listen(listen_sock,BACK_LOG);22}23voidRun()24{25 fd_set rfds;26int fd_array[NUM]={0};27ClearArray(fd_array,NUM,DFL_FD);//将数组初始化为-128 fd_array[0]=listen_sock;//放入监听套接字29for(;;)30{31//重新设置时间与max_fd32structtimeval timeout={5,0};33int max_fd=DFL_FD;34FD_ZERO(&rfds);35//添加进rfds36for(auto i=0;i<NUM;i++)37{38if(fd_array[i]==DFL_FD)39{40continue;41}42FD_SET(fd_array[i],&rfds);43if(max_fd<fd_array[i])44{45 max_fd=fd_array[i];46}47}48switch(select(max_fd+1,&rfds,nullptr,nullptr,&timeout))49{50case0:51 std::cout<<"timeout"<<std::endl;52break;53case-1:54 std::cerr<<"select error"<<std::endl;55break;56default:57//正常事件处理58HandlerEvent(rfds,fd_array,NUM);59break;6061}62}63}64voidHandlerEvent(const fd_set &rfds,int fd_array[],int num)65{66for(auto i=0;i<num;i++)67{68if(fd_array[i]==DFL_FD)69{70continue;71}72//有效fd73if(fd_array[i]==listen_sock&&FD_ISSET(fd_array[i],&rfds))74{75//连接事件到来76structsockaddr_in peer;77 socklen_t len=sizeof(peer);78int sock=accept(fd_array[i],(structsockaddr*)&peer,&len);79if(sock<0)80{81 std::cerr<<"accept error"<<std::endl;82continue;83}84uint16_t peer_port=htons(peer.sin_port);85 std::string peer_ip=inet_ntoa(peer.sin_addr);86 std::cout<<peer_ip<<": "<<peer_port<<std::endl;87//将文件描述符添加到fd_array数组中88if(!AddFdToArray(fd_array,num,sock))89{90close(sock);91 std::cout<<"select server full,close fd "<<sock<<std::endl;92}93}94else95{96if(FD_ISSET(fd_array[i],&rfds))97{98//读事件就绪99char buffer[1024];100//粘包等问题101//定制协议102//对每个文件描述符定义缓冲区103 ssize_t s=recv(fd_array[i],buffer,sizeof(buffer)-1,0);104if(s>0)105{106 buffer[s]=0;107 std::cout<<"echo# "<<buffer<<std::endl;108}109elseif(s==0)110{111 std::cout<<"client quit"<<std::endl;112close(fd_array[i]);113 fd_array[i]=DFL_FD;114}115else116{117 std::cerr<<"recv error"<<std::endl;118close(fd_array[i]);119 fd_array[i]=DFL_FD;120}121}122else123{124125}126}127}128}129private:130voidClearArray(int fd_array[],int num,int default_fd)131{132for(auto i=0;i<num;i++)133{134 fd_array[i]=default_fd;135}136}137boolAddFdToArray(int fd_array[],int num,int sock)138{139for(int i=0;i<num;i++)140{141if(fd_array[i]==DFL_FD)142{143 fd_array[i]=sock;144returntrue;145}146}147//使用完空间了148returnfalse;149}150};151152};
sock.h
1 #include<iostream>2 #include<unistd.h>3 #include<cstring>4 #include<sys/socket.h>5 #include<sys/types.h>6 #include<arpa/inet.h>7 #include<netinet/in.h>8namespace tzc
9{10classSock11{12public:13staticintSocket()14{15int sock=socket(AF_INET,SOCK_STREAM,0);16if(sock<0)17{18 std::cerr<<"socket error"<<std::endl;19exit(1);20}21int opt=1;22setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));2324return sock;25}26staticboolBind(int sock,unsignedshort port)27{28structsockaddr_in local;29memset(&local,0,sizeof(local));30 local.sin_family=AF_INET;31 local.sin_port=htons(port);32 local.sin_addr.s_addr=INADDR_ANY;33if(bind(sock,(structsockaddr*)& local,sizeof(local))<0)34{35 std::cerr<<"bind error"<<std::endl;36exit(2);37}38returntrue;39}40staticboolListen(int sock,int backlog)41{42if(listen(sock,backlog)<0)43{44 std::cout<<"listen error"<<std::endl;45exit(3);46}47returntrue;48}49};5051};
select缺点:
1.select能够同时等待的文件描述符是有上限的(1024)
2.select需要和OS交互数据,涉及较多数据拷贝,当select面临的链接很多时,会因为拷贝数据而降低效率
3.select每次调用,都必须从第三方数组重新添加fd,影响程序运行效率,比较麻烦,
4.select的nfds参数,决定了操作系统检测遍历的范围,当大量连接到来时,OS需要遍历的数据会越来越多
5.select成功返回后,每次都需要遍历第三方数组判断哪些文件描述符事件就绪
select优点:
1.select可以同时等待多个fd,而且只负责等待,不会拷贝数据到用户缓冲区
多路转接适用场景:大量的连接到来,但是只有少量是活跃的
而一般场景:如果大量连接到来都很活跃,直接阻塞式recvfrom读取就足够了
②poll
poll解决了select的两个问题
1.解决了select检测文件上限的问题
2.将用户传给OS的需要检测的文件描述符与OS传给用户的就绪文件描述符的两个行为进行分离
poll的缺点:
1.和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
2.每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
3.同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降
poll函数接口
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数:
fds:需要检测的文件描述符
nfds:结构体数组长度
timeout:轮询方式,与select一致
返回值
小于0, 表示出错;
等于0, 表示poll函数等待超时;
大于0, 表示poll由于监听的文件描述符就绪而返回.
struct pollfd {
int fd; //file descriptor
short events; // 用户 requested events
short revents; // OS returned events
};
events和revents的取值:
POLLIN:数据可读
POLLOUT:数据可写
简易poll
1 #include"sock.hpp"2 #include<poll.h>3classPollServer4{5private:6int listen_sock;7int port;8public:9PollServer(int _port):port(_port)10{}11voidInitServer()12{13 listen_sock=tzc::Sock::Socket();14 tzc::Sock::Bind(listen_sock,port);15 tzc::Sock::Listen(listen_sock,5);16}17voidRun()18{19structpollfd rfds[64];20for(int i=0;i<64;i++)21{22 rfds[i].fd=-1;23 rfds[i].events=0;24 rfds[i].revents=0;25}26 rfds[0].fd=listen_sock;27 rfds[0].events|=POLLIN;28 rfds[0].revents=0;29for(;;)30{31switch(poll(rfds,64,-1))32{33case0:34 std::cout<<"timeout"<<std::endl;35break;36case-1:37 std::cerr<<"poll error"<<std::endl;38default:39for(int i=0;i<64;i++)40{41if(rfds[i].fd==-1)42{43continue;44}45if(rfds[i].revents&POLLIN)46{47if(rfds[i].fd==listen_sock)48{49//连接到来50 std::cout<<"get a new link"<<std::endl;51}52else53{54//读事件就绪5556}57}58}59}60}61};62};
③epoll
1.epoll模型
epoll的使用过程就是三部曲:
1.调用epoll_create创建一个epoll句柄
2.调用epoll_ctl, 将要监控的文件描述符进行注册
3.调用epoll_wait, 等待文件描述符就绪
2.epoll的相关系统调用
int epoll_create(int size)
size:128or256,该参数已被废弃
返回值:
返回一个epoll的文件描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
第一个参数是epoll_create()的返回值(epoll的句柄)
第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD :注册新的fd到epfd中
EPOLL_CTL_MOD :修改已经注册的fd的监听事件
EPOLL_CTL_DEL :从epfd中删除一个fd
第三个参数是需要监听的fd
第四个参数是告诉内核需要监听什么事
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
epoll将会把发生的事件拷贝到events数组中
maxevents告知内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
struct epoll_event结构如下:
1 #pragma once
2 #include"sock.hpp"3 #include<sys/epoll.h>4 #define back_log 55 #define MAX_NUM 646namespace ns_epoll
7{8constint size=256;9classEpollServer10{11private:12int listen_sock;13int epfd;14uint16_t port;15public:16EpollServer(int _port):port(_port)17{}18voidInitEpollServer()19{20 listen_sock=tzc::Sock::Socket();21 tzc::Sock::Bind(listen_sock,port);22 tzc::Sock::Listen(listen_sock,back_log);23if((epfd=epoll_create(size))<0)24{25 std::cerr<<"epoll_create error\n"<<std::endl;26exit(4);27}28}29voidAddEvent(int sock,uint32_t event)30{31structepoll_event ev;32 ev.events=0;33 ev.events|=event;34 ev.data.fd=sock;35if(epoll_ctl(epfd,EPOLL_CTL_ADD,sock,&ev)<0)36{37 std::cerr<<"epoll_ctl error,fd:"<<sock<<std::endl;38}39}40voidDelEvent(int sock)41{42if(epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr)<0)43{44 std::cerr<<"epoll_ctl error"<<std::endl;45}46}47voidRun()48{49AddEvent(listen_sock,EPOLLIN);50int timeout=1000;51structepoll_event revs[MAX_NUM];52for(;;)53{54//返回值num表明就绪事件个数,OS会依次放入revs中55int num=epoll_wait(epfd,revs,MAX_NUM,timeout);56if(num>0)57{58for(int i=0;i<num;i++)59{60int sock=revs[i].data.fd;61if(revs[i].events&EPOLLIN)62{63if(sock==listen_sock)64{65structsockaddr_in peer;66 socklen_t len=sizeof(peer);67int sk=accept(listen_sock,(structsockaddr*)&peer,&len);68if(sk<0)69{70 std::cout<<"accept error"<<std::endl;71continue;72}73 std::cout<<"get a new link: "<<inet_ntoa(peer.sin_addr)<<": "<<ntohs(peer.sin_port)<<std::endl;74AddEvent(sk,EPOLLIN);75}76else77{78//读事件就绪79char buffer[1024];80 ssize_t s=recv(sock,buffer,sizeof(buffer)-1,0);81if(s>0)82{83 buffer[s]=0;84 std::cout<<buffer<<std::endl;85}86else87{88 std::cout<<"client close"<<std::endl;89close(sock);90DelEvent(sock);91}92}93}94elseif(revs[i].events&EPOLLOUT)95{96//写事件97}98}99}100elseif(num==0)101{102 std::cout<<"time out"<<std::endl;103}104else105{106 std::cout<<"epoll error"<<std::endl;107}108}109}110~EpollServer()111{112if(listen_sock>=0)113{114close(listen_sock);115}116if(epfd>=0)117{118close(epfd);119}120}121122};123};
epoll优点:
1.接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
2.数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝 内核中,该红黑树结点仍在内存,下次操作不需要重新添加(而select/poll都是每次循环都要进行拷贝)
3.事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
4.没有数量限制: 文件描述符数目无上限
3.epoll的工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET),默认是LT的
LT VS ET
LT与ET的差别在于就绪事件通知机制:
LT:只要底层有数据就会一直通知上层读取数据
ET:当底层的数据从无到有,从有到多变化时才会通知上层一次
对于ET:
只有在底层数据变化时才会通知,但是如果此次并没有读完缓冲区数据,如果此后该缓冲区也再也无数据变化,将会导致该剩余数据一直没能被应用层读取,所以ET应该保证一次就把缓冲区的数据全部读取完
通过不断循环调用recv,判断recv的返回值来判定是否读取完毕,读取时会出现如下两种情况:
情况1:如果返回的小于期望读取的字节数,那么说明已经读取完毕
情况2:如果最后一次读取的刚刚将缓冲区读完,返回值刚好等于缓存区大小,但此时读取将被判定为还未读取完毕,会再次读取,进程将被阻塞,服务器将被挂起
所以对于情况2,就要求ET模式必须为非阻塞轮询模式的读取,当缓冲区无数据时返回值小于0,退出循环
可见ET模式下recv,write都必须是非阻塞的,而LT可以不需要,因为即使此次没读完还会通知,下次还能读取
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET
4.epoll工作方式的对比
1.LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是一次处理完数据
2.相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的
3.同时ET 的代码复杂程度也更高
版权归原作者 TZC⑥ 所有, 如有侵权,请联系我们删除。