前言
多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O事件,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。
select()
select函数是系统提供的一个多路转接接口,用于让我们的程序同时监视多个文件描述符(file descriptor,简称fd)的状态变化,如读就绪、写就绪或异常。其函数原型如下:
#include<sys/select.h>#include<sys/time.h>#include<unistd.h>intselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds,structtimeval*timeout);
参数说明
- nfds:是文件描述符集合中最大文件描述符值加1。这个参数实际上被忽略,因为现在的系统不再需要它来确定文件描述符的范围。
- readfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被读取。
- writefds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看它们是否准备好被写入。
- exceptfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查看是否有异常条件发生(例如,带外数据到达)。
- timeout:是一个指向 timeval 结构的指针,该结构指定了函数等待的最大时间长度。如果 timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪。
返回值
- 成功时,select() 返回准备就绪的文件描述符的总数(不包括 exceptfds 中的文件描述符)。
- 如果在调用时没有任何文件描述符准备就绪,并且 timeout 非空且指定的时间已过,则返回 0。
- 如果出现错误,则返回 -1,并设置 errno 以指示错误原因。
timeval是一个用于表示时间的结构体:
#include<sys/time.h>structtimeval{ time_t tv_sec;// 秒 suseconds_t tv_usec;// 微秒 };
tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。
tv_usec:微秒数,范围从 0 到 999999。
fd_set
**
fd_set
实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符**。通过设置或清除位的方式,可以将文件描述符添加到或从
fd_set
中移除。
typedefstruct{
__fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];} fd_set;
其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。
与此同时,系统还提供一些接口来操作
fd_set
:
FD_ZERO(fd_set *set)
:将 fd_set 中的所有位清零,即将所有文件描述符从集合中移除。FD_SET(int fd, fd_set *set)
:将指定的文件描述符 fd 添加到 fd_set 中。FD_CLR(int fd, fd_set *set)
:将指定的文件描述符 fd 从 fd_set 中移除。FD_ISSET(int fd, fd_set *set)
:检查指定的文件描述符 fd 是否在 fd_set 中。在调用 select() 后,使用此宏来检测哪些文件描述符已准备就绪。
select的执行过程
对于执行过程,大致分为四个步骤:
1.初始化:
- 创建一个或多个
fd_set
结构,用于存储需要监视的文件描述符。- 使用
FD_ZERO
宏清空这些fd_set结构。- 使用
FD_SET
宏将需要监视的文件描述符添加到相应的fd_set结构中。- 对于添加到位图中的事件, 都会在位图表示成1
fd_set rfds;FD_ZERO(&rfds);FD_SET(_fd_array[i],&rfds);
2.调用select函数:
- 将最大的文件描述符值加1作为nfds参数。
- 将之前创建的fd_set结构作为
readfds
、writefds
和exceptfds
参数(如果需要监视相应的事件)。- 设置
timeout
参数以控制select的等待时间。- 调用select函数。
structtimeval timeout ={0,0};//设置等待时间int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);
3.处理返回结果
- 此时经过select函数的执行后,已经影响了
fd_set
中的之前添加进来的文件描述符,只要没有就绪的话,那么都会被清空为0- 如果select返回大于0的值,表示有文件描述符的事件已经就绪。
- 利用for循环对每个事件进行检查是否就绪,
- 使用
FD_ISSET
宏检查哪些文件描述符的事件已经就绪。- 根据就绪的文件描述符执行相应的操作,如读取数据、写入数据或处理异常事件。
if(n>0){for(int i =0; i <1024; i++){if(FD_ISSET(_fd_array[i],&rfds)){//处理对应的就绪事件 }}}
4.重新设置并继续监视:
- 由于select函数会修改传入的
fd_set
结构,因此在每次调用select之前都需要重新设置这些结构。- 根据需要更新
timeout
参数。- 重复执行上述步骤以继续监视文件描述符的状态变化。
while(1){//包含以上内容}
注意事项:
- 每次调用select之前都需要重新设置fd_set结构和timeout参数。
- select函数只负责等待文件描述符的状态变化,并不负责数据的拷贝。数据的拷贝需要使用如read、write等函数来完成。
- select函数有一个限制,即它能够监视的文件描述符数量是有限的,通常取决于fd_set结构的大小(在32位系统上通常为1024个)。
利用select()建立一个Server服务器
InetAddr.hpp
包含网络地址的头文件:
#pragmaonce#include<iostream>#include<sys/types.h>#include<sys/socket.h>#include<arpa/inet.h>#include<netinet/in.h>classInetAddr{private:voidGetAddress(std::string *ip,uint16_t*port){*port =ntohs(_addr.sin_port);*ip =inet_ntoa(_addr.sin_addr);}public:InetAddr(conststructsockaddr_in&addr):_addr(addr){GetAddress(&_ip,&_port);}InetAddr(const std::string &ip,uint16_t port):_ip(ip),_port(port){
_addr.sin_family = AF_INET;
_addr.sin_port =htons(_port);
_addr.sin_addr.s_addr =inet_addr(_ip.c_str());}InetAddr(){}
std::string Ip(){return _ip;}booloperator==(const InetAddr &addr){// if(_ip == addr._ip)if(_ip == addr._ip && _port == addr._port)// 方便测试{returntrue;}returnfalse;}// bool operator = (const struct sockaddr_in &addr)// {// _addr = addr;// }structsockaddr_inAddr(){return _addr;}uint16_tPort(){return _port;}~InetAddr(){}private:structsockaddr_in _addr;
std::string _ip;uint16_t _port;};
Log.hpp
打印日志的头文件:
#pragmaonce#include<iostream>#include<fstream>#include<ctime>#include<cstdarg>#include<string>#include<sys/types.h>#include<unistd.h>#include<cstdio>#include"LockGuard.hpp"usingnamespace std;bool gIsSave=false;//默认输出到屏幕const string logname="log.txt";//1.日志是有等级的enumLevel{
DEBUG=0,
INFO,
WARNING,
ERROR,
FATAL
};voidSaveFile(const string& filename,const string& messages){
ofstream out(filename,ios::app);if(!out.is_open()){return;}
out<<messages;
out.close();}//等级转化为字符串
string LevelToString(int level){switch(level){case DEBUG:return"Debug";case INFO:return"Info";case WARNING:return"Warning";case ERROR:return"Error";case FATAL:return"Fatal";default:return"Unkonwn";}}//获取当前时间
string GetTimeString(){
time_t curr_time=time(nullptr);//时间戳structtm* format_time=localtime(&curr_time);//转化为时间结构if(format_time==nullptr)return"None";char time_buffer[1024];snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
format_time->tm_year +1900,
format_time->tm_mon +1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);return time_buffer;}
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//获取日志信息voidLogMessage(string filename,int line,bool issave,int level,char* format,...){
string levelstr=LevelToString(level);
string timestr=GetTimeString();
pid_t selfid=getpid();char buffer[1024];
va_list arg;va_start(arg,format);vsnprintf(buffer,sizeof(buffer),format,arg);va_end(arg);
string message="["+ timestr +"]"+"["+ levelstr +"]"+"["+ std::to_string(selfid)+"]"+"["+ filename +"]"+"["+ std::to_string(line)+"] "+ buffer +"\n";
LockGuard lockguard(&lock);if(!issave){
cout<<message;}else{SaveFile(logname,message);}}#defineLOG(level,format,...)\do\{\LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);\}while(0)#defineEnableFile()\do\{\gIsSave=true; \ }while(0)#defineEnableScreen()\do\{\gIsSave=false; \ }while(0)
LockGuard.hpp
互斥锁的头文件:
#ifndef__LOCK_GUARD_HPP__#define__LOCK_GUARD_HPP__#include<iostream>#include<pthread.h>classLockGuard{public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}private:
pthread_mutex_t *_mutex;};#endif
Socket.hpp
包含一系列Socket套接字的接口函数,还有TcpSocket专门的接口:
#include<iostream>#include<string>#include<functional>#include<sys/types.h>/* See NOTES */#include<sys/socket.h>#include<netinet/in.h>#include<arpa/inet.h>#include<unistd.h>#include<cstring>#include<pthread.h>#include<sys/types.h>#include<memory>#include"InetAddr.hpp"#include"Log.hpp"namespace socket_ns
{classSocket;conststaticint gbacklog=8;using socket_sptr=std::shared_ptr<Socket>;//套接字指针enum{
SOCKET_ERROR =1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};//在基类创建一系列虚函数,只要派生类能用到就在这里创建classSocket{public:virtualvoidCreateSocketOrDie()=0;//创建套接字virtualvoidBindSocketOrDie(InetAddr& addr)=0;//绑定套接字virtualvoidListenSocketOrDie()=0;//监听套接字virtualintAccepter(InetAddr* addr)=0;//接受客户端virtualboolConnector(InetAddr &addr)=0;//连接客户端virtualvoidSetSocketAddrReuse()=0;// 重启指定端口virtualintSockFd()=0;//获取SockfdvirtualintRecv(std::string *out)=0;//接收对方信息virtualintSend(const std::string &in)=0;//发送给对方信息public://创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建voidBuildListenSocket(InetAddr& addr){CreateSocketOrDie();SetSocketAddrReuse();BindSocketOrDie(addr);ListenSocketOrDie();}boolBuildClientSocket(InetAddr &addr){CreateSocketOrDie();returnConnector(addr);}};classTcpSocket:publicSocket{public:TcpSocket(int sockfd=-1):_sockfd(sockfd){}voidCreateSocketOrDie() override //override明确的重写基类函数{
_sockfd=socket(AF_INET,SOCK_STREAM,0);if(_sockfd<0){LOG(FATAL,"socket error");exit(SOCKET_ERROR);}LOG(DEBUG,"socket create success, sockfd is : %d\n", _sockfd);}voidBindSocketOrDie(InetAddr& addr) override
{structsockaddr_in local;memset(&local,0,sizeof(local));
local.sin_family = AF_INET;
local.sin_port =htons(addr.Port());
local.sin_addr.s_addr =inet_addr(addr.Ip().c_str());int n=bind(_sockfd,(structsockaddr*)&local,sizeof(local));if(n <0){LOG(FATAL,"bind error\n");exit(BIND_ERROR);}LOG(DEBUG,"bind success, sockfd is : %d\n", _sockfd);}voidListenSocketOrDie() override
{int n=listen(_sockfd,gbacklog);if(n <0){LOG(FATAL,"listen error\n");exit(LISTEN_ERROR);}LOG(DEBUG,"listen success, sockfd is : %d\n", _sockfd);}intAccepter(InetAddr* addr) override
{structsockaddr_in peer;
socklen_t len=sizeof(peer);int sockfd =accept(_sockfd,(structsockaddr*)&peer,&len);if(sockfd <0){LOG(WARNING,"accept error\n");return-1;}*addr=peer;return sockfd;}virtualboolConnector(InetAddr& addr){structsockaddr_in server;memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
server.sin_port=htons(addr.Port());int n=connect(_sockfd,(structsockaddr*)&server,sizeof(server));if(n <0){
std::cerr <<"connect error"<< std::endl;returnfalse;}returntrue;}voidSetSocketAddrReuse() override
{int opt =1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,&opt,sizeof(opt));//快速重启端口}intRecv(std::string *out) override
{char inbuffer[1024];
ssize_t n =recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);if(n >0){
inbuffer[n]=0;*out += inbuffer;// 接收次数可能不只一次,一般是多次的,}return n;}intSend(const std::string &in) override
{int n =send(_sockfd,in.c_str(),in.size(),0);return n;}intSockFd() override
{return _sockfd;}~TcpSocket(){}private:int _sockfd;};}
SelectServer.hpp
#pragmaonce#include<iostream>#include<string>#include<memory>#include"Socket.hpp"usingnamespace socket_ns;//select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加classSelectServer{conststaticint defaultfd =-1;//默认sockfdconststaticint N =sizeof(fd_set)*8;//监视文件描述符的最大值public:SelectServer(uint16_t port):_port(port),_listensock(make_unique<TcpSocket>()){
InetAddr addr("0", _port);//网络地址初始化
_listensock->BuildListenSocket(addr);//创建监听套接字//初始化辅助数组for(int i =0; i < N; i++){
_fd_array[i]= defaultfd;}
_fd_array[0]= _listensock->SockFd();}voidAcceptClient(){// 我们今天只关心了读,而读有:listensock 和 normal sockfd
InetAddr clientaddr;int sockfd = _listensock->Accepter(&clientaddr);// 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了if(sockfd <0)return;LOG(DEBUG,"Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());//select托管(监视状态):将新fd放入辅助数组中int pos =1;for(; pos < N; pos++){if(_fd_array[pos]== defaultfd)break;}//让pos到辅助数组的空缺位置if(pos == N)//说明监视的文件描述符满了{::close(sockfd);// sockfd->Close();LOG(WARNING,"server is full!\n");return;}else{
_fd_array[pos]= sockfd;LOG(DEBUG,"%d add to select array!\n", sockfd);}LOG(DEBUG,"curr fd_array[] fd list : %s\n",RfdsToString().c_str());}voidServiceIO(int pos){char buffer[1024];
ssize_t n =::recv(_fd_array[pos], buffer,sizeof(buffer)-1,0);// 这里读取会不会被阻塞?不会if(n >0)//处理接收数据{
buffer[n]=0;
std::cout <<"client say# "<< buffer << std::endl;
std::string echo_string ="[server echo]# ";
echo_string += buffer;::send(_fd_array[pos], echo_string.c_str(), echo_string.size(),0);//返回给客户端}elseif(n ==0)//说明对方已断开连接{LOG(DEBUG,"%d is closed\n", _fd_array[pos]);::close(_fd_array[pos]);
_fd_array[pos]= defaultfd;LOG(DEBUG,"curr fd_array[] fd list : %s\n",RfdsToString().c_str());}else//出现错误{LOG(DEBUG,"%d recv error\n", _fd_array[pos]);::close(_fd_array[pos]);
_fd_array[pos]= defaultfd;LOG(DEBUG,"curr fd_array[] fd list : %s\n",RfdsToString().c_str());}}//处理准备就绪的事件voidHandlerEvent(fd_set &rfds){for(int i =0; i < N; i++){if(_fd_array[i]== defaultfd)continue;if(FD_ISSET(_fd_array[i],&rfds)){if(_fd_array[i]== _listensock->SockFd())//新的连接{AcceptClient();}else{// 普通的sockfd读事件就绪ServiceIO(i);}}}}voidLoop(){while(true){//监听套接字在等待对方发送连接//新的连接 == 读事件就绪//要将listensock添加到select中!
fd_set rfds;//一个记录文件描述符状态的集合FD_ZERO(&rfds);//将所有文件描述符移除集合int max_fd = defaultfd;//最大的文件描述符值for(int i =0; i < N; i++){if(_fd_array[i]== defaultfd)continue;FD_SET(_fd_array[i],&rfds);// 将所有合法的fd添加到rfds中if(max_fd < _fd_array[i]){
max_fd = _fd_array[i];// 更新出最大的fd的值}}structtimeval timeout ={0,0};//设置等待时间int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);//timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪switch(n){case0://指定时间内没有任何文件描述符准备就绪LOG(INFO,"timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);break;case-1://出现错误LOG(ERROR,"select error...\n");break;default://成功状态LOG(DEBUG,"Event Happen. n : %d\n", n);// 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!HandlerEvent(rfds);break;}}}//打印出已存在的fd
std::string RfdsToString(){
std::string fdstr;for(int i =0; i < N; i++){if(_fd_array[i]== defaultfd)continue;
fdstr += std::to_string(_fd_array[i]);
fdstr +=" ";}return fdstr;}~SelectServer(){}private:uint16_t _port;//端口号
std::unique_ptr<Socket> _listensock;//监听socketint _fd_array[N];// 辅助数组};
成员:
辅助数组:由上面select执行过程可以知道,当select执行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;
初始化:
对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个事件;所以要在初始化就添加进来;
Loop:
这里操作流程就是跟上面的执行过程是一样的,只是增加了一些细节:
N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的事件,就需要通过循环来一个一个添加到rfds中;
select()返回值:
根据select()
的返回值执行不同的代码;
这里所说的底层事件就绪,如果没有处理已就绪的事件,那么select就会一直监测到事件就绪,一直执行default
语句的内容;
HandlerEvent():
在for循环里面通过FD_ISSET函数找出每个已经就绪的事件,然后再判断是不是监听事件的;
Main.cc
#include"SelectSever.hpp"#include<memory>// ./selectserver portintmain(int argc,char*argv[]){if(argc !=2){
std::cout <<"Usage: "<< argv[0]<<" port"<< std::endl;return0;}uint16_t port = std::stoi(argv[1]);//获取端口号EnableScreen();
std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
svr->Loop();return0;}
通过telnet进行测试:
开启Server服务:
telnet进行访问
Server的监听socket收到就绪的事件,创建一个新的IO服务客户端:
客户端任意发送内容:
客户端服务端都能通过Select的底层就绪互相接收发送:
Select()的优缺点
优点
- 多路复用:select()函数能够同时监视多个文件描述符,实现I/O多路复用,从而提高了程序的并发处理能力和资源利用率。
- 简单易用:select()函数的接口相对简单,易于理解和使用,特别是对于初学者来说。
- 灵活性:select()函数允许程序根据文件描述符的读、写、异常等事件进行灵活的处理,满足不同的I/O需求。
缺点
- 文件描述符数量的限制:select()函数能够监视的文件描述符数量有限,通常在Linux上默认为1024个(尽管可以通过修改宏定义或重新编译内核来提升这一限制,但这样做可能会降低效率)。这对于需要监视大量文件描述符的应用程序来说是一个显著的限制。
- 性能瓶颈:当监视的文件描述符数量较多时,select()函数的性能可能会成为瓶颈。因为每次调用select()时,内核都需要扫描所有被监视的文件描述符,这会导致不必要的开销。
- 内核拷贝开销:在select()调用过程中,由于每次都要事先准本
fd_set
结构内容,内核与用户空间之间需要进行内存拷贝操作,以传递文件描述符集合和结果。这会增加额外的开销,并可能影响性能。
因此,在选择是否使用select()函数时,需要根据具体的应用场景和需求进行权衡。对于需要监视大量文件描述符或追求高性能的应用程序来说,可能需要考虑使用更高级的I/O多路复用机制,如poll()或epoll()等。
版权归原作者 诡异森林。 所有, 如有侵权,请联系我们删除。