0


进程间通信(上)

进程间通信(上)

背景

1、进程是具有独立性的,所以进程间想要交互数据,成本会非常高

2、为什么要进行进程间通信?有的时候需要多进程协同处理一件事情。

进程间通信目的

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程间通信发展

  • 管道
  • System V进程间通信
  • POSIX进程间通信

进程间通信分类

管道

  • 匿名管道
  • pipe 命名管道

System V IPC(用的不多,更多的是进行本地通信)

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

POSIX IPC(用的较多,也可以用来进行网络通信)

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

注意:System V IPC和POSIX IPC是两套标准(IPC是通信的简称)。

管道

什么是管道

  • 管道是Unix中最古老的进程间通信的形式。
  • 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”

image-20221031150427050

匿名管道

image-20221031170631490

里面封装了两次open,第一次是以读方式打开,返回值写在fd[0]中,也就是打开文件的fd,第二次是以写方式打开,返回值写在fd[1]中,也就是打开文件的fd。同时通过上面的联合体标定它是一个管道文件。

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码-1

image-20221031155504450

实例代码

简单的匿名管道实现

#include<iostream>#include<cstdio>#include<fcntl.h>#include<unistd.h>#include<sys/types.h>#include<sys/stat.h>#include<string>#include<cstring>#include<sys/wait.h>usingnamespace std;intmain(){//1.创建管道int pipefd[2]={0};if(pipe(pipefd)!=0){
    cerr <<"pipe error"<< endl;return1;}//2.创建子进程
  pid_t id =fork();if(id <0){
    cerr <<"fork error"<< endl;return2;}elseif(id ==0)//子进程{//子进程来读取,关闭写端close(pipefd[1]);#defineMAX_NUM1024char buffer[MAX_NUM];while(true){memset(buffer,0,sizeof(buffer));
      ssize_t ret =read(pipefd[0], buffer,sizeof(buffer)-1);if(ret >0){//读取成功,可以进行写入
        buffer[ret]='\0';
        cout <<"子进程受到消息了,消息内容:"<< buffer << endl;}elseif(ret ==0){sleep(1);//此处是为了稍微等一下父进程
        cout <<"父进程写完了,我也退出了!"<< endl;break;}else{//Do nothing}}close(pipefd[0]);exit(0);}else//父进程{//父进程来写入,关闭读端close(pipefd[0]);const string msg ="你好子进程,我是父进程!这次发送的信息编号是: ";int cnt =0;while(cnt <5){char sendBuffer[1024];sprintf(sendBuffer,"%s:%d", msg.c_str(), cnt);write(pipefd[1], sendBuffer,strlen(sendBuffer));sleep(1);//为了看现象明显设计的
      cnt++;}close(pipefd[1]);
    cout <<"父进程写完了"<< endl;}
  pid_t res =waitpid(id,nullptr,0);if(res >0){
    cout <<"等待子进程成功!"<< endl;}return0;}

问:父进程关闭写端了,子进程是如何知道父进程关闭写端的?

答:通过引用计数知道的,file结构体中,有类似引用的变量记录了有几个指针指向该文件。当引用计数为1了,说明此时就只有一个进程指向该文件了。此时子进程读完就不再有进程指向该文件了。

问:父进程每隔一秒写一次,为什么子进程也是一秒读一次呢?

答:当父进程在写入数据的时候,子进程在等(阻塞等待:将当前进程放在等待队列中(管道资源的等待队列中))!所以,父进程写入之后,子进程才能read(会返回)到数据,子进程打印读取数据要以父进程的节奏为主。所以父进程和子进程在读写的时候,是有一定的顺序性的(pipe内部自带访问控制(同步和互斥机制))。(父子进程在各自printf的时候(向显示器写入文件),并没有顺序,谁快谁先写,缺乏访问控制)。

管道内部,没有数据,reader就必须阻塞等待(等管道有数据);管道内部如果被写满了,writer就必须阻塞等待(等数据被读走)。

一个父进程控制单个子进程完成指定任务

代码:

#include<iostream>#include<cstdio>#include<sys/types.h>#include<sys/wait.h>#include<unistd.h>#include<ctime>#include<string>#include<vector>#include<unordered_map>#include<cassert>#include<cstdlib>usingnamespace std;typedefvoid(*functor)();
vector<functor> functors;//方法集合//for debug
unordered_map<uint32_t, string> info;voidf1(){
  cout <<"这是一个处理日志的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl;;}voidf2(){
  cout <<"这是一个备份数据的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl;;}voidf3(){
  cout <<"这是一个处理网络请求的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl;;}voidloadFunctor(){
  info.insert({functors.size(),"处理日志的任务"});
  functors.push_back(f1);
  info.insert({functors.size(),"处理备份数据的任务"});
  functors.push_back(f2);
  info.insert({functors.size(),"处理网络请求的任务"});
  functors.push_back(f3);}intmain(){//0.加载任务列表loadFunctor();//1.创建管道int pipefd[2]={0};if(pipe(pipefd)!=0){
    cerr <<"pipe error"<< endl;return1;}//2.创建子进程
  pid_t id =fork();if(id <0){
    cerr <<"fork error"<< endl;return2;}elseif(id ==0){//3.关闭不需要的文件close(pipefd[1]);//child:read//4.业务处理while(true){uint32_t operatorType =0;//如果有数据就读取,如果没有数据就阻塞等待,等待任务的到来
      ssize_t s =read(pipefd[0],&operatorType,sizeof(uint32_t));if(s ==0){
        cout <<"读取数据结束!退出!"<< endl;break;}assert(s ==sizeof(uint32_t));//assert是断言,是编译有效debug模式//release模式,断言就没有了//如果断言没有了,那么s变量就是只被定义,没有被使用,在release模式中,就会有warning存在(void)s;if(operatorType < functors.size()){
        functors[operatorType]();}else{
        cerr <<"bug?operatorType = "<< operatorType << endl;}}close(pipefd[0]);exit(0);}elseif(id >0){srand((longlong)time(nullptr));//3.关闭不需要的文件close(pipefd[0]);//parant:write//4.业务生成int num = functors.size();int cnt =10;while(cnt--){//5.形成任务码uint32_t commandCode =rand()% num;
      cout <<"父进程指派任务完成,任务是:"<< info[commandCode]<<", 任务的编号是:"<< cnt << endl;//向指定的进程下达执行任务的操作write(pipefd[1],&commandCode,sizeof(uint32_t));sleep(1);}close(pipefd[1]);
    pid_t res =waitpid(id,nullptr,0);if(res){
      cout <<"wait success"<< endl;}}return0;}

父进程控制一批子进程完成任务(进程池)

代码:

#include<iostream>#include<cstdio>#include<sys/types.h>#include<sys/wait.h>#include<unistd.h>#include<ctime>#include<string>#include<vector>#include<unordered_map>#include<cassert>#include<cstdlib>usingnamespace std;typedefvoid(*functor)();
vector<functor> functors;//方法集合//for debug
unordered_map<uint32_t, string> info;voidf1(){
  cout <<"这是一个处理日志的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl << endl;;}voidf2(){
  cout <<"这是一个备份数据的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl <<  endl;;}voidf3(){
  cout <<"这是一个处理网络请求的任务, 执行的进程id:"<<getpid()<<"执行时间是"<<time(nullptr)<< endl << endl;;}voidloadFunctor(){
  info.insert({functors.size(),"处理日志的任务"});
  functors.push_back(f1);
  info.insert({functors.size(),"处理备份数据的任务"});
  functors.push_back(f2);
  info.insert({functors.size(),"处理网络请求的任务"});
  functors.push_back(f3);}//int32_t:进程pid  int32_t:该进程对应的管道写端fdtypedef pair<int32_t,int32_t> elem;int processNum =5;//创建子进程的个数voidwork(int blockFd){ 
  cout <<"进程:"<<getpid()<<"开始工作"<< endl;//子进程核心工作的代码while(true){//a.阻塞等待 b.获取任务信息uint32_t operatorCode =0;
    ssize_t s =read(blockFd,&operatorCode,sizeof(uint32_t));if(s ==0){break;}assert(s ==sizeof(uint32_t));(void)s;//c.处理任务if(operatorCode < functors.size()){
      functors[operatorCode]();}}
  cout <<"进程:"<<getpid()<<"结束工作"<< endl;}//[子进程的pid, 子进程的管道fd]voidBalanceSendTask(vector<elem>& processFds){srand((longlong)time(nullptr));int cnt =10;//cnt是要分配任务的数目while(cnt !=0){sleep(1);//选择一个进程int pick =rand()% processFds.size();//选择一个任务int task =rand()% functors.size();//把任务给一个指定的进程write(processFds[pick].second,&task,sizeof(int));//打印对应的提示信息
    cout <<"父进程指派任务"<< info[task]<<"给进程:"<< processFds[pick].first <<"编号:"<< pick << endl;
    cnt--;}}intmain(){loadFunctor();
  vector<elem> assignMap;//创建processNum个进程for(int i =0; i < processNum; i++){//定义保存管道fd的对象int pipefd[2]={0};//创建管道pipe(pipefd);//创建子进程
    pid_t id =fork();if(id ==0){//子进程读取close(pipefd[1]);//子进程执行work(pipefd[0]);close(pipefd[0]);exit(0);}//父进程做的事情   close(pipefd[0]);
    elem e(id, pipefd[1]);
    assignMap.push_back(e);}
  cout <<"creat all process success!"<< endl;//父进程派发任务BalanceSendTask(assignMap);//回收资源for(int i =0; i < processNum; i++){close(assignMap[i].second);}for(int i =0; i < processNum; i++){if(waitpid(assignMap[i].first,nullptr,0)>0){
      cout <<"wait for"<< assignMap[i].first <<"success!"<<"number:"<< i << endl;}}return0;}

用fork来共享管道

image-20221031161452915

站在文件描述符角度-深度理解管道

image-20221031164213553

问:为什么父进程要分别打开读和写?

答:为了让子进程继承,让子进程不必再打开了。

问:为什么父子要关闭对应的读和写?

答:因为管道必须是单向通信的,一端是读端,另已端必须是写端。

问:谁决定父子关闭读端还是写端?

答:由需求决定。

站在内核角度-管道本质

image-20221101192704676

理解管道操作 – |

注意:|操作的本质就是匿名管道

image-20221101193113741

sleep 1000 | sleep 100

这两个进程(sleep 1000和sleep 100)的关系是什么呢?两个进程的ppid是一样的,即有同样的父进程。

以下面的命令进行举例:

cat pipe.cc | wc -l 

**父进程fork两个子进程即

cat pipe.cc

wc -l

,父进程在创建进程的同时,创建了一条匿名管道,两个进程通过该匿名管道来进行通信,cat pipe.cc和wc -l两个进程分别关闭读写端,父进程关闭这个进程的读写端,cat进程进行输出重定向,wc进程进行输入重定向。**

管道读写规则

  • 当没有数据可读时: - O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
  • 当管道满的时候 - O_NONBLOCK disable: write调用阻塞,直到有进程读走数据- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
  • 如果所有管道写端对应的文件描述符被关闭,则read返回0
  • 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程 退出
  • 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
  • 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。

管道的特点

  1. 管道只能用来进行具有血缘关系的进程之间进行进程间通信,常用于父子间通信或者兄弟间通信。(只属于匿名管道)
  2. 管道只能单向通信(内核实现决定的),半双工的一种特殊情况
  3. 管道自带同步机制(内核会对管道操作进行同步与互斥)(pipe满,writer等,pipe空,reader等)
  4. 管道是面向字节流的,先写的字符,一定是先被读取的,没有格式边界,需要用户来定义区分内容的边界
  5. 管道的生命周期跟随进程 – 管道是文件 – 进程退出了,曾经打开是文件引用计数到达0就会自动退出

命名管道

  • 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
  • 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
  • 命名管道是一种特殊类型的文件

原理

image-20221101210357942

和匿名管道的区别:

匿名管道:子进程继承父进程。

命名管道:通过打开同一个fifo文件,进行信息的交互(路径具有唯一性)

注意:我们使用的命名管道,更多的时候是作为一种标定的作用,内存中的管道文件中的数据不会刷新到磁盘中,即使在进行通信的时候,命名管道的大小也始终是0个字节

创建一个命名管道

  • 命名管道可以从命令行上创建,命令行方法是使用下面这个命令:$ mkfifo filename
  • 命名管道也可以从程序里创建,相关函数有:int mkfifo(const char *filename,mode_t mode);

命名管道具体示例

命令行创建

简单使用:

image-20221101202732807

image-20221101202801137

代码示例

创建管道文件:

image-20221101202319701

删除管道文件:

image-20221102101704696

代码示例:

clientFifo.cpp文件:

#include"comm.h"usingnamespace std;intmain(){int pipeFd =open(IPC_PATH, O_WRONLY);if(pipeFd <0){
    cerr <<"open error"<< endl;return1;}#defineNUM1024char line[NUM];while(true){printf("请输入你的消息#");fflush(stdout);memset(line,0,sizeof(line));if(fgets(line,sizeof(line),stdin)!=nullptr){
      line[strlen(line)-1]='\0';write(pipeFd, line,strlen(line));//12345\n\0}else{break;}}
  cout <<"退出客户端"<< endl;close(pipeFd);return0;}

serverFifo.cpp文件:

//写入#include"comm.h"usingnamespace std;intmain(){if(mkfifo(IPC_PATH,0666)!=0)//创建管道文件{
    cerr <<"mkfifo error"<< endl;return1;}int pipeFd =open(IPC_PATH, O_RDONLY);if(pipeFd <0){
    cerr <<"open error"<< endl;return2;}//正常的通信过程#defineNUM1024char buffer[NUM];while(true){
    ssize_t s =read(pipeFd, buffer,sizeof(buffer)-1);if(s >0){
      buffer[s]='\0';
      cout <<"客户端->服务器#"<< buffer << endl;}elseif(s ==0){
      cout <<"客户退出了,我也退出了!"<< endl;break;}else{//do nothing
      cerr <<"read"<<strerror(errno)<< endl;}}close(pipeFd);
  cout <<"服务端退出了"<< endl;unlink(IPC_PATH);//sreturn0;}

comm.h文件:

#pragmaonce#include<cstdio>#include<iostream>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#include<unistd.h>#include<cstring>#include<cerrno>#defineIPC_PATH"./.fifo"

makefile文件:

.PHONY:all
all:clientFifo serverFifo
    
clientFifo:clientFifo.cpp
    g++  -o $@ $^ -std=c++11
serverFifo:serverFifo.cpp
    g++  -o $@ $^ -std=c++11
.PHONY:clean
clean:
    rm -rf clientFifo serverFifo .fifo
标签: unix linux 服务器

本文转载自: https://blog.csdn.net/m0_57304511/article/details/128996048
版权归原作者 鹿九丸 所有, 如有侵权,请联系我们删除。

“进程间通信(上)”的评论:

还没有评论