这篇博客讲解了进程池的创建过程,并在最后附上了完整代码。
现在有一个父进程,然后提前创建出一批子进程,未来如果父进程master有一些任务要交给子进程去运行,而不用像shell,需要执行命令才回去创建进程,创建进程本身也是有成本的。父进程要把任务派发给一些子进程,注定了要进行通信,我们可以提前给每一个进程创建管道,由父进程持有写端,子进程持有读端。我们有了管道这种技术,就可以让父进程通过管道将任务传递给子进程,想让哪个进程执行任务,就给哪个管道写入任务,我们把提前创建的这批进程叫做进程池, 这种预先创建的进程就可以大大减少未来执行任务时创建进程的成本。master只负责往管道写任务,子进程只会等待任务的到来,一旦来了就会处理。如果父进程不往管道里写任务,管道里没数据,管道读写端的文件描述符也没关,子进程会阻塞等待,等待任务的道理!!master向哪一个管道写入,就是唤醒哪一个子进程来处理任务!
这样就通过管道实现了进程的协同,可以由父进程定向唤醒一个或多个进程。我们在给子进程分配任务时,不能让一个特别忙而是让它们均衡一些,父进程在进行任务划分时要做到划分的负载均衡!
我们站在父进程的角度,创建一个信道类Channel,
//master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
其中,_wfd是管道的写入端,_subprocessid是对应子进程的id,_name表示管道的名字。
并通过vector来管理管道:
std::vector<Channel> channels;
有一个提示需要交代一下,在C++中,我们的形参命名规范:
const &:输入型参数
& :输入输出型参数
- :输出型参数
我们接下来就是创建信道和子进程,并把它们打印出来,看看我们的代码框架有没有问题:
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <unistd.h>
void work(int rfd)
{
while(true)
{
sleep(1);
}
}
//master
class Channel
{
public:
Channel(int wfd,pid_t id,const std::string& name)
:_wfd(wfd)
,_subprocessid(id)
,_name(name)
{
}
int GetWfd() {return _wfd ;}
pid_t GetProcessId(){return _subprocessid;}
std::string GetName(){return _name;}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(int num, std::vector<Channel>* channels)
{
for(int i = 0; i<num ; i++)
{
//1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0) exit(1);
//2.创建子进程
pid_t id = fork();
if(id == 0)
{
//child
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//father
close(pipefd[0]);
//a.子进程的pid b.父进程关心的管道的w端
//3.构造一个通道的名字
std::string channel_name = "Channel-" + std::to_string(i);
channels->push_back(Channel(pipefd[1],id,channel_name));
}
}
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
std::vector<Channel> channels;
int num = std::stoi(argv[1]);
//1.创建信道和子进程
CreateChannelAndSub(num, &channels);
//for test
for(auto& channel : channels)
{
std::cout<<"==============================="<<std::endl;
std::cout<<channel.GetName()<<std::endl;
std::cout<<channel.GetWfd()<<std::endl;
std::cout<<channel.GetProcessId()<<std::endl;
}
sleep(100);
return 0;
}
通过运行程序,我们给的命令行参数是10,创建10个子进程,然后打开进程监测窗口:
可以看到我们创建管道和子进程成功,非常好!
我们搭建好框架后,接下来就要通过channel控制子进程、回收管道和子进程。
父进程需要给子进程发任务,那任务是什么呢?父进程是没有办法把函数发送到管道里的,而任务其实就是让子进程执行某段代码,而父子进程数据可以写时拷贝,但是代码是共享的,所以,我们要构建任务,我们可以由父进程预先规定一些任务,这些任务本质就是一张表(函数指针数组),保存了各种方法的地址,未来我们就可以往管道里写固定长度的4字节的数组下标(任务码),所以,我们现在要转过头构建一批任务,为了方便,我们创建Task.hpp文件,.hpp允许声明和实现写在一个头文件里,.hpp文件的缺点是无法形成库,只能开源式地给别人,一般在开源项目里会用到:
#pragma once
#include <iostream>
#include <stdlib.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3
typedef void (*task_t)(); // task_t 函数指针类型
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout<< "I am a download task" << std::endl;
}
void Flush()
{
std::cout<< "I am a flush task" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 1642);
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int number)
{
if(number < 0 || number > 2) return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
在我们的代码中,按顺序给各个子进程发送任务,这种叫轮询方案,以下是我们的具体实现代码:
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "Task.hpp"
void work(int rfd)
{
while(true)
{
int command = 0;
int n = read(rfd,&command,sizeof(command));
if(n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if(n == 0)
{
std::cout << "sub process : " << getpid() << "quit" << std::endl;
break;
}
}
}
//master
class Channel
{
public:
Channel(int wfd,pid_t id,const std::string& name)
:_wfd(wfd)
,_subprocessid(id)
,_name(name)
{
}
int GetWfd() {return _wfd ;}
pid_t GetProcessId(){return _subprocessid;}
std::string GetName(){return _name;}
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if(rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(int num, std::vector<Channel>* channels)
{
for(int i = 0; i<num ; i++)
{
//1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0) exit(1);
//2.创建子进程
pid_t id = fork();
if(id == 0)
{
//child
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//father
close(pipefd[0]);
//a.子进程的pid b.父进程关心的管道的w端
//3.构造一个通道的名字
std::string channel_name = "Channel-" + std::to_string(i);
channels->push_back(Channel(pipefd[1],id,channel_name));
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(Channel& channel, int taskcommand)
{
write(channel.GetWfd(),&taskcommand,sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel>& channels)
{
sleep(1);
//a. 选择一个任务
int taskcommand = SelectTask();
//b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
//c. 发送任务
SendTaskCommand(channels[channel_index],taskcommand);
std::cout << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: " \
<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel>& channels, int times = -1)
{
if(times > 0)
{
while(times--)
{
ctrlProcessOnce(channels);
}
}
else
{
while(true)
{
ctrlProcessOnce(channels);
}
}
}
void CleanUpChannel(std::vector<Channel>& channels)
{
for(auto& channel : channels)
{
channel.CloseChannel();
}
for(auto& channel : channels)
{
channel.Wait();
}
}
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
std::vector<Channel> channels;
int num = std::stoi(argv[1]);
LoadTask();
//1.创建信道和子进程
CreateChannelAndSub(num, &channels);
//2.通过channel控制子进程
ctrlProcess(channels,num);
//3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
CleanUpChannel(channels);
sleep(5);
return 0;
}
其中,上面的代码有两个小细节处理:
1.在创建子进程时, ,这样就可以让子进程不关心管道读端,只需要从标准输入读就行。
这样就可以将管道的逻辑和子进程执行任务的逻辑解耦。
2.子进程要执行的word本身就是一个任务,可以作为CreateChannelAndSub的参数传入,在然后回调work。其中task_t task叫做回调函数,未来所有子进程都回去调用传入的task。这样之后,进程池本身的代码和任务本身两个文件就彻底解耦了!(把work函数放到Task.hpp )
可是现在我们的代码还存在一个BUG,我们来看,
里面写了两个循环,不可以放到一个循环里吗?
我们发现,随着管道的创建,越来越多的写端指向第一个管道,如果创建了10个子进程,那就有10个写端指向第一个管道。所以,如果两个循环写到一起,就会从头向后关管道的文件描述符,第一个关完后,还是有九个文件描述符指向第一个管道,管道中对文件描述符有引用计数,此时,这个管道并没有向我们预期的那样退出,写端没有关完,读端什么都读不到,读端依旧阻塞,子进程不退出,进程就阻塞了。
那为什么写成两个循环就可以呢?因为当关掉最后一个管道时,最后一个子进程指向上一个管道的写端就被释放了,类似于递归,从下往上就关掉了。
我们现在意识到了这个问题,那我们就可以倒着先关闭最下面的管道,
void CleanUpChannel(std::vector<Channel>& channels)
{
int num = channels.size()-1;
while(num >= 0)
{
channels[num].CloseChannel();
channels[num--].Wait();
}
}
这样做是没有问题的,但是我们并没有从源头上解决这个bug,我们就不应该让这种情况发生,我们的想法是这样的,如果是第二次及之后创建子进程时,*channels数组一定不为空,里面一定包含写端,那就把里面的每一个管道的写端关闭一下就好:
完整的进程池代码附上:
Makefile
processpool:ProcessPool.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f processpool
Task.hpp
#pragma once
#include <iostream>
#include <stdlib.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3
typedef void (*task_t)(); // task_t 函数指针类型
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout<< "I am a download task" << std::endl;
}
void Flush()
{
std::cout<< "I am a flush task" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 1642);
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int number)
{
if(number < 0 || number > 2) return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
void work()
{
while(true)
{
int command = 0;
int n = read(0,&command,sizeof(command));
if(n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if(n == 0)
{
std::cout << "sub process : " << getpid() << "quit" << std::endl;
break;
}
}
}
ProcessPool.cc
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "Task.hpp"
// void work(int rfd)
// {
// while(true)
// {
// int command = 0;
// int n = read(rfd,&command,sizeof(command));
// if(n == sizeof(int))
// {
// std::cout << "pid is : " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if(n == 0)
// {
// std::cout << "sub process : " << getpid() << "quit" << std::endl;
// break;
// }
// }
// }
// master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
for (int i = 0; i < num; i++)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 2.创建子进程
pid_t id = fork();
if (id == 0)
{
// 第二次及之后创建管道
if (!channels->empty())
{
for (auto &channel : *channels)
{
channel.CloseChannel();
}
}
// child
close(pipefd[1]);
dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
task();
close(pipefd[0]);
exit(0);
}
// father
close(pipefd[0]);
// a.子进程的pid b.父进程关心的管道的w端
// 3.构造一个通道的名字
std::string channel_name = "Channel-" + std::to_string(i);
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(Channel &channel, int taskcommand)
{
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
void ctrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a. 选择一个任务
int taskcommand = SelectTask();
// b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c. 发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
ctrlProcessOnce(channels);
}
}
else
{
while (true)
{
ctrlProcessOnce(channels);
}
}
}
void CleanUpChannel(std::vector<Channel> &channels)
{
int num = channels.size() - 1;
while (num >= 0)
{
channels[num].CloseChannel();
channels[num--].Wait();
}
// for(auto& channel : channels)
// {
// channel.CloseChannel();
// }
// for(auto& channel : channels)
// {
// channel.Wait();
// }
}
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
std::vector<Channel> channels;
int num = std::stoi(argv[1]);
LoadTask();
// 1.创建信道和子进程
CreateChannelAndSub(num, &channels, work);
// 2.通过channel控制子进程
ctrlProcess(channels, num);
// 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
CleanUpChannel(channels);
sleep(5);
return 0;
}
版权归原作者 核动力C++选手 所有, 如有侵权,请联系我们删除。