0


[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

在这里插入图片描述

文章目录

在这里插入图片描述

一.网络层与传输层协议

  • 网络层与传输层内置于操作系统的内核中,网络层一般使用ip协议,传输层常用协议为Tcp协议和Udp协议,Tcp协议和Udp协议拥有各自的特点和应用场景:在这里插入图片描述

sockaddr结构体继承体系(Linux体系)

  • sockaddr_in结构体用于存储网络通信主机进程的ip和端口号等信息在这里插入图片描述

贯穿计算机系统的网络通信架构图示:

在这里插入图片描述

二.实现并部署多线程并发Tcp服务器框架

小项目的完整文件的gittee链接

  • Tcp服务器架构:在这里插入图片描述

线程池模块

  1. #pragmaonce#include<iostream>#include<pthread.h>#include"log.hpp"#include<semaphore.h>#include<vector>#include<cstdio>template<classT>classRingQueue{private:
  2. pthread_mutex_t Clock_;
  3. pthread_mutex_t Plock_;
  4. sem_t Psem_;
  5. sem_t Csem_;
  6. std::vector<T> Queue_;int Pptr_;int Cptr_;int capacity_;public:RingQueue(int capacity =10):Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){sem_init(&Psem_,0,capacity);sem_init(&Csem_,0,0);pthread_mutex_init(&Clock_,nullptr);pthread_mutex_init(&Plock_,nullptr);}~RingQueue(){sem_destroy(&Psem_);sem_destroy(&Csem_);pthread_mutex_destroy(&Clock_);pthread_mutex_destroy(&Plock_);}
  7. T Pop(){sem_wait(&Csem_);pthread_mutex_lock(&Clock_);
  8. T tem = Queue_[Cptr_];
  9. Cptr_++;
  10. Cptr_ %= capacity_;pthread_mutex_unlock(&Clock_);sem_post(&Psem_);return tem;}voidPush(T t){sem_wait(&Psem_);pthread_mutex_lock(&Plock_);
  11. Queue_[Pptr_]= t;
  12. Pptr_++;
  13. Pptr_%= capacity_;pthread_mutex_unlock(&Plock_);sem_post(&Csem_);}};
  1. #pragmaonce#include"sem_cp.cpp"#include<pthread.h>#include<iostream>#include<string>#include<mutex>#include"CalTask.cpp"template<classTask>classThread_Pool{structThread_Data{int Thread_num;
  2. pthread_t tid;};private:
  3. RingQueue<Task> Queue_;//线程安全的环形队列
  4. std::vector<Thread_Data> thread_arr;//管理线程的容器static std::mutex lock_;//单例锁static Thread_Pool<Task>* ptr_;//单例指针private:Thread_Pool(int capacity_Of_queue =20):Queue_(capacity_Of_queue){}Thread_Pool(const Thread_Pool<Task>& Tp)=delete;
  5. Thread_Pool<Task>&operator=(const Thread_Pool<Task>& Tp)=delete;public:~Thread_Pool(){}//获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义static Thread_Pool<Task>*Getinstance();//创建多线程voidCreate_thread(int thread_num =10){
  6. Thread_Data T_data;for(int i =0; i < thread_num ;++i){//注意线程池对象的this指针传递给线程pthread_create(&T_data.tid,nullptr,Routine,this);
  7. T_data.Thread_num = i +1;
  8. thread_arr.push_back(T_data);}}//线程等待voidThread_join(){for(int i =0;i < thread_arr.size();++i){pthread_join(thread_arr[i].tid,nullptr);}}//向线程池中加入任务voidPush(Task T){
  9. Queue_.Push(T);}voidPush(Task && T){
  10. Queue_.Push(std::forward<Task>(T));}private://线程函数-->该函数没有在类外调用,所以无须在类体外定义staticvoid*Routine(void* args){
  11. Thread_Pool<Task>* Pool =static_cast<Thread_Pool<Task>*>(args);while(true){
  12. std::cout <<"Thread prepare to work\n"<< std::endl;
  13. Task Thread_Task = Pool->Queue_.Pop();//要求Task类重载()-->用于执行具体任务Thread_Task();}returnnullptr;}};//初始化静态指针template<classTask>
  14. Thread_Pool<Task>* Thread_Pool<Task>::ptr_ =nullptr;template<classTask>
  15. std::mutex Thread_Pool<Task>::lock_;//注意C++的类模板静态成员函数需要在类体外进行定义template<classTask>
  16. Thread_Pool<Task>*Thread_Pool<Task>::Getinstance(){if(ptr_ ==nullptr){
  17. lock_.lock();if(ptr_ ==nullptr){
  18. ptr_ =new Thread_Pool<Task>;}
  19. lock_.unlock();}return ptr_;}

序列化反序列化工具模块

  • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

在这里插入图片描述

  1. #pragmaonce#include<iostream>#include<string>// 自定义序列化反序列化协议const std::string blank_space_sep =" ";const std::string protocol_sep ="\n";//封装报文
  2. std::string Encode(std::string &content){//报文正文字节数
  3. std::string package = std::to_string(content.size());
  4. package += protocol_sep;
  5. package += content;//用分隔符封装正文
  6. package += protocol_sep;return package;}//解析报文package-->"正文长度"\n"正文"\nboolDecode(std::string &package, std::string& content){
  7. size_t pos = package.find(protocol_sep);if(pos == std::string::npos)returnfalse;//解析报文正文长度
  8. size_t Len = std::atoi(package.substr(0,pos).c_str());//确定报文是否完整
  9. size_t total_Len = pos + Len +2;if(package.size()!= total_Len)returnfalse;//获取正文内容
  10. content = package.substr(pos+1,Len);
  11. package.erase(0,total_Len);returntrue;}//用户层协议请求结构体classRequest{public:int x;int y;char op;public:Request(int data1 ,int data2 ,char op):x(data1),y(data2),op(op){}Request(){}public://请求结构体 序列化 成报文正文字符串 "x op y"boolSerialize(std::string& out){
  12. std::string content = std::to_string(x);
  13. content += blank_space_sep;
  14. content += op;
  15. content += blank_space_sep;
  16. content += std::to_string(y);
  17. out = content;returntrue;// 等价的jason代码// Json::Value root;// root["x"] = x;// root["y"] = y;// root["op"] = op;// // Json::FastWriter w;// Json::StyledWriter w;// out = w.write(root);// return true;}//报文正文字符串 反序列化 成请求结构体// "x op y"boolDeserialize(const std::string &in){
  18. size_t left = in.find(blank_space_sep);if(left == std::string::npos)returnfalse;
  19. x = std::stoi(in.substr(0,left).c_str());
  20. std::size_t right = in.rfind(blank_space_sep);if(right == std::string::npos)returnfalse;
  21. y = std::atoi(in.substr(right +1).c_str());if(left +2!= right)returnfalse;
  22. op = in[left+1];returntrue;// 等价的jason代码// Json::Value root;// Json::Reader r;// r.parse(in, root);// x = root["x"].asInt();// y = root["y"].asInt();// op = root["op"].asInt();// return true;}voidDebugPrint(){
  23. std::cout <<"新请求构建完成: "<< x << op << y <<"=?"<< std::endl;}};//用户层协议请求回应结构体classResponse{public:int result;int code;public:Response(int res ,int c):result(res),code(c){}Response(){}public://请求回应结构体 序列化 成报文正文字符串 "result code"boolSerialize(std::string& out){
  24. std::string s = std::to_string(result);
  25. s += blank_space_sep;
  26. s += std::to_string(code);
  27. out = s;returntrue;// 等价的jason代码// Json::Value root;// root["result"] = result;// root["code"] = code;// // Json::FastWriter w;// Json::StyledWriter w;// out = w.write(root);// return true;}//"result code"//报文正文字符串 反序列化 成请求回应结构体boolDeserialize(const std::string &in){
  28. std::size_t pos = in.find(blank_space_sep);if(pos == std::string::npos)returnfalse;if(pos ==0|| pos == in.size()-1)returnfalse;
  29. result = std::stoi(in.substr(0, pos).c_str());
  30. code = std::stoi(in.substr(pos+1).c_str());returntrue;// 等价的jason代码// Json::Value root;// Json::Reader r;// r.parse(in, root);// result = root["result"].asInt();// code = root["code"].asInt();// return true;}voidDebugPrint(){
  31. std::cout <<"结果响应完成, result: "<< result <<", code: "<< code << std::endl;}};

通信信道建立模块

  1. #pragmaonce#include<iostream>#include<string>#include<sys/types.h>#include<sys/socket.h>#include"log.hpp"#include<memory.h>#include<arpa/inet.h>#include<netinet/in.h>namespace MySocket{//Tcp通讯构建器classTcpServer{enum{
  2. UsageError =1,
  3. SocketError,
  4. BindError,
  5. ListenError,};private:int socketfd_ =-1;
  6. std :: string ip_;uint16_t port_;int backlog_ =10;public:TcpServer(const std::string& ip ="172.19.29.44",uint16_t port =8081):ip_(ip),port_(port){}~TcpServer(){if(socketfd_ >0)close(socketfd_);}public://确定通信协议,建立文件描述符voidBuildSocket(){
  7. socketfd_ =socket(AF_INET,SOCK_STREAM,0);if(socketfd_ <0){lg(Fatal,"socket error,%s\n",strerror(errno));exit(SocketError);}}//文件描述符与服务器ip : 端口号绑定voidSocketBind(){structsockaddr_in addr;memset(&addr,0,sizeof(addr));
  8. addr.sin_port =htons(port_);
  9. addr.sin_family = AF_INET;
  10. addr.sin_addr.s_addr =inet_addr(ip_.c_str());if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr))<0){lg(Fatal,"socket bind error,%s\n",strerror(errno));exit(BindError);}lg(Info,"socket bind success\n");}//启动服务监听,等待客户端的连接voidSocklisten(){if(socketfd_ <=0){lg(Fatal,"socket error,%s\n",strerror(errno));exit(SocketError);}if(listen(socketfd_,backlog_)<0){lg(Fatal,"listen error, %s: %d",strerror(errno), errno);exit(ListenError);}}//服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符intSockAccept(std::string& cilent_ip,uint16_t& cilent_port){structsockaddr_in client_addr;// 输出型参数,用于获取用户的ip : 端口号memset(&client_addr,0,sizeof(client_addr));
  11. socklen_t Len =sizeof(client_addr);int newfd =accept(socketfd_,(structsockaddr*)&client_addr,&Len);if(newfd <0){lg(Warning,"accept error, %s: %d",strerror(errno), errno);return-1;}//提取客户端信息-->输出参数char ipstr[64];
  12. cilent_ip =inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));
  13. cilent_ip = ipstr;
  14. cilent_port =ntohs(client_addr.sin_port);return newfd;}public:intGet_Server_fd(){return socketfd_;}voidClose_fd(){if(socketfd_ >0){close(socketfd_);
  15. socketfd_ =-1;}}};};

服务器主体模块

在这里插入图片描述

  1. #pragmaonce#include"ThreadPool.cpp"#include"TcpServer.cpp"#include"CalTask.cpp"#include"log.hpp"#include<signal.h>//构建计算器服务器classCalServer{constint size =2048;private:
  2. Thread_Pool<CalTask>* Pool_ptr_;
  3. MySocket::TcpServer Socket_;int Socket_fd_ =-1;public:CalServer(const std::string& de_ip ="172.19.29.44",uint16_t de_port =8081):Socket_(de_ip,de_port){
  4. Pool_ptr_ =Thread_Pool<CalTask>::Getinstance();if(Pool_ptr_ ==nullptr){lg(Fatal,"Pool_ptr_ is nullptr\n");return;}
  5. Pool_ptr_->Create_thread();}~CalServer(){}public://建立Tcp连接条件boolInit(){
  6. Socket_.BuildSocket();
  7. Socket_fd_ = Socket_.Get_Server_fd();if(Socket_fd_ <0){lg(Fatal,"BuildSocket failed\n");returntrue;}
  8. Socket_.SocketBind();
  9. Socket_.Socklisten();lg(Info,"init server .... done");returntrue;}//启动服务器voidStart(){signal(SIGCHLD, SIG_IGN);signal(SIGPIPE, SIG_IGN);char ReadBuffer[size];while(true){//接受用户请求
  10. std::string client_ip;uint16_t client_port;int client_fd = Socket_.SockAccept(client_ip,client_port);if(client_fd <0){lg(Warning,"SockAccept error\n");continue;}lg(Info,"accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);int n =read(client_fd,ReadBuffer,sizeof(ReadBuffer));
  11. ReadBuffer[n]=0;
  12. std::string TaskStr(ReadBuffer);printf("receives mess from client : %s",ReadBuffer);if(n <0){lg(Warning,"read error\n");break;}
  13. CalTask task(client_fd,client_ip,client_port,TaskStr);
  14. Pool_ptr_->Push(task);}}};

任务回调模块(根据具体应用场景可重构)

  1. #pragmaonce#include<string>#include"ThreadPool.cpp"#include"Protocol.cpp"enum{
  2. Div_Zero =1,
  3. Mod_Zero,
  4. Other_Oper
  5. };classCalTask{private:int socketfd_;//网络通信文件描述符
  6. std :: string ip_;//客户端ipuint16_t port_;//客户端端口号
  7. std::string package_;//客户请求字符串public:CalTask(int socketfd,const std::string& ip ,uint16_t& port,std::string & str):socketfd_(socketfd),ip_(ip),port_(port),package_(str){}CalTask(){}//类一定要有默认构造函数~CalTask(){}public://执行计算任务并将结果发送给用户voidoperator()(){
  8. std::cout <<"Task Running ... \n"<< std::endl;
  9. std::string content;//将用户发送的报文进行解包获取正文bool r =Decode(package_, content);if(!r)return;//将报文正文进行反序列化
  10. Request req;
  11. r = req.Deserialize(content);if(!r)return;
  12. req.DebugPrint();
  13. content ="";//构建计算结果
  14. Response resp =CalculatorHelper(req);
  15. resp.DebugPrint();//计算结果序列化成字符串
  16. resp.Serialize(content);//字符串正文封装成报文发送给用户
  17. std::string ResStr =Encode(content);write(socketfd_,ResStr.c_str(),ResStr.size());if(socketfd_ >0)close(socketfd_);}private:
  18. Response CalculatorHelper(const Request &req){//构建请求回应结构体
  19. Response resp(0,0);switch(req.op){case'+':
  20. resp.result = req.x + req.y;break;case'-':
  21. resp.result = req.x - req.y;break;case'*':
  22. resp.result = req.x * req.y;break;case'/':{if(req.y ==0)
  23. resp.code = Div_Zero;else
  24. resp.result = req.x / req.y;}break;case'%':{if(req.y ==0)
  25. resp.code = Mod_Zero;else
  26. resp.result = req.x % req.y;}break;default:
  27. resp.code = Other_Oper;break;}return resp;}};

Tips:DebugC++代码过程中遇到的问题记录

  • 使用C++类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
  • 注意类模板静态成员的声明格式需要加关键字temlpate<>
  • 声明类模板静态成员时无需特化模版类型参数
  • 跨主机并发通信测试:在这里插入图片描述在这里插入图片描述

本文转载自: https://blog.csdn.net/weixin_73470348/article/details/136219891
版权归原作者 摆烂小青菜 所有, 如有侵权,请联系我们删除。

“[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)”的评论:

还没有评论