文章目录
📖 前言
从本章开始,我们开始学习进程通信相关的知识,本章将来详细探讨一下管道,学习匿名管道和命名管道的原理和代码实现等相关操作。目标已经确定,接下来就要搬好小板凳,准备开讲了…🙆🙆🙆🙆
1. 通信背景
在我们之前的学习中,我们知道进程是具独立性的。但是不要以为进程独立了,就是彻底独立,有时候,我们需要进程间能够进行一定程度的信息交互。
1.1 进程通信的目的:
- IPC就是通信的简称Inter - Process Communication
进程间通信目的:
- 数据传输: 一个进程需要将它的数据发送给另一个进程。
- 资源共享: 多个进程之间共享同样的资源。
- 通知事件: 一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制: 有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
需要多进程进行协同处理一件事情(并发处理)。单纯的数据传输,一个进程想把数据发给另一个进程。多进程之间共享同样的资源。一个进程想让另一个进程做其他的事情,进程控制。
举一个通信的例子:
两个通信标准:
- system V IPC: 用的非常少了,设计的非常重,更多的用来本地通信。
- POSIX IPC: 设计的很轻,可以本地,可以做成网络,因为里面有套接字。
1.2 管道的引入:
在我们刚学Linux时,就接触过竖划线
|
的操作,那么究竟什么是管道呢?
- 管道是
Unix
中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个 “管道”。
两个进程看到同一份资源才具备通信的条件:
- 通信之前,让不同的进程看到同一份资源(文件,内存块…)
- 我们要学的进程间通信,不是告诉我们如何通信,而是让两个进程如何看到同一份资源!
- 资源的不同,决定了,不同种类的通信方式!
其中【管道 】就提供了共享资源的一种手段!
如何才能让两个进程看到同一份资源?
- 让两个进程同时看到磁盘上的同一份文件:
- 这种方法太矬了,CPU和外设之间的读写速度相差太大了。
- 因为通信也是一个相对常规的操作,将数据刷到外设再从外设上读取,效率太低了。
- 通信到多数都是内存级的。
- 让两个进程打开同一个文件:
- 【文件描述符 - 复习传送门】对同一个文件进行管理,写 -> 管道 -> 读。
生活中的管道大多数都是单向的,进程通信中的管道数据传输也是单向的。
进程通信的核心思想:让两个进程获取到同一份资源
2. 匿名管道
2.1 匿名管道的原理:
创建子进程,子进程是以父进程为模板,代码共享,数据要发生写时拷贝,文件描述符的映射表也拷贝了一份,并且内容也拷贝到子进程中了。
- 但是struct file并没有被拷贝。
- struct file中有个引用计数,是指对该文件的引用数量,用于跟踪文件被多少个进程或内核对象所引用。
- 父进程指向什么文件,子进程也指向什么文件。
- 这也就是为什么,创建fork子进程之后,我们让父子printf打印的时候,父子进程都会向同一个显示器打印,因为它们俩都指向了同一个文件。
Linux中可以通过特定的系统调用来判断文件是普通文件还是管道文件:
- 能通过特定调用来识别到文件是普通文件还是管道文件,还是一个字符设备文件。
- 知道了之后就能指向对应的底层设备是什么。
- 如果该文件不再是一个磁盘文件,通过特定的接口来表征自己的身份。
- 不再是一个磁盘文件,相当于和磁盘脱离。
- 自己读写数据时,就在这个文件对应的内存缓冲区里面来完成数据交互,我们把这个文件我们称之为管道。
如果设计的时候就设计成,如果是普通文件就往磁盘上写,如果是管道文件也往缓冲区里写,但是就
不再往磁盘上刷新了。如果是管道,就把它和对应的磁盘去关联。
Linux下一切皆文件,管道也是文件~
管道式基于文件设计的,是个内存级文件,当中的数据不需要刷新到磁盘当中。
2.2 匿名管道的创建:
匿名管道主要用于父子进程之间的通信,用
pipe
接口来创建管道:
- 可以理解成
pipe
封装了open
,open
了两次。 - 创建文件时在内核当中把文件类型给确定成了管道文件。
- 返回值为0表示成功,-1表示失败。
- 在失败的情况下,可以使用errno来获取具体的错误信息。
- 常见的错误包括文件描述符达到上限、内存不足等。
输出型参数:
我们需要传入一个由两个整型元素组成的数组作为参数,例如 int fd[2]。这个数组被称为pipe函数的输出型参数,它用于接收pipe函数返回的两个文件描述符。
具体来说,fd[0] 是管道的读端文件描述符,用于从管道中读取数据;fd[1] 是管道的写端文件描述符,用于向管道中写入数据。
注意:
- 在使用系统调用pipe()创建管道时,pipefd[0]是指向管道读取端的文件描述符,pipefd[1]是指向管道写入端的文件描述符。
- 管道是一种单向的通信机制,数据从写入端流入,然后从读取端流出。
- pipefd[0]和pipefd[1]的用途是固定的,并且无法更改。
- 根据管道的设计原则,读取端只能从pipefd[0]读取数据,而写入端只能向pipefd[1]写入数据。
- 如果你尝试将pipefd[0]用作写入端,或者将pipefd[1]用作读取端,将会导致错误。
- 这样做会破坏管道的设计约定,数据无法正确地流动和传输。
- 因此,请务必遵循约定,使用管道的读取端和写入端,以确保正确的管道通信。
- 如果你需要双向通信,可以考虑使用两个独立的管道或其他适合的通信机制。
- 向父进程写,子进程读,还是父进程读,向子进程写,都看个人的需求。
- 一般都是由父进程调用pipe函数打开两个文件描述符,后面子进程继承。
连通一个管道:
- 如果要父进程写,子进程读,就要父进程关闭读端,子进程关闭写端。
- 如果要父进程读,子进程写,就要父进程关闭写段,子进程关闭读端。
2.3 父子进程通信:
我们以子进程关闭写端,父进程关闭读端为例:
#include<iostream>#include<string>#include<cstdio>#include<cstring>#include<ctime>#include<unistd.h>#include<sys/wait.h>#include<sys/types.h>usingnamespace std;// 演示pipe通信的基本过程 -- 匿名管道intmain(){// 1. 创建管道int pipefd[2]={0};if(pipe(pipefd)!=0){
cerr <<"pipe erro"<< endl;return1;}// 2. 创建子进程
pid_t id =fork();if(id <0){
cerr <<"fork error"<< endl;return2;}elseif(id ==0){// 子进程// 让子进程来进行读取,子进程就应该关掉写端close(pipefd[1]);#defineNUM1024char buffer[NUM];while(true){
cout <<"时间戳"<<(uint64_t)time(nullptr)<< endl;// 子进程没有带sleep,为什么子进程也会休眠呢??memset(buffer,0,sizeof(buffer));
ssize_t s =read(pipefd[0], buffer,sizeof(buffer)-1);if(s >0){// 读取成功
buffer[s]='\0';
cout <<"子进程收到消息,内容是:"<< buffer << endl;}elseif(s ==0){
cout <<"父进程写完了,我也退出了!"<< endl;break;}else{//do noting}}close(pipefd[0]);exit(0);}else{// 父进程// 让父进程进行写入,父进程就应该关掉读端close(pipefd[0]);constchar* msg ="你好子进程,我是父进程,这次发送的信息编号是";int cnt =0;while(cnt <5){char sendBuffer[1024];sprintf(sendBuffer,"%s : %d", msg, cnt);write(pipefd[1], sendBuffer,strlen(sendBuffer));sleep(1);
cnt++;}close(pipefd[1]);
cout <<"父进程写完了"<< endl;}
pid_t res =waitpid(id,nullptr,0);if(res >0){
cout <<"等待子进程成功"<< endl;}// 0 -> 嘴巴 -> 读(嘴巴)// 1 -> 笔 -> 写// cout << "fd[0]" << pipefd[0] << endl;// cout << "fd[1]" << pipefd[1] << endl;return0;}
通过文件接口对
pipefd
返回的两个文件描述符,进行
read/write
,就能让父进程写进管道的字符串被子进程从管道读取到了:
2.3.1 read()阻塞等待
当父进程没有写入数据的时候,子进程在等!所以,父进程写入之后,子进程才能read到数据,子进程打印读取数据要以父进程的节奏为主!
管道内部,没有数据,读端就必须阻塞等待(read)
- 等待管中有数据,否则无法执行后面的代码。
管道内部,如果数据被写满了,写端就必须阻塞等待(write)
- 等待管中有空间,否则此时写入会覆盖之前的数据。
管道内数据,写满了就不能再写了,读完了就不能再读了,这样就保证了管道内数据的合理性。
将当前进程的
task_struct
放入等待队列中,并将状态从R设置为S/D/T!等待一定是在一个文件上等的,这个文件一定是个管道文件。
而这个管道文件内部一定维护了一个等待队列
wait_queue_head_t
,一个链表结构。一个进程条件不满足时,会将自己列入到管道资源对的等待队列里。
当管道里有数据了操作系统就知道了,将进程从等待队列里拿出来,再放到运行队列里。
如果父进程就是不写入,那么子进程就一直在等待:
在管道中,对于读取端的
read
操作,当管道中没有数据可读时,read
函数会阻塞等待,直到有数据可读或管道被关闭。所以在父进程写入数据的过程中,子进程调用
read
函数,父进程写入休眠1秒并不会导致read
函数立即返回0,而是等待父进程写入数据。只有当父进程关闭写端并且将所有数据写入管道后,子进程的read函数才会返回0,示管道中的数据已经读取完毕。
因此,在父进程写入数据时,子进程在调用
read
函数时会等待父进程写入数据,并不会判断为文件读取完毕。只有当父进程关闭写端时,子进程的read函数才会判断为文件读取完毕。
- 如果管道的写入端已经关闭(所有写入端都关闭),但读取端仍然打开,那么读取端的 read() 调用将会阻塞等待,直到有数据可读或者管道被关闭。
- 反之,如果管道的读取端已经关闭(所有读取端都关闭),而写入端仍然打开,那么写入端的 write() 调用可能会引发信号 SIGPIPE 或返回错误。
2.4 父进程给子进程派发任务:
我们结合上述所学知识,就可以简单写一个通过通信管道父进程给子进程派发任务执行的代码了。
#include<iostream>#include<vector>#include<string>#include<unordered_map>#include<cstdio>#include<cstring>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#include<sys/types.h>#include<cassert>usingnamespace std;// 父进程控制子进程typedefvoid(*functor)();
vector<functor> functors;// 方法集合//for debug
unordered_map<uint32_t, string> info;voidf1(){
cout <<"这是一个处理日志的任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidf2(){
cout <<"这是一个备份数据任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidf3(){
cout <<"这是一个处理网络连接的任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidloadFunctor(){
info.insert({functors.size(),"处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(),"备份数据任务"});
functors.push_back(f2);
info.insert({functors.size(),"网络连接的任务"});
functors.push_back(f3);}intmain(){// 0. 加载任务列表loadFunctor();// 1. 创建管道int pipefd[2]={0};if(pipe(pipefd)!=0){
cerr <<"pipe error"<< endl;return1;}// 2. 创建子进程
pid_t id =fork();if(id <0){// 创建失败
cerr <<"fork error"<< endl;return2;}elseif(id ==0){// 子进程,read - 读取// 3. 关闭不需要的文件fdclose(pipefd[1]);// 子进程不断根据收到的信息,执行对应的方法// 如果没有人往管道中写,此时子进程就卡在了read这里等待别人分配任务while(true){uint32_t operatorType =0;// 从fd为pipefd[0]的文件里读sizeof(uint32_t)个字节的内容,写到operatorType中去// 如果有数据就读取,如果没有数据就阻塞等待,等待任务的到来。
ssize_t s =read(pipefd[0],&operatorType,sizeof(uint32_t));if(s ==0){
cout <<"我要退出了..."<< endl;break;}assert(s ==sizeof(uint32_t));(void)s;// 走到这里一定是一个成功的读取if(operatorType < functors.size()){
functors[operatorType]();}else{
cerr <<"bug? operatorType = "<< operatorType << endl;}}close(pipefd[0]);exit(0);}elseif(id >0){srand((longlong)time(nullptr));// 父进程,write - 操作// 3. 关闭不需要的文件fdclose(pipefd[0]);// 4. 指派任务int num = functors.size();int cnt =10;while(cnt--){// 5. 形成任务码uint32_t commandCode =rand()% num;
cout <<"父进程指派任务完成,任务是:"<< info[commandCode]<<"任务的编号是: "<< cnt << endl;// 向指定的进程下达执行任务的操作write(pipefd[1],&commandCode,sizeof(uint32_t));sleep(1);}close(pipefd[1]);
pid_t res =waitpid(id,nullptr,0);if(res) cout <<"wait success"<< endl;}return0;}
编码小细节:
- 我们代码当中用到了一个assert 。
- assert是断言,编译有效,debug 模式。
- 但是在release模式下,断言就没了。
- 一旦断言没有了,s变量就是只被定义了,没有被使用。
- release模式中,可能会有warning。
- (void)s 是一个用于消除编译器警告的技巧。
由于在这里我们并不使用这个值,所以加上(void)前缀可以告诉编译器我们明确地不打算使用它,以避免产生未使用变量的警告信息。
查看一下匿名管道:
2.5 控制多个子进程(进程池):
控制多个进程:
#include<iostream>#include<vector>#include<string>#include<unordered_map>#include<cstdio>#include<cstring>#include<ctime>#include<cstdlib>#include<unistd.h>#include<sys/wait.h>#include<sys/types.h>#include<cassert>usingnamespace std;// 进程池typedefvoid(*functor)();
vector<functor> functors;// 方法集合//for debug
unordered_map<uint32_t, string> info;voidf1(){
cout <<"这是一个处理日志的任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidf2(){
cout <<"这是一个备份数据任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidf3(){
cout <<"这是一个处理网络连接的任务, 执行的进程 ID ["<<getpid()<<"]"<<"执行时间是["<<time(nullptr)<<"]\n"<< endl;}voidloadFunctor(){
info.insert({functors.size(),"处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(),"备份数据任务"});
functors.push_back(f2);
info.insert({functors.size(),"网络连接的任务"});
functors.push_back(f3);}// 第一个int32_t: 进程pid,第二个int32_t: 该进程对应的管道写端fdtypedef pair<int32_t,int32_t> elem;int processNum =5;voidwork(int blockFd){// 子进程核心工作的代码while(true){// a. 阻塞等待 b. 获取任务信息uint32_t operatorCode =0;
ssize_t s =read(blockFd,&operatorCode,sizeof(uint32_t));if(s ==0)break;
cout <<"进程["<<getpid()<<"]"<<"开始工作"<< endl;assert(s ==sizeof(uint32_t));(void)s;// 编程小技巧if(operatorCode < functors.size()){// c. 处理任务
functors[operatorCode]();}else{
cerr <<"bug? operatorCode = "<< operatorCode << endl;}}
cout <<"进程["<<getpid()<<"]"<<"结束工作"<< endl;}// [子进程的pid, 子进程的管道fd]voidblanceSendTask(const vector<elem>& processFds){srand((longlong)time(nullptr));// 随机给某个进程派发随机某个任务:// uint32_t cnt = 10;// while (cnt--)// {// sleep(1);// // 选择一个进程,选择进程是随机的,没有压着一个进程给任务// // 较为均匀的将任务给所有的子进程 -- 负载均衡// uint32_t pick = rand() % processFds.size();// // 选择一个任务// uint32_t task = rand() % functors.size();// // 把任务给一个指定的进程// write(processFds[pick].second, &task, sizeof(task));// // 打印对应的提示信息// cout << "父进程指派任务->" << info[task] << "给进程: " // << processFds[pick].first << "编号: " << pick << endl;// }// 将这几个进程创建的管道的写端给挨个关上// for(int i = 0; i < processFds.size(); i++)// {// close(processFds[i].second);// }// 给这几个进程挨个派发随机任务;for(int i =0; i < processFds.size(); i++){sleep(1);int j =rand()% functors.size();write(processFds[i].second,&j,sizeof(int));close(processFds[i].second);}}intmain(){// 加载任务列表loadFunctor();
vector<elem> assignMap;// 创建processNum个进程for(int i =0; i < processNum; i++){// 定义管道保存fd的对象int pipefd[2]={0};if(pipe(pipefd)!=0){
cerr <<"pipe error"<< endl;return1;}// 创建子进程
pid_t id =fork();if(id ==0){// 子进程执行,read -> pipefd[0]close(pipefd[1]);// 子进程执行work(pipefd[0]);close(pipefd[0]);exit(0);}elseif(id >0){// 父进程做的事情,pipefd[1]close(pipefd[0]);
elem e(id, pipefd[1]);// 将创建出来的子进程的pid存起来
assignMap.push_back(e);}}
cout <<"creat all process success!\n"<< endl;// 父进程,派发任务blanceSendTask(assignMap);// 回收资源for(int i =0; i < processNum; i++){if(waitpid(assignMap[i].first,nullptr,0)>0)
cout <<"wait for:"<< assignMap[i].first <<" wait success! "<<" number: "<< i << endl;}
cout <<"----------------------------程序结束----------------------------"<< endl;return0;}
- 定义了三个任务函数:f1()、f2()和f3(),分别代表处理日志的任务、备份数据任务和处理网络连接的任务。
- loadFunctor()函数用于将任务函数加载到functors向量中,并在info哈希表中保存每个任务函数对应的描述信息。
- main()函数中使用fork()函数创建了processNum个子进程,每个子进程都执行work()函数来处理任务。
- work()函数是子进程的核心工作代码,它通过管道从父进程接收任务编号,并根据编号调用相应的任务函数进行处理。
- blanceSendTask()函数在父进程中循环运行,每隔一秒随机选择一个进程和一个任务,并通过管道向选中的进程发送任务编号。
- 最后,在主函数中回收子进程资源并关闭管道。
总体来说,这段代码实现了一个简单的进程池,通过负载均衡的方式将任务分发给子进程进行处理。
分别给进程池里的进程派发随机任务:
2.6 命令行 | 操作:
- 命令行中输入的
|
命令,其实就是一个匿名管道:
我们来查看一下进程状态:
我们看到这两个进程同属于一个父进程,这就说明
sleep
进程是一对兄弟进程。
PID
不同,PPID
相同,说明有相同的父进程。- 说明这两个进程是兄弟关系。
由父子之间的通信转化成兄弟之间的通信:
- 父进程创建一个子进程, 文件描述符表就被于进程继承了。
- 创建两个子进程也是继承了文件描述符表。
- 相当于两个子进程共享同一个管道,然后两个子进程各自关闭读写端,通信过程就变成两个子进程之间的通信。
所以竖划线本质是匿名管道,底层实现就是用匿名管道做的:
- 命令行分析,碰到
|
时,我们左边当一条命令,右边当一条命令。 - 然后将这两个进程之间的管道创建好,再
fork
两次创建子进程,让这两个子进程各自继承对应的文件描述符。 - 再关闭对应的读写端对
cat mytest
做输出重定向,对wc -l
做输入重定向。 - 第一个子进程将输出重定向到管道的写端,第二个子进程将输入重定向到管道的读端。
- 这样就建立了两个命令之间的通信。
在 Linux 中,符号 “|” 表示管道(pipeline),用于将一个命令的输出连接到另一个命令的输入。在使用 “|” 时,前一个进程的标准输出会被连接到后一个进程的标准输入。这意味着前一个进程是写端,后一个进程是读端。
3. 命名管道
类似于创建匿名管道:
返回值:
3.1 创建一个命名管道:
创建命名管道时候,要指明路径,和
umask
值,为了防止默认
umask
的扰乱,我们一开始将`umask``置为0。
umask(0);if(mkfifo("./.fifo",0600)!=0)//当返回值不为0的时候,代表出现了错误{
cerr <<"mkfifo error"<< endl;return1;}
管道文件是以p开头的:
通过管道实现的,两个终端虽然不一样,但是cat是进程,echo也是个进程,这两个进程都属于操作系统,写和读是同一个文件:
3.2 两个进程之间的通信:
匿名管道之间的通信是基于父子进程继承的关系来实现的。而让两个毫不相干的进程实现进程通信则是命名管道做的事情。
命名管道,进程间通信的本质是:不同的进程要看到同一份资源。
- 匿名管道:子进程继承父进程。
- 命名管道:通过一个
fifo文件
有路径就具有唯一性,通过路径,就能找到同一个资源! - 只要都通过对应的管道文件所在的路径,就能保证使用路径的唯一性,就能够打开同一个文件。
- 只要打开的是同一个文件在内核里用的就是同一个
struct file
,那么指向的就是同一个inode
,用的就是同一个缓冲区。 - 此时就看到了同一个资源。
命名管道是让两个进程之间是看到同一个文件,这个文件做了符号处理,相当于管道文件(通信时,数据不会刷新到磁盘上),操作系统一看到这个文件就知道了,这个文件的数据不用刷新到磁盘上,所以此时就在内存里,就有了管道。
头文件:
#pragma#include<iostream>#include<cstdio>#include<string>#include<cstring>#include<cerrno>#include<sys/types.h>#include<sys/stat.h>#include<sys/fcntl.h>#include<unistd.h>#defineIPC_PATH"./.fifo"usingnamespace std;
客户端:
#include"comm.h"// 写入intmain(){int pipeFd =open(IPC_PATH, O_WRONLY);if(pipeFd <0){
cerr <<"open: "<<strerror(errno)<< endl;return1;}#defineNUM1024char line[NUM];// 进行通信while(true){printf("请输入你的消息# ");fflush(stdout);memset(line,0,sizeof(line));// fgets -> C语言的函数 -> line结尾自动添加\0if(fgets(line,sizeof(line),stdin)!=nullptr){
line[strlen(line)-1]='\0';write(pipeFd, line,strlen(line));}else{break;}}close(pipeFd);
cout <<"客户端退出了"<< endl;return0;}
服务端:
#include"comm.h"// 读取intmain(){umask(0);// server创建好了,client就不用创建了if(mkfifo(IPC_PATH,0600)!=0){
cerr <<"mkfifo error"<< endl;return1;}int pipeFd =open(IPC_PATH, O_RDONLY);if(pipeFd <0){
cerr <<"open fifo error"<< endl;return2;}#defineNUM1024// 正常的通信过程char buffer[NUM];while(true){
ssize_t s =read(pipeFd, buffer,sizeof(buffer)-1);if(s >0){
buffer[s]='\0';
cout <<"客户端->服务器#"<< buffer << endl;}elseif(s ==0){
cout <<"客户退出了,我也推出了"<< endl;break;}else{// do nothing
cout <<"read: "<<strerror(errno)<< endl;}}close(pipeFd);
cout <<"服务端退出了"<< endl;// 跑完之后删除管道unlink(IPC_PATH);return0;}
必须server先跑,才能出现管道文件:
4. 特征总结
- 管道只能用来进行具有血缘关系的进程之间,进行进程间通信。常用于父子通信
- 管道只能单向通信(内核实现决定的),半双工的一种特殊情况。(半双工和全双工是网络的概念)
- 管道自带同步机制(pipe满,writer等,pipe空,reader等),自带访问控制。
- 管道是面向字节流的,现在还解释不清楚。先写的字符,一定是先被读取的,没有格式边界,需要用户来定义区分内容的边界。
- 管道的生命周期,管道是文件,进程退出了,曾经打开的文件会怎么办?退出(自动关闭文件) – 随进程。
版权归原作者 yy_上上谦 所有, 如有侵权,请联系我们删除。