0302 多进程网络服务器架构
专栏内容:
- postgresql使用入门基础
- 手写数据库toadb
- 并发编程
个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.
一、概述
在大规模数据处理中,会有大量的客户端接入同一台服务器,每个客户端都需要长时间提供服务。
服务器采用中心化的部署,而客户端往往分散在不同机器上,服务器与客户端之间跨网络通信,一般采用C/S架构。
而服务端的架构需要能应对大量并发客户端,同时可以给每个客户端独占的服务,这就用到了多任务的网络模型架构,下面我们来看看用多进程如何实现。
二、多路复用的网络模型
C/S架构中,处理大量的网络请求,需要一套基于多路复用的网络处理模型。
- 可以同时处理网络连接请求和网络数据传递;
- 减少程序的阻塞时间,避免无效的CPU消耗;
- 适应不同的并发规模;
以此为目标实现如下网络模型。
2.1 服务端网络监听
多路复用模型这里采用了epoll方式,如果自己的平台不支持,可以换为select或者poll的方式。
代码如下:
#include<stdio.h>#include<stdlib.h>#include<string.h>#include<unistd.h>#include<arpa/inet.h>#include<sys/epoll.h>#include<fcntl.h>#include<errno.h>#defineMAX_EVENTS10#defineBUFFER_SIZE1024#definePORT8080// 设置文件描述符为非阻塞模式 intset_nonblocking(int fd){int flags, s;
flags =fcntl(fd, F_GETFL,0);if(flags ==-1){perror("fcntl F_GETFL");return-1;}
flags |= O_NONBLOCK;
s =fcntl(fd, F_SETFL, flags);if(s ==-1){perror("fcntl F_SETFL");return-1;}return0;}intmain(){int listen_fd, conn_fd, nfds, epoll_fd;structsockaddr_in server_addr;structepoll_event ev, events[MAX_EVENTS];char buffer[BUFFER_SIZE];ssize_t count;// 创建监听socket
listen_fd =socket(AF_INET, SOCK_STREAM,0);if(listen_fd ==-1){perror("socket");exit(EXIT_FAILURE);}// 设置非阻塞模式 if(set_nonblocking(listen_fd)==-1){close(listen_fd);exit(EXIT_FAILURE);}// 绑定地址和端口
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port =htons(PORT);if(bind(listen_fd,(structsockaddr*)&server_addr,sizeof(server_addr))==-1){perror("bind");close(listen_fd);exit(EXIT_FAILURE);}// 开始监听 if(listen(listen_fd, SOMAXCONN)==-1){perror("listen");close(listen_fd);exit(EXIT_FAILURE);}// 创建epoll实例
epoll_fd =epoll_create1(0);if(epoll_fd ==-1){perror("epoll_create1");close(listen_fd);exit(EXIT_FAILURE);}// 添加监听socket到epoll实例
ev.events = EPOLLIN;
ev.data.fd = listen_fd;if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd,&ev)==-1){perror("epoll_ctl: listen_fd");close(listen_fd);close(epoll_fd);exit(EXIT_FAILURE);}// 主循环 while(1){
nfds =epoll_wait(epoll_fd, events, MAX_EVENTS,-1);if(nfds ==-1){perror("epoll_wait");exit(EXIT_FAILURE);}for(int n =0; n < nfds;++n){if(events[n].data.fd == listen_fd){// 新的连接
conn_fd =accept(listen_fd,NULL,NULL);if(conn_fd ==-1){perror("accept");continue;}// 设置非阻塞模式 if(set_nonblocking(conn_fd)==-1){close(conn_fd);continue;}// 添加新的连接socket到epoll实例
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd,&ev)==-1){perror("epoll_ctl: conn_fd");close(conn_fd);}}else{// 处理读事件
conn_fd = events[n].data.fd;while((count =read(conn_fd, buffer, BUFFER_SIZE))>0){// 处理接收到的数据(这里简单回显) write(conn_fd, buffer, count);}if(count ==-1&& errno != EAGAIN){// 出现错误或连接关闭 close(conn_fd);}elseif(count ==0){// 连接关闭 close(conn_fd);}// 从epoll实例中移除已关闭的socket if(count <=0&& errno != EAGAIN){
ev.events =0;epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn_fd,&ev);}}}}close(listen_fd);close(epoll_fd);return0;}
说明
- TCP服务端的基本步骤创建socket,设置为非阻塞模式,绑定IP与端口,开启监听;
- 这里服务端的socket需要设置为非阻塞模式,因为我们是在单进程中处理多个连接,每个连接不能阻塞等待;
- 然后加入到epoll监听池中,开始epoll事件的等待;这里只处理接收事件;
- 如果有服务端socket的接收事件,那么说明有客户端连接消息,进行accep,创建客户端连接的socket;
- 同样将客户端连接的socket设置为非阻塞,理由同上;加入epoll临听池中,同样也只处理接收事件;
- 如果有客户端连接的socket上的接收事件,那么说明客户端正在给服务端发消息;
- 收到客户端消息后,这里只是简单处理,原样再发给客户端;
- 如果客户端关闭或出错,将客户端连接关闭,并从epoll临听池中移除;
2.2 客户端测试
现在我们来编写一个简单的客户端模拟程序,测试一下多路复用的网络框架。
/*
* ex020302_client.c
*/#include<stdio.h>#include<stdlib.h>#include<string.h>#include<unistd.h>#include<arpa/inet.h>#defineSERVER_IP"127.0.0.1"#defineSERVER_PORT4808#defineBUFFER_SIZE1024#defineCLIENT_SEND_CNT20intmain(){int sockfd;structsockaddr_in server_addr;char buffer[BUFFER_SIZE]={0};constchar*message ="Hello, Server!";// 创建套接字 if((sockfd =socket(AF_INET, SOCK_STREAM,0))<0){perror("socket creation failed");exit(EXIT_FAILURE);}// 配置服务器地址信息
server_addr.sin_family = AF_INET;
server_addr.sin_port =htons(SERVER_PORT);// 将IP地址从字符串转换为二进制形式 if(inet_pton(AF_INET, SERVER_IP,&server_addr.sin_addr)<=0){perror("Invalid address/ Address not supported");close(sockfd);exit(EXIT_FAILURE);}// 连接到服务器 if(connect(sockfd,(structsockaddr*)&server_addr,sizeof(server_addr))<0){perror("Connection Failed");close(sockfd);exit(EXIT_FAILURE);}for(int i =0; i < CLIENT_SEND_CNT; i++){// 发送消息到服务器 send(sockfd, message,strlen(message),0);printf("Message sent: %s\n", message);// 接收服务器的响应 int bytes_received =recv(sockfd, buffer, BUFFER_SIZE -1,0);if(bytes_received <0){perror("Error in receiving");}elseif(bytes_received ==0){printf("Server closed the connection\n");}else{
buffer[bytes_received]='\0';// 确保字符串以空字符结尾 printf("Message received from server: %s\n", buffer);}sleep(1);}// 关闭套接字 close(sockfd);return0;}
说明
- TCP客户端建立的基本步骤,创建socket,初始化服务端地址,连接服务器;
- 然后向服务端发送相同的消息;
- 每次发送完成后,等待接收消息;
2.3 客户端测试
可以看到,服务端处理客户端的请求时,都是按照接收到的顺序进行串行处理;
当客户端的数量达到成百上千时,对客户端的响应时间就会出现非常明显的延迟,
这种延迟会随着业务的复杂度而放大。
这时就需要充分利用多核CPU硬件资源,来进行并发任务的处理。
三、多进程服务处理
上面是在单个任务进程中处理了监听和大量任务连接的网络处理,各客户端连接的服务会相互影响,实际是串行化处理的。
要让大量的客户端能同时被响应,需要采用多任务的方式,那么在上面的网络模型基础上加入多进程,服务端为每个客户端连接准备一个独立的进程,这样就可以及时响应。
3.1 多进程架构
首先我们利用前面几个章节的介绍,来搭建一个多进程的代码架构,由主进程根据需要进行创建子进程,并且由主进程进行全局的控制。
/*
* ex020302_netprocess.c
*/#include<stdio.h>#include<stdlib.h>#include<string.h>#include<unistd.h>#include<arpa/inet.h>#include<sys/epoll.h>#include<fcntl.h>#include<errno.h>#defineMAX_EVENTS10#defineBUFFER_SIZE1024#definePORT4808voiddaemon_fork(){int pid =-1;
pid =fork();if(pid <0){printf("fork error[%s]\n",strerror(errno));exit(-1);}elseif(pid >0){// parent exit.exit(0);}else{// child daemonreturn;}}voidsubprocess(int sock){int pid =-1;
pid =fork();if(pid <0){printf("fork error[%s]\n",strerror(errno));exit(-1);}elseif(pid >0){// parent.close(sock);return;}else{// child close(listen_fd);processMsg(sock);exit(0);}}
说明
- daemon服务程序函数,这个前一章节已经介绍过了,服务端以后台进程的方式运行;
- 子进程任务处理函数;这里创建的是任务子进程,并在子进程中调用消息处理函数;
- 这里需要注意的是,在子进程中要关闭服务端的socket,同时在父进程中要关闭客户端连接的socket; 因为父子进程会复制内存空间,但是在各自的进程中,已经不再需要;
3.2 并发网络处理模型
现在就可以将上面的多路复用网络处理放入多进程架构中,处理逻辑进行如下切分:
- 服务端监听socket初始化,多路复用器的初始化等,都放在主进程中,作为服务端网络初始化的一部分;
- 每个客户端连接的socket,以及它的读写消息处理逻辑,放在子进程中;这样每个客户端连接对应一个后台服务子进程;
- 创建子进程的时机,也就是在主进程中接收到新连接时,创建新连接成功后,就可以新建子进程进行处理;
- 而子进程的退出时间,就是客户端断开连接,或者处理出错时;
voidinitializeServerNet(){structsockaddr_in server_addr;// 创建监听socket
listen_fd =socket(AF_INET, SOCK_STREAM,0);if(listen_fd ==-1){perror("socket");exit(EXIT_FAILURE);}// 绑定地址和端口
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port =htons(PORT);if(bind(listen_fd,(structsockaddr*)&server_addr,sizeof(server_addr))==-1){perror("bind");close(listen_fd);exit(EXIT_FAILURE);}// 开始监听if(listen(listen_fd, SOMAXCONN)==-1){perror("listen");close(listen_fd);exit(EXIT_FAILURE);}}voidcloseServerFd(){close(listen_fd);}voiddispatchLoop(){int conn_fd;// 主循环 while(1){// 新的连接
conn_fd =accept(listen_fd,NULL,NULL);if(conn_fd ==-1){perror("accept");sleep(1);continue;}subprocess(conn_fd);}}voidprocessMsg(int sock){char buffer[BUFFER_SIZE];ssize_t count;printf("serv-process:%d start.\n");while((count =read(sock, buffer, BUFFER_SIZE))>0){// 处理接收到的数据(这里简单回显)write(sock, buffer, count);}if(count ==-1&& errno != EAGAIN){// 出现错误或连接关闭close(sock);}elseif(count ==0){// 连接关闭close(sock);}printf("serv-process:%d exit.\n");}
那么主程序实现如下:
voiddaemon_fork();voidsubprocess(int sock);voidprocessMsg(int sock);voidinitializeServerNet();voidcloseServerFd();voiddispatchLoop();int listen_fd;intmain(int argc ,char*argv[]){daemon_fork();initializeServerNet();dispatchLoop();closeServerFd();return0;}
- 在主进程中先进程服务端初始化;
- 然后就可以开始监听,并接收客户端的连接;
- 当有客户端连接时,就创建客户端连接,并启动子进程与该客户端进行网络通信;
- 子进程在客户端断开连接或出错时,就会退出;
2.3 客户端测试
可以看到将客户端发送次数调大后,开启的客户端越多,服务端启动的子进程也就会越多;
此时,可以看到服务端每个进程的CPU使用率并不是很高;
但是随着客户端数量越来越多,服务端进程数量超过CPU核数时,就会增加系统的负担;
四、总结
本文主要介绍了基于多进程架构的网络服务器的设计与实现,在多进程架构中每个客户端会有一个服务端的进程专门处理通信,增加了对客户端消息的响应效率,提升了并发处理能力。
结尾
非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!
作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。
注:未经同意,不得转载!
版权归原作者 韩楚风 所有, 如有侵权,请联系我们删除。