*🍑个人主页:Jupiter.🚀 所属专栏:Linux从入门到进阶欢迎大家点赞收藏评论😊*
目录
🍑进程间通信
🐬进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信的
实质
:
让不同的进程看到一份资源
。
📚管道
什么是管道?
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”,其中,管道只能被设计为
单向通信
的。
📕管道的原理
🐧用fork来共享管道原理
🦌站在文件描述符角度-深度理解管道
- 当一个进程
以读和以写
打开同一个文件
的时候,会有两个struct file对象
(如下图),一个被打开的文件只有一个文件缓冲区,所以这两个struct file对象指向的是同一个缓冲区。(struct file允许多个指针指向的,里面有引用计数,当上层调用close的时候,实际上是将对应的struct file* fd_struct里面的数据清空,然后引用计数–,为0就会回收对应的文件缓冲区) - 当我们fork创建子进程后,会以父进程为模板,将进程独有的资源拷贝给子进程(浅拷贝)。
- 这时,子进程也指向父进程所指向的那两个struct file对象,两个进程就看到了同一份资源(缓冲区)。
- 将父子进程对应的读或则写端关闭,就形成了单向的管道。
🚀匿名管道
OS提供的一个系统调用
pipe
,调用后OS还是使用文件那一套,只需创建一个
内存级文件
对象与文件缓冲区,但并不是真的打开了一个文件,其中磁盘中并不存在这个文件,不需要向磁盘做刷新;
头文件: #include <unistd.h>
功能:创建一无名管道
原型:
- int pipe(int fd[2]);
- 参数: - fd:文件描述符数组,其中
fd[0]
表示读端
,fd[1]
表示写端
- 返回值:成功返回0,失败返回错误代码。
示例代码:
voidwriter(int wfd){constchar*str ="hello father, I am child";char buffer[128];snprintf(buffer,sizeof(buffer),"%s", str);write(wfd, buffer,strlen(buffer));}voidreader(int rfd){char buffer[1024];
ssize_t n =read(rfd, buffer,sizeof(buffer)-1);(void)n;printf("father get a message: %s", buffer);}intmain(){// 1. 创建管道int pipefd[2];int n =pipe(pipefd);if(n <0)return1;printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0], pipefd[1]);// 3, 4// 2. 创建子进程
pid_t id =fork();if(id ==0){//子进程作为写端,关闭读close(pipefd[0]);writer(pipefd[1]);exit(0);}//父进程作为读端,关闭写close(pipefd[1]);reader(pipefd[0]);wait(NULL);return0;}
🔒管道读写规则
- 管道内部
没有数据
并且写端没有关闭自己的fd,读端
就会阻塞等待
,直到pipe中有数据。 - 管道内部
写满数据
并且读端没有关闭自己的fd,写端
就会阻塞等待
,直到pipe中有空间可以写。 - 如果
所有管道写端
对应的文件描述符被关闭
,则read返回0
,表示读结束
,类似读到了文件结尾。 - 如果
所有管道读端
对应的文件描述符被关闭
,则write操作
会产生信号SIGPIPE
,进而可能导致write进程退出
。
🐟管道特点
- 只能用于具有共同祖先的进程(具有
亲缘关系
的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。 - 管道是
面向字节流
的。 - 一般而言,进程退出,管道释放,所以管道的生命周期随进程。
- 一般而言,内核会对管道操作进行同步与互斥。
- 当要写入的数据量不大于
PIPE_BUF
时,linux将保证写入的原子性
。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。(PIPE_BUF==4096字节
) - 管道是
半双工的
,数据只能向一个方向流动(单向通信 );需要双方通信时,需要建立起两个管道 。
🦅基于管道实现一个进程池
实现目标
:创建多个子进程,父进程通过管道向子进程发送任务,对子进程进行控制,使子进程能够负载均衡的完成父进程提供的任务。
#pragmaonce#include<iostream>#include<cstdlib>#include<unistd.h>usingnamespace std;typedefvoid(*work_t)();typedefvoid(*task_t)();voidPrintlog(){
cout<<"Printlog"<<endl;}voidcallone(){
cout<<"callone"<<endl;}voidConnectMysql(){
cout<<"ConnectMysql"<<endl;}
task_t tasks[3]={ Printlog , callone , ConnectMysql};uint32_tnextwork(){returnrand()%3;}voidworker(){while(true){uint32_t command_code =0;
ssize_t n =read(0,&command_code,sizeof(command_code));if(n==sizeof(command_code)){if(command_code>=3)continue;
tasks[command_code]();}elseif(n==0){break;}}}
#include<iostream>#include<unistd.h>#include<string>#include<cstdlib>#include<sys/wait.h>#include<vector>#include<ctime>#include"task.hpp"usingnamespace std;#defineSubprocnum5enum{
UsageError =1,
NumError,
PipeError
};voidUsage(const string &proc){
cout <<"Usage:"<< proc <<"-Subprocnum"<< endl;}//./myprocess 6classchannel{public:channel(int wfd, pid_t Sub_id,const string &name):_wfd(wfd),_id(Sub_id),_name(name){}intwfd(){return _wfd;}voidClose(){close(_wfd);}
pid_t id(){return _id;}~channel(){}private:int _wfd;
pid_t _id;
string _name;};classprocesspool{public:processpool(int sub_num):_Sub_num(sub_num){}intNextchannel(){staticint next =0;int n = next++;
next %= channels.size();return n;}intCreatproc(work_t worker){
vector<int> fds;for(int num =0; num < _Sub_num; num++){int pipefd[2]={0};int n =pipe(pipefd);if(n <0)return PipeError;
pid_t id =fork();if(id ==0){if(!fds.empty()){for(auto fd:fds){close(fd);}}// child rclose(pipefd[1]);dup2(pipefd[0],0);worker();exit(0);}
string cname ="channel-"+to_string(num);// father wclose(pipefd[0]);
channels.push_back(channel(pipefd[1], id, cname));
fds.push_back(pipefd[1]);}return0;}voidSend_task(int nextchannel,uint32_t code){int wfd = channels[nextchannel].wfd();write(wfd,&code,sizeof(code));}voidKillall(){for(auto&channel:channels){
channel.Close();}}voidwait(){for(auto&channel:channels){int status =0;
pid_t rid =waitpid(channel.id(),&status,0);if(rid==channel.id()){
cout<<"wait success..."<<endl;}}}~processpool(){}private:int _Sub_num;
vector<channel> channels;};intCol(processpool *processpool_ptr){while(true){// a.选择进程与通道int nextchannel = processpool_ptr->Nextchannel();// b.选择任务uint16_t code =nextwork();// c.发送任务
processpool_ptr->Send_task(nextchannel, code);}return0;}intmain(int argc,char*argv[]){if(argc !=2){Usage(argv[0]);return UsageError;}int Subnum =stoi(argv[1]);if(Subnum <0)return NumError;srand((uint32_t)time(NULL));// 1. 创建进程
processpool *processpool_ptr =newprocesspool(Subnum);
processpool_ptr->Creatproc(worker);// 2.控制进程Col(processpool_ptr);// 3.回收进程// wait process//杀掉所有的进程
processpool_ptr->Killall();//等待所有进程
processpool_ptr->wait();delete(processpool_ptr);return0;}
代码中细节处理:在创建子进程代码中已经处理。
版权归原作者 Jupiter· 所有, 如有侵权,请联系我们删除。