进程间通信——管道
自我名言:只有努力,才能追逐梦想,只有努力,才不会欺骗自己。
喜欢的点赞,收藏,关注一下把!
1.什么是通信
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
2.为什么要通信
我们需要多进程协同完成某种业务内容。
如:
cat file | grep 'hello'
从打印的出的file文件内容里检索hello。
3.如何实现通信
1.两套标准:
System V IPC(聚焦在本地通信)(过时了)
System V 共享内存(重点学这个)
System V 消息队列
System V 信号量
POSIX IPC(让通信过程可以跨主机)
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
2.管道:
管道是一种基于文件系统,实现的通信。
a.匿名管道
b.命名管道
我们知道进程是具有独立性的,今天我们需要通信---------通信代价一定不低!
在通信之前,如何理解通信的本质问题
就如电影中无间道那样,警察派卧底。等卧底在某个地方传信息回来,这个地方能是警察局吗,能是黑社会的地方吗,肯定不能。两个进程也是这样。那这个地方是谁提供的。
1.OS需要直接或间接给通信双方提供"内存空间"。
2.要通信的进程,必要看到一份公共的资源。
不同的通信种类,本质上就是:上面所说的资源,是OS中哪一个模块提供的。
如何通信:
1.需要先让不同的进程看到同一份资源
2.通信。
4.匿名管道
匿名管道:目前能用来让父子进程之间进行进程间通信。
父子进程都看到了同一份内核资源。并且是文件系统提供的。
因此这个文件也叫做管道文件!
管道文件也是文件。可能有人会这样觉得普通文件在磁盘里面,打开时OS也会创建struct file{},那两个进程实现通信,直接让它们在磁盘中进行不也可以吗?
没错,这样实现进程通信也可以,但是访问磁盘速度太慢了。
通信得目的是把一个进程数据交给另一个进程是从内存到内存的。目的并不是把数据写到磁盘。
至于磁盘上是否真正存在这个文件也不在乎了。OS有能力创建出一个struct file对象(管道文件)。让父子进程实现通信。
因此,管道文件也叫内存文件。
接下里看看这个通信过程。
一般而言,我们的管道只能用来进行单向数据通信!
因此必须要保证一读一写。父进程读,子进程写也没问题。
那为什么要保证一读一些呢,不能都读都写吗?
如果都读都写的话,如果父/子进程读数据的时候,还需要分清这是谁的数据,给管道增加麻烦。
下面看看实操。
下面主要实现匿名管道,子进程写,父进程读。
这里是一个大的框架。
#include<iostream>#include<unistd.h>#include<cassert>//C++中使用C的头文件可以这样写#include<cstdlib>intmain(){//父进程创建管道,打开读写端int pfd[2];int n=pipe(pfd);//成功返回0assert(n ==0);(void)n;//linux默认是release,assert就注释掉了,因此会报没有使用n的警告,这里是消除警告//创建子进程
pid_t fd=fork();assert(fd >=0);if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码//子进程退出exit(0);}//走到这里是父进程close(pfd[1]);//关闭父进程写//父进程通信代码//回收子进程资源return0;}
子进程写
这里想这样处理。把格式化的数据写到缓冲区,然后再由write()写到管道文件里。
snprintf:把格式化的数据放在大小为size的str数组里。
if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));sleep(2);//这里是一个细节,z子进程sleep}//子进程退出close(pfd[1]);//子进程关闭写端exit(0);}
父进程读+回收子进程
//走到这里是父进程close(pfd[1]);//关闭父进程写端//父进程通信代码while(true){char buffer[1024];
ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0if(s>0) buffer[s]=0;
cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep }close(pfd[0]);//关闭父进程读端//回收子进程资源
pid_t id=waitpid(fd,nullptr,0);assert(id == fd);
虽然父子进程都有buffer数组,但是我们有写时拷贝,所以没问题。
运行结果是正确的,证明可以通信。
但是注意到我们的细节没有,子进程有sleep,父进程没有sleep。
上面图不明显,我们修改一下父进程的代码
//父进程通信代码while(true){char buffer[1024];
cout<<"AAAAAAAAAAA"<<endl;
ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0
cout<<"BBBBBBBBBBB"<<endl;if(s>0) buffer[s]=0;
cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}
如果管道中没有了数据,读端在读,默认会直接阻塞当前正在读取的进程。
再看另一种情况。子进程一直再写,父进程一直不读。
//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));
cout<<"count"<<cnt<<endl;//sleep(10); //这里是一个细节,z子进程sleep}//父进程通信代码while(true){sleep(1000);char buffer[1024];
cout<<"AAAAAAAAAAA"<<endl;
ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0
cout<<"BBBBBBBBBBB"<<endl;if(s>0) buffer[s]=0;
cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}
管道是固定大小的缓冲区,写端写满的时候,再写会阻塞,等到对方进行读取。
另一种情况,写端写完内容关闭,等到读端读到文件结尾也关闭。
父进程针对这个情况的代码需要修改一下。
if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));
cout<<"count: "<<cnt<<endl;//sleep(10); //这里是一个细节,z子进程sleepbreak;}//子进程退出close(pfd[1]);//子进程关闭写端
cout<<"子进程关闭了写端"<<endl;exit(0);}//父进程通信代码while(true){sleep(1000);char buffer[1024];//cout<<"AAAAAAAAAAA"<<endl;
ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0//cout<<"BBBBBBBBBBB"<<endl;if(s>0){
buffer[s]=0;
cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}elseif(s ==0){//文件读到末尾
cout<<"read :"<<s<<endl;break;}}
最后一种情况,读端关闭了,写端会怎么样呢?
读都关闭了,写什么都没有用了,因此OS会终止写端,OS会给写端发送信号,终止写端。
//回收子进程资源int status=0;
pid_t id=waitpid(fd,&status,0);assert(id == fd);
cout<<"pid->"<<id<<(status&0x7F)<<endl;
上面说了这么读,都是关于读写的。这里总结一下。
读写特征:
- 读慢,写快
- 读快,写慢
- 写关闭,读到0
- 读关闭,写? OS会终止写端。
管道特征:
- 管道的生命周期随进程(管道是基于文件的,进程退出会关闭文件) 2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信 3.管道是面向字节流的。 4.管道是半双工的,数据只能向一个方向流动(单向通信);需要双方通信时,需要建立起两个管道。 5.内核会对管道操作进行同步与互斥,------对共享资源进行保护的方案。
5.进程池
现在我们需求是写一个进程池,用一个进程随意控制其他进程,让另一个进程来按照我的要求去完成特定的任务。
可以借用匿名管道来实现我们的需求。
具体思想+操作如下:
1.既然有任务,先把具体任务写好,然后把任务上传
#include<iostream>#include<vector>#include<unistd.h>#include<cassert>usingnamespace std;///任务///func_t 是一个函数指针 ,typedef之后这是一个函数指针类型typedefvoid(*func_t)();voidDownloadTask(){
cout<<"下载任务"<<endl;sleep(1);}voidIOTask(){
cout<<"IO任务"<<endl;sleep(1);}voidFFlushTask(){
cout<<"刷新任务"<<endl;sleep(1);}voidLoadTask(vector<func_t>& ff){assert(&ff !=nullptr);
ff.push_back(DownloadTask);
ff.push_back(IOTask);
ff.push_back(FFlushTask);}intmain(){//创建任务对象
vector<func_t> funMap;//上传任务LoadTask(funMap);return0;}
2.创建多个子进程,但是父进程要给子进程发送任务指令,我们要知道给那个子进程发,因此再创建一个vector对象,记录父进程写端fd,子进程fd,为了一会看的清楚父进程给那个子进程发送,再增加一个string对象记录信息。让父子进程有对应关系。
创建vector对象
classsubEP{public:subEP(pid_t subfd,int writefd):_subfd(subfd),_writefd(writefd){char buffer[1024];snprintf(buffer,sizeof buffer,"process->%d[pid(%d)-fd(%d)]",cnt++,_subfd,_writefd);
_name=buffer;}public:staticint cnt;
string _name;
pid_t _subfd;int _writefd;};int subEP::cnt=0;intmain(){//创建任务对象
vector<func_t> funMap;//上传任务LoadTask(funMap);//创建子进程,并维护好父子间通信信道//创建对象
vector<subEP> subs;return0;}
创建多个子进程,并且父子进程关系记录下来。
voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){//Pro_NUM是创建几个子进程for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
pid_t fd=fork();if(fd ==0){//走到这里是子进程close(pipefd[1]);//关闭写//子进程处理任务}//关闭读close(pipefd[0]);
subEP ss(fd,pipefd[1]);
sub.push_back(ss);}}
父进程控制子进程,负载均衡的向子进程发送命令码,使每个子进程都可能处理任务。
voidSendTask(const subEP& process,int tasknum){
cout<<"send task num: "<<tasknum<<"send to: "<<process._name<<endl;//父进程发的是4字节的任务码
ssize_t n=write(process._writefd,&tasknum,sizeof tasknum);assert(n ==sizeof(int));(void)n;}voidloadBlanceContrl(vector<subEP>& sub,vector<func_t> func){int processnum=sub.size();int funcnum=func.size();//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);}//走到这里关闭写write quit read-->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}
子进程处理任务
intrecTask(int readFd){int code=0;
ssize_t n=read(readFd,&code,sizeof code);assert(n ==sizeof(int));(void)n;if(n ==4)return code;elseif(n <=0)return-1;elsereturn0;}voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
pid_t fd=fork();if(fd ==0){//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
subEP ss(fd,pipefd[1]);
sub.push_back(ss);}}
回收子进程资源
voidwaitprocess(vector<subEP> sub){for(int i=0;i<sub.size();++i){waitpid(sub[i]._subfd,nullptr,0);
cout <<"wait sub process success ...: "<< sub[i]._subfd << endl;}}
看运行结果和我们预期的一样。但是通信一直持续。
这里我想给通信过程添加一个次数。所以在修改一下代码。
如果是0就一直通信。
voidloadBlanceContrl(vector<subEP>& sub,vector<func_t>& func,int& count){int processnum=sub.size();int funcnum=func.size();//控制通信次数bool flage=(count ==0?true:false);//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);sleep(2);if(!flage){
count--;if(count ==0)break;}}//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}
虽然结果都能符合我们的预期,但是我们在创建子进程的时候有bug。虽然没有报错。
voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
pid_t fd=fork();if(fd ==0){//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
subEP ss(fd,pipefd[1]);
sub.push_back(ss);}}
代码看着没错,我们画图分析一下。
我们的父子进程就不是一一对应的关系了。
再关闭文件的时候虽然代码是从上往下关的。
//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}
但是真实情况是从下到上关的。
这里解决方法是每次在创建子进程的时候,手动关闭子进程拷贝过来的上一层写端。
voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程要删除上一层写端的对象
vector<int> deletefd;//创建子进程
pid_t fd=fork();if(fd ==0){for(int i=0;i<deletefd.size();++i){close(deletefd[i]);}//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
subEP ss(fd,pipefd[1]);
sub.push_back(ss);
deletefd.push_back(pipefd[1]);}}
希望这一小段代码能够更加深对匿名管道的理解。
5.1进程池完整代码
#include<iostream>#include<vector>#include<unistd.h>#include<cassert>#include<cstdlib>#include<ctime>#include<sys/types.h>#include<sys/wait.h>#definePro_NUM5usingnamespace std;///任务///func_t 是一个函数指针 ,typedef之后这是一个函数指针类型typedefvoid(*func_t)();voidDownloadTask(){
cout<<"下载任务"<<endl;sleep(1);}voidIOTask(){
cout<<"IO任务"<<endl;sleep(1);}voidFFlushTask(){
cout<<"刷新任务"<<endl;sleep(1);}voidLoadTask(vector<func_t>& ff){assert(&ff !=nullptr);
ff.push_back(DownloadTask);
ff.push_back(IOTask);
ff.push_back(FFlushTask);}///下面是一个多进程///classsubEP{public:subEP(pid_t subfd,int writefd):_subfd(subfd),_writefd(writefd){char buffer[1024];snprintf(buffer,sizeof buffer,"process->%d[pid(%d)-fd(%d)]",cnt++,_subfd,_writefd);
_name=buffer;}public:staticint cnt;
string _name;
pid_t _subfd;int _writefd;};int subEP::cnt=0;intrecTask(int readFd){int code=0;
ssize_t n=read(readFd,&code,sizeof code);assert(n ==sizeof(int));(void)n;if(n ==4)return code;elseif(n <=0)return-1;elsereturn0;}voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;
vector<int> deletefd;//创建子进程
pid_t fd=fork();if(fd ==0){for(int i=0;i<deletefd.size();++i){close(deletefd[i]);}//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
subEP ss(fd,pipefd[1]);
sub.push_back(ss);
deletefd.push_back(pipefd[1]);}}voidSendTask(const subEP& process,int tasknum){
cout<<"send task num: "<<tasknum<<"send to: "<<process._name<<endl;//父进程发的是4字节的任务码
ssize_t n=write(process._writefd,&tasknum,sizeof tasknum);assert(n ==sizeof(int));(void)n;}voidloadBlanceContrl(vector<subEP>& sub,vector<func_t>& func,int& count){int processnum=sub.size();int funcnum=func.size();bool flage=(count ==0?true:false);//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);sleep(2);if(!flage){
count--;if(count ==0)break;}}//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}voidwaitprocess(vector<subEP> sub){for(int i=0;i<sub.size();++i){waitpid(sub[i]._subfd,nullptr,0);
cout <<"wait sub process success ...: "<< sub[i]._subfd << endl;}}intmain(){//生成随机数种子srand((unsignedint)time(nullptr));//创建任务对象
vector<func_t> funMap;//上传任务LoadTask(funMap);//创建子进程,并维护好父子间通信信道
vector<subEP> subs;CreateSubProcess(subs,funMap);//走到这里是父进程,控制子进程,负载均衡的向子进程发送命令码int count=5;loadBlanceContrl(subs,funMap,count);//回收子进程资源waitprocess(subs);return0;}
6.命名管道
有了匿名管道的基础,命名管道学起来非常简单。
命名管道----->毫不相干的进程进行通信。
现在见识一下命令行式的管道。
mkfifo name_pipe //创建命名管道
p代表管道文件。
左右属于不同的进程(没有血缘关系),成功让他们进行了通信。
问:命名管道如何做到让不同进程看到同一份资源呢?
可以让不同进程打开指定名称(路径+文件名)的同一个文件。
文件唯一性=路径+文件名
接下来看看实操
创建管道
#include<iostream>#include<string>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#include<cassert>#include<cstring>#include<unistd.h>#include<cstring>#defineNAME_PIPE"mypipe.txt"usingnamespace std;//创建管道boolCreatepipe(const string& p){//这里我们想要创建出的文件的权限就是0666,umask(0);int n=mkfifo(p.c_str(),0666);if(n ==0){returntrue;}else{
cout<<"errno: "<<errno<<"err string: "<<strerror(errno)<<endl;returnfalse;}}
删除管道
//删除管道voidRemovePipe(const string& p){int n=unlink(p.c_str());assert(n ==0);(void)n;}
写端
#include"comm.hpp"intmain(){bool flage=Createpipe(NAME_PIPE);assert(flage);(void)flage;int wfd=open(NAME_PIPE,O_WRONLY|O_TRUNC);if(wfd<0)exit(1);char buffer[1024];while(true){
cout<<"Play Say#";fgets(buffer,sizeof buffer,stdin);if(strlen(buffer)>0)//假设输入abcd后面肯定跟\n--->abcd\n,想把\n去掉
buffer[strlen(buffer)-1]=0;
ssize_t n=write(wfd,buffer,strlen(buffer));assert(n ==strlen(buffer));(void)0;}close(wfd);RemovePipe(NAME_PIPE);return0;}
读端
#include"comm.hpp"intmain(){int rfd=open(NAME_PIPE,O_RDONLY);if(rfd <0)exit(1);char buffer[1024];while(true){
ssize_t n=read(rfd,buffer,sizeof(buffer)-1);if(n>0){
buffer[n]=0;
cout<<"send->receive# :"<<buffer<<endl;}elseif(n ==0){
cout<<"send quit , Me too"<<endl;break;;}else{
cout<<"error string: "<<strerror(errno)<<endl;break;}}close(rfd);return0;}
这里有一个细节的地方。
写端open之后没有往后运行
等到读端open才会往后运行。
两个进程(或写端/读端)都必须同时打开文件,此时才能往后继续进行。
版权归原作者 LuckyRich1 所有, 如有侵权,请联系我们删除。