0


【linux】进程间通信——管道

进程间通信——管道

自我名言只有努力,才能追逐梦想,只有努力,才不会欺骗自己。在这里插入图片描述
喜欢的点赞,收藏,关注一下把!在这里插入图片描述

1.什么是通信

  1. 数据传输:一个进程需要将它的数据发送给另一个进程
  2. 资源共享:多个进程之间共享同样的资源。
  3. 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  4. 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

2.为什么要通信

我们需要多进程协同完成某种业务内容。
如:

cat file | grep 'hello'

从打印的出的file文件内容里检索hello。

3.如何实现通信

1.两套标准:

System V IPC(聚焦在本地通信)(过时了)
System V 共享内存(重点学这个)
System V 消息队列
System V 信号量

POSIX IPC(让通信过程可以跨主机)
消息队列
共享内存
信号量
互斥量
条件变量
读写锁

2.管道:

管道是一种基于文件系统,实现的通信。

a.匿名管道
b.命名管道

我们知道进程是具有独立性的,今天我们需要通信---------通信代价一定不低!

在通信之前,如何理解通信的本质问题

就如电影中无间道那样,警察派卧底。等卧底在某个地方传信息回来,这个地方能是警察局吗,能是黑社会的地方吗,肯定不能。两个进程也是这样。那这个地方是谁提供的。

1.OS需要直接或间接给通信双方提供"内存空间"。
2.要通信的进程,必要看到一份公共的资源。

不同的通信种类,本质上就是:上面所说的资源,是OS中哪一个模块提供的。

如何通信:
1.需要先让不同的进程看到同一份资源
2.通信。

4.匿名管道

匿名管道:目前能用来让父子进程之间进行进程间通信。

在这里插入图片描述

父子进程都看到了同一份内核资源。并且是文件系统提供的。

因此这个文件也叫做管道文件!

管道文件也是文件。可能有人会这样觉得普通文件在磁盘里面,打开时OS也会创建struct file{},那两个进程实现通信,直接让它们在磁盘中进行不也可以吗?

没错,这样实现进程通信也可以,但是访问磁盘速度太慢了。

通信得目的是把一个进程数据交给另一个进程是从内存到内存的。目的并不是把数据写到磁盘。
至于磁盘上是否真正存在这个文件也不在乎了。OS有能力创建出一个struct file对象(管道文件)。让父子进程实现通信。

因此,管道文件也叫内存文件。

接下里看看这个通信过程。
在这里插入图片描述
一般而言,我们的管道只能用来进行单向数据通信!
因此必须要保证一读一写。父进程读,子进程写也没问题。

那为什么要保证一读一些呢,不能都读都写吗?
如果都读都写的话,如果父/子进程读数据的时候,还需要分清这是谁的数据,给管道增加麻烦。

下面看看实操。

在这里插入图片描述
下面主要实现匿名管道,子进程写,父进程读。

这里是一个大的框架。

#include<iostream>#include<unistd.h>#include<cassert>//C++中使用C的头文件可以这样写#include<cstdlib>intmain(){//父进程创建管道,打开读写端int pfd[2];int n=pipe(pfd);//成功返回0assert(n ==0);(void)n;//linux默认是release,assert就注释掉了,因此会报没有使用n的警告,这里是消除警告//创建子进程
    pid_t fd=fork();assert(fd >=0);if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码//子进程退出exit(0);}//走到这里是父进程close(pfd[1]);//关闭父进程写//父进程通信代码//回收子进程资源return0;}

子进程写

这里想这样处理。把格式化的数据写到缓冲区,然后再由write()写到管道文件里。
在这里插入图片描述
snprintf:把格式化的数据放在大小为size的str数组里。

if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
            cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));sleep(2);//这里是一个细节,z子进程sleep}//子进程退出close(pfd[1]);//子进程关闭写端exit(0);}

父进程读+回收子进程

//走到这里是父进程close(pfd[1]);//关闭父进程写端//父进程通信代码while(true){char buffer[1024];
        ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0if(s>0) buffer[s]=0;
        cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep     }close(pfd[0]);//关闭父进程读端//回收子进程资源
    pid_t id=waitpid(fd,nullptr,0);assert(id == fd);

虽然父子进程都有buffer数组,但是我们有写时拷贝,所以没问题。

在这里插入图片描述
运行结果是正确的,证明可以通信。

但是注意到我们的细节没有,子进程有sleep,父进程没有sleep。

上面图不明显,我们修改一下父进程的代码

//父进程通信代码while(true){char buffer[1024];
        cout<<"AAAAAAAAAAA"<<endl;
        ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0
        cout<<"BBBBBBBBBBB"<<endl;if(s>0) buffer[s]=0;
        cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}

在这里插入图片描述
如果管道中没有了数据,读端在读,默认会直接阻塞当前正在读取的进程。

再看另一种情况。子进程一直再写,父进程一直不读。

//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
            cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));
            cout<<"count"<<cnt<<endl;//sleep(10); //这里是一个细节,z子进程sleep}//父进程通信代码while(true){sleep(1000);char buffer[1024];
        cout<<"AAAAAAAAAAA"<<endl;
        ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0
        cout<<"BBBBBBBBBBB"<<endl;if(s>0) buffer[s]=0;
        cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}

在这里插入图片描述

管道是固定大小的缓冲区,写端写满的时候,再写会阻塞,等到对方进行读取。

另一种情况,写端写完内容关闭,等到读端读到文件结尾也关闭
父进程针对这个情况的代码需要修改一下。

if(fd ==0){//关闭子进程读close(pfd[0]);//子进程通信代码constchar* s="我是子进程,我正在给你发信息";int cnt=0;while(true){
            cnt++;char buffer[1024];snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());//写write(pfd[1],buffer,strlen(buffer));
            cout<<"count: "<<cnt<<endl;//sleep(10); //这里是一个细节,z子进程sleepbreak;}//子进程退出close(pfd[1]);//子进程关闭写端
        cout<<"子进程关闭了写端"<<endl;exit(0);}//父进程通信代码while(true){sleep(1000);char buffer[1024];//cout<<"AAAAAAAAAAA"<<endl;
        ssize_t s=read(pfd[0],buffer,sizeof(buffer)-1);//我们期望读一个字符串,因此保留一个位置放/0//cout<<"BBBBBBBBBBB"<<endl;if(s>0){
            buffer[s]=0;
            cout <<"Get Message# "<< buffer <<" | my pid: "<<getpid()<< endl;//细节父进程可没有sleep}elseif(s ==0){//文件读到末尾
            cout<<"read :"<<s<<endl;break;}}

在这里插入图片描述

最后一种情况,读端关闭了,写端会怎么样呢?

读都关闭了,写什么都没有用了,因此OS会终止写端,OS会给写端发送信号,终止写端。

//回收子进程资源int status=0;
    pid_t id=waitpid(fd,&status,0);assert(id == fd);
    cout<<"pid->"<<id<<(status&0x7F)<<endl;

在这里插入图片描述
上面说了这么读,都是关于读写的。这里总结一下。

读写特征:

  1. 读慢,写快
  2. 读快,写慢
  3. 写关闭,读到0
  4. 读关闭,写? OS会终止写端。

管道特征:

  1. 管道的生命周期随进程(管道是基于文件的,进程退出会关闭文件) 2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信 3.管道是面向字节流的。 4.管道是半双工的,数据只能向一个方向流动(单向通信);需要双方通信时,需要建立起两个管道。 5.内核会对管道操作进行同步与互斥,------对共享资源进行保护的方案。

5.进程池

现在我们需求是写一个进程池,用一个进程随意控制其他进程,让另一个进程来按照我的要求去完成特定的任务。

可以借用匿名管道来实现我们的需求。
在这里插入图片描述
具体思想+操作如下:

1.既然有任务,先把具体任务写好,然后把任务上传

#include<iostream>#include<vector>#include<unistd.h>#include<cassert>usingnamespace std;///任务///func_t 是一个函数指针 ,typedef之后这是一个函数指针类型typedefvoid(*func_t)();voidDownloadTask(){
    cout<<"下载任务"<<endl;sleep(1);}voidIOTask(){
    cout<<"IO任务"<<endl;sleep(1);}voidFFlushTask(){
    cout<<"刷新任务"<<endl;sleep(1);}voidLoadTask(vector<func_t>& ff){assert(&ff !=nullptr);
    ff.push_back(DownloadTask);
    ff.push_back(IOTask);
    ff.push_back(FFlushTask);}intmain(){//创建任务对象
    vector<func_t> funMap;//上传任务LoadTask(funMap);return0;}

2.创建多个子进程,但是父进程要给子进程发送任务指令,我们要知道给那个子进程发,因此再创建一个vector对象,记录父进程写端fd,子进程fd,为了一会看的清楚父进程给那个子进程发送,再增加一个string对象记录信息。让父子进程有对应关系。

创建vector对象

classsubEP{public:subEP(pid_t subfd,int writefd):_subfd(subfd),_writefd(writefd){char buffer[1024];snprintf(buffer,sizeof buffer,"process->%d[pid(%d)-fd(%d)]",cnt++,_subfd,_writefd);
        _name=buffer;}public:staticint cnt;
    string _name;
    pid_t _subfd;int _writefd;};int subEP::cnt=0;intmain(){//创建任务对象
    vector<func_t> funMap;//上传任务LoadTask(funMap);//创建子进程,并维护好父子间通信信道//创建对象
    vector<subEP> subs;return0;}

创建多个子进程,并且父子进程关系记录下来。

voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){//Pro_NUM是创建几个子进程for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
        pid_t fd=fork();if(fd ==0){//走到这里是子进程close(pipefd[1]);//关闭写//子进程处理任务}//关闭读close(pipefd[0]);
        subEP ss(fd,pipefd[1]);
        sub.push_back(ss);}}

父进程控制子进程,负载均衡的向子进程发送命令码,使每个子进程都可能处理任务。

voidSendTask(const subEP& process,int tasknum){
    cout<<"send task num: "<<tasknum<<"send to: "<<process._name<<endl;//父进程发的是4字节的任务码
    ssize_t n=write(process._writefd,&tasknum,sizeof tasknum);assert(n ==sizeof(int));(void)n;}voidloadBlanceContrl(vector<subEP>& sub,vector<func_t> func){int processnum=sub.size();int funcnum=func.size();//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);}//走到这里关闭写write quit  read-->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}

子进程处理任务

intrecTask(int readFd){int code=0;
    ssize_t n=read(readFd,&code,sizeof code);assert(n ==sizeof(int));(void)n;if(n ==4)return code;elseif(n <=0)return-1;elsereturn0;}voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
        pid_t fd=fork();if(fd ==0){//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
                    func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
        subEP ss(fd,pipefd[1]);
        sub.push_back(ss);}}

回收子进程资源

voidwaitprocess(vector<subEP> sub){for(int i=0;i<sub.size();++i){waitpid(sub[i]._subfd,nullptr,0);
        cout <<"wait sub process success ...: "<< sub[i]._subfd << endl;}}

在这里插入图片描述
看运行结果和我们预期的一样。但是通信一直持续。

这里我想给通信过程添加一个次数。所以在修改一下代码。
如果是0就一直通信。

voidloadBlanceContrl(vector<subEP>& sub,vector<func_t>& func,int& count){int processnum=sub.size();int funcnum=func.size();//控制通信次数bool flage=(count ==0?true:false);//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);sleep(2);if(!flage){
            count--;if(count ==0)break;}}//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}

在这里插入图片描述

虽然结果都能符合我们的预期,但是我们在创建子进程的时候有bug。虽然没有报错。

voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程
        pid_t fd=fork();if(fd ==0){//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
                    func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
        subEP ss(fd,pipefd[1]);
        sub.push_back(ss);}}

代码看着没错,我们画图分析一下。

在这里插入图片描述
我们的父子进程就不是一一对应的关系了。
再关闭文件的时候虽然代码是从上往下关的。

//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}

但是真实情况是从下到上关的。

这里解决方法是每次在创建子进程的时候,手动关闭子进程拷贝过来的上一层写端。

voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;//创建子进程要删除上一层写端的对象
        vector<int> deletefd;//创建子进程
        pid_t fd=fork();if(fd ==0){for(int i=0;i<deletefd.size();++i){close(deletefd[i]);}//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
                    func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
        subEP ss(fd,pipefd[1]);
        sub.push_back(ss);
        deletefd.push_back(pipefd[1]);}}

希望这一小段代码能够更加深对匿名管道的理解。

5.1进程池完整代码

#include<iostream>#include<vector>#include<unistd.h>#include<cassert>#include<cstdlib>#include<ctime>#include<sys/types.h>#include<sys/wait.h>#definePro_NUM5usingnamespace std;///任务///func_t 是一个函数指针 ,typedef之后这是一个函数指针类型typedefvoid(*func_t)();voidDownloadTask(){
    cout<<"下载任务"<<endl;sleep(1);}voidIOTask(){
    cout<<"IO任务"<<endl;sleep(1);}voidFFlushTask(){
    cout<<"刷新任务"<<endl;sleep(1);}voidLoadTask(vector<func_t>& ff){assert(&ff !=nullptr);
    ff.push_back(DownloadTask);
    ff.push_back(IOTask);
    ff.push_back(FFlushTask);}///下面是一个多进程///classsubEP{public:subEP(pid_t subfd,int writefd):_subfd(subfd),_writefd(writefd){char buffer[1024];snprintf(buffer,sizeof buffer,"process->%d[pid(%d)-fd(%d)]",cnt++,_subfd,_writefd);
        _name=buffer;}public:staticint cnt;
    string _name;
    pid_t _subfd;int _writefd;};int subEP::cnt=0;intrecTask(int readFd){int code=0;
    ssize_t n=read(readFd,&code,sizeof code);assert(n ==sizeof(int));(void)n;if(n ==4)return code;elseif(n <=0)return-1;elsereturn0;}voidCreateSubProcess(vector<subEP>& sub,vector<func_t> func){for(int i=0;i<Pro_NUM;++i){//创建管道int pipefd[2];int n=pipe(pipefd);assert(n ==0);(void)n;
        vector<int> deletefd;//创建子进程
        pid_t fd=fork();if(fd ==0){for(int i=0;i<deletefd.size();++i){close(deletefd[i]);}//子进程close(pipefd[1]);//关闭写//子进程处理任务while(true){int Commande=recTask(pipefd[0]);if(Commande >=0&& Commande < func.size())
                    func[Commande]();elseif(Commande ==-1)//读到文件结尾break;}exit(0);}//关闭读close(pipefd[0]);
        subEP ss(fd,pipefd[1]);
        sub.push_back(ss);
        deletefd.push_back(pipefd[1]);}}voidSendTask(const subEP& process,int tasknum){
    cout<<"send task num: "<<tasknum<<"send to: "<<process._name<<endl;//父进程发的是4字节的任务码
    ssize_t n=write(process._writefd,&tasknum,sizeof tasknum);assert(n ==sizeof(int));(void)n;}voidloadBlanceContrl(vector<subEP>& sub,vector<func_t>& func,int& count){int processnum=sub.size();int funcnum=func.size();bool flage=(count ==0?true:false);//让每个子进程都可能被选,因此加一个随机数while(true){//选某个子进程int subidx=rand()%processnum;//选某个任务int taskidx=rand()%funcnum;//发送任务SendTask(sub[subidx],taskidx);sleep(2);if(!flage){
            count--;if(count ==0)break;}}//走到这里关闭写 write quit,read->0for(int i=0;i<processnum;++i){close(sub[i]._writefd);}}voidwaitprocess(vector<subEP> sub){for(int i=0;i<sub.size();++i){waitpid(sub[i]._subfd,nullptr,0);
        cout <<"wait sub process success ...: "<< sub[i]._subfd << endl;}}intmain(){//生成随机数种子srand((unsignedint)time(nullptr));//创建任务对象
    vector<func_t> funMap;//上传任务LoadTask(funMap);//创建子进程,并维护好父子间通信信道
    vector<subEP> subs;CreateSubProcess(subs,funMap);//走到这里是父进程,控制子进程,负载均衡的向子进程发送命令码int count=5;loadBlanceContrl(subs,funMap,count);//回收子进程资源waitprocess(subs);return0;}

6.命名管道

有了匿名管道的基础,命名管道学起来非常简单。

命名管道----->毫不相干的进程进行通信。

现在见识一下命令行式的管道。

在这里插入图片描述

mkfifo name_pipe    //创建命名管道

在这里插入图片描述
p代表管道文件。

在这里插入图片描述

左右属于不同的进程(没有血缘关系),成功让他们进行了通信。

在这里插入图片描述
问:命名管道如何做到让不同进程看到同一份资源呢?

可以让不同进程打开指定名称(路径+文件名)的同一个文件。
文件唯一性=路径+文件名

接下来看看实操

在这里插入图片描述

创建管道

#include<iostream>#include<string>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#include<cassert>#include<cstring>#include<unistd.h>#include<cstring>#defineNAME_PIPE"mypipe.txt"usingnamespace std;//创建管道boolCreatepipe(const string& p){//这里我们想要创建出的文件的权限就是0666,umask(0);int n=mkfifo(p.c_str(),0666);if(n ==0){returntrue;}else{
        cout<<"errno: "<<errno<<"err string: "<<strerror(errno)<<endl;returnfalse;}}

删除管道

在这里插入图片描述

//删除管道voidRemovePipe(const string& p){int n=unlink(p.c_str());assert(n ==0);(void)n;}

写端

#include"comm.hpp"intmain(){bool flage=Createpipe(NAME_PIPE);assert(flage);(void)flage;int wfd=open(NAME_PIPE,O_WRONLY|O_TRUNC);if(wfd<0)exit(1);char buffer[1024];while(true){
        cout<<"Play Say#";fgets(buffer,sizeof buffer,stdin);if(strlen(buffer)>0)//假设输入abcd后面肯定跟\n--->abcd\n,想把\n去掉
            buffer[strlen(buffer)-1]=0;
        ssize_t n=write(wfd,buffer,strlen(buffer));assert(n ==strlen(buffer));(void)0;}close(wfd);RemovePipe(NAME_PIPE);return0;}

读端

#include"comm.hpp"intmain(){int rfd=open(NAME_PIPE,O_RDONLY);if(rfd <0)exit(1);char buffer[1024];while(true){
        ssize_t n=read(rfd,buffer,sizeof(buffer)-1);if(n>0){
            buffer[n]=0;
            cout<<"send->receive# :"<<buffer<<endl;}elseif(n ==0){
            cout<<"send quit , Me too"<<endl;break;;}else{
            cout<<"error string: "<<strerror(errno)<<endl;break;}}close(rfd);return0;}

在这里插入图片描述

这里有一个细节的地方。

写端open之后没有往后运行

在这里插入图片描述
等到读端open才会往后运行。

在这里插入图片描述
两个进程(或写端/读端)都必须同时打开文件,此时才能往后继续进行。

标签: linux 服务器 java

本文转载自: https://blog.csdn.net/fight_p/article/details/134333333
版权归原作者 LuckyRich1 所有, 如有侵权,请联系我们删除。

“【linux】进程间通信——管道”的评论:

还没有评论