送给大家一句话:
没有一颗星,会因为追求梦想而受伤,当你真心渴望某样东西时,整个宇宙都会来帮忙。 – 保罗・戈埃罗 《牧羊少年奇幻之旅》
🏕️🏕️🏕️🏕️🏕️🏕️
🗻🗻🗻🗻🗻🗻
进程通信实战 —— 进程池项目
1 ♻️知识回顾
在之前的讲解中,我们深入探讨了以下几个方面:
- 父子进程的创建与管理:我们详细讲解了父子进程是如何建立的,以及子进程如何继承父进程的代码和数据。子进程通常用于完成特定的任务。
- 文件操作:我们学习了如何使用 read 和 write 操作文件,并了解了文件描述符(fd)的概念,从而能够在文件中进行信息的读取和写入。
- 进程间通信:我们介绍了匿名管道,这是一种父子进程间进行通信的方式。通过共享资源,父子进程可以实现数据的传递和同步。
在接下来的内容中,让我们把所学知识来进行运用,我们将探讨进程池的概念和实现细节。
2 ♻️项目介绍
进程池是一种用于管理和复用进程的技术,它可以有效地管理系统资源并提高程序的性能和效率。通过维护一组预先创建的进程与管道,进程池可以避免频繁地创建和销毁进程,从而减少了系统开销和资源浪费。
主要使用的是池化技术的思想:
池化技术是一种广泛应用于系统开发中的优化策略,旨在通过复用资源来提高性能和效率。池化技术的核心思想是预先分配一组资源,并在需要时进行复用,而不是每次都重新创建和销毁资源。
池化技术(Pooling)涉及创建和管理一组预先分配的资源,这些资源可以是进程、线程、数据库连接或对象实例。在池化系统中,当请求到达时,它会从池中获取一个空闲资源,使用完毕后将其归还池中。这种方法避免了频繁的创建和销毁操作,从而显著减少了系统开销。
进程池就是通过预先创建若干个进程与管道,在需要进行任务时,选择一个进程,通过管道发送信息,让其完成工作。
进程池在实际项目中有广泛的应用,尤其是在处理大量并发任务时,例如:网络服务器中的请求处理、数据处理以及计算密集型任务。通过合理配置进程池的大小和参数,可以有效控制系统负载,提高整体响应速度。
3 ♻️项目实现
3.1 ✨创建信道和子进程
首先我们需要建立一个信道类,来储存管道及其对应的子进程信息。
//信道类classChannel{public:Channel(pid_t id ,int wfd , std::string name):_id(id),_wfd(wfd),_name(name){}~Channel(){}voidClose(){close(_wfd);}//关闭管道时需要等待对应子进程结束voidWaitSub(){
pid_t rid =waitpid(_id,nullptr,0);if(rid >0){
std::cout <<"wait "<< rid <<" success"<< std::endl;}}
pid_t GetId(){return _id;}intGetWfd(){return _wfd;}
std::string GetName(){return _name ;}private:
pid_t _id ;//对应 子进程 idint _wfd ;//写入端
std::string _name ;//管道名称};
然后我们就建立若干个信道与子进程,创建子进程与信道的时候,把信息插入到信道容器中,完成储存。子进程需要阻塞在读取文件,等待父进程写入信息:
voidCreateChannel(int num , std::vector<Channel>* channel){//初始化任务InitTask();for(int i =0; i < num ; i++){//创建管道int pipefd[2]={0};int n =pipe(pipefd);if(n !=0){
std::cout <<"create pipe failed!"<< std::endl;}//创建子进程
pid_t id =fork();if(id ==0){//子进程 --- 只读不写close(pipefd[1]);work(pipefd[0]);close(pipefd[0]);exit(0);}//父进程close(pipefd[0]);
std::string name ="Channel - "+ std::to_string(i);//储存信道信息
channel->push_back(Channel(id , pipefd[1], name));}}
这里提一下传参的规范:
const &
:表示输出型参数,即该参数是输入型,不会被修改。常用于传递不需要修改的对象或数据。&
:表示输入输出型参数,即该参数既是输入参数,又是输出参数,函数可能修改其内容。*
:表示输出型参数,通常用于传递指针,函数通过指针参数返回结果给调用者。
进行一下测试,看看是否可以这正常建立信道与子进程;
intmain(int argc ,char* argv[]){//1. 通过main函数的参数 int argc char* argv[] (./ProcessPool 5) //判断要创建多少个进程if(argc !=2){
std::cout <<"请输入需要创建的信道数量 :"<< std::endl;}
std::vector<Channel> channel;int num = std::stoi(argv[1]);//2. 创建信道和子进程CreateChannel(num ,&channel);//测试:for(auto t : channel){
std::cout<<"==============="<<std::endl;
std::cout<<"信道对应 name :"<< t.GetName()<<std::endl;
std::cout<<"信道对应子进程 pid :"<< t.GetId()<<std::endl;
std::cout<<"信道对应写端 wfd :"<< t.GetWfd()<<std::endl;}return0;}
完美,可以正常创建!!!
3.2 ✨建立任务
完成了信道与子进程的创建,接下来我们就来设置一些任务。我们在
.hpp文件
里直接把声明定义写在一起,确保代码的模块化和可维护性。
voidPrint(){
std::cout <<"this is Print()"<< std::endl;}voidFflush(){
std::cout <<"this is Fflush()"<< std::endl;}voidScanf(){
std::cout <<"this is Scanf()"<< std::endl;}
然后通过函数指针数来储存这些函数,因为子进程会继承父进程的数据,这样通过一个数字下标即可确定调用的函数。只需要传入 4 个字节的int类型,最大程度的减少了通信的成本!!!
#pragmaonce#include<iostream>#include<ctime>#include<cstdlib>#include<sys/types.h>#include<unistd.h>#defineTaskNum3//这个文件里是任务函数typedefvoid(*task_t)();
task_t tasks[TaskNum];//...//...三个函数//...voidInitTask(){srand(time(nullptr)^getpid()^17777);
tasks[0]= Print;
tasks[1]= Fflush;
tasks[2]= Scanf;}//执行任务!!!voidExecuteTask(int num){if(num <0|| num >2)return;
tasks[num]();}//随机挑选一个任务intSelectTask(){returnrand()% TaskNum;}
3.3 ✨控制子进程
首先通过 SelectTask() 选择一个任务,然后选择一个信道和子进程。需要注意的是,这里要依次调用每一组子进程,采用轮询(Round-Robin)方案,以尽可能实现负载均衡。 然后发送任务(向信道写入4字节的数组下标)
intSelectChannel(int n){//静态变量做到轮询方案staticint next =0;int channel = next;
next++;
next %= n;return channel;}voidSendTaskCommond(Channel& channel ,int TaskCommand ){//写入对应信息write(channel.GetWfd(),&TaskCommand ,sizeof(TaskCommand));}voidCtrlProcessOnce(std::vector<Channel>& channel){//选择一个任务int TaskCommand =SelectTask();//选择一个进程与信道int ChannelNum =SelectChannel(channel.size());//发送信号//测试
std::cout <<"taskcommand: "<< TaskCommand <<" channel: "<< channel[ChannelNum].GetName()<<" sub process: "<< channel[ChannelNum].GetId()<< std::endl;SendTaskCommond(channel[ChannelNum],TaskCommand);}
我们写入之后,子进程就可以读取任务并执行,注意子进程读取只读4个字节!!!如果读取的个数不正确,那么就出现了错误,需要报错!!!
//子进程运行函数voidwork(int rfd){while(true){int Commond =0;//等待相应int n =read(rfd ,&Commond ,sizeof(Commond));if(n ==sizeof(int)){
std::cout <<"pid is : "<<getpid()<<" handler task"<< std::endl;//执行命令
std::cout <<"commond :"<< Commond << std::endl;ExecuteTask(Commond);}//写端关闭elseif(n ==0){
std::cout <<"sub Process:"<<getpid()<< std::endl;break;}}}//...//创建子进程
pid_t id =fork();if(id ==0){//子进程 --- 只读不写close(pipefd[1]);work(pipefd[0]);close(pipefd[0]);exit(0);}//...
进行一下测试:
成功执行任务!!!
3.4 ✨回收信道和子进程
首先关闭信道写端,这样子进程会自己退出,然后父进程等待子进程退出(wait等待子进程 )不要出现僵尸进程 !!!
注意由于子进程会继承父进程的数据,所以一个信道实际上会有多个写端。为了不必要的错误,分开集中操作:先关闭所有写端,再等待所以子进程。
voidCleanUpChannel(std::vector<Channel>& channel){for(auto t : channel){
t.Close();}for(auto t : channel){
t.WaitSub();}}
测试一下:
5 个子进程成功退出释放!!!
4 ♻️总结
这样,我们的进程池项目就完成了。不过,实际上我们还可以进一步优化,比如优化 work 函数,将其设置为回调函数,以实现完全解耦。
尽管如此,目前的实现已经能够满足我们的项目需求。一个面向过程的进程池项目就此完成!!!
Thanks♪(・ω・)ノ谢谢阅读!!!
下一篇文章见!!!
版权归原作者 叫我龙翔 所有, 如有侵权,请联系我们删除。