🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
目录
👉进程间通信介绍👈
进程间通信(Interprocess Communication)就是两个进程之间进行通信。进程是具有独立性(虚拟地址空间 + 页表保证进程运行的独立性),所以进程间通信成本会比较高!进程间通信的前提条件是先让不同的进程看到同一份资源(内存空间),该资源不能隶属于任何一个进程,应该属于操作系统,被进行通信的进程所共享。
进程间通信目的
单进程无法使用并发能力,更加无法实现多进程协同,那么就有了进程间通信。进程间通信的目的如下:
- 数据传输:一个进程需要将它的数据发送给另一个进程。
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如 Debug 进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信发展和分类
进程间通信的发展和分类如下:
- Linux 原生能提供的管道,管道主要包括匿名管道 pipe 和命名管道。
- SystemV 进程间通信,System V IPC 主要包括 System V 消息队列、System V 共享内存和 System V 信号量。System V 只能本地通信。
- POSIX 进程间通信,POSIX IPC 主要包括消息队列、共享内存、信号量、互斥量、条件变量和读写锁。POSIX 进程通信既能进行本地通信,又能进行网络远程通信,具有高扩展和高可用性。
👉管道👈
什么是管道
日常生活中,有非常多管道,如:天然气管道、石油管道和自来水管道等。管道是 Unix 中最古老的进程间通信的形式,我们把从一个进程连接到另一个进程的一个数据流称为管道。管道传输的都是资源,并且只能单向通信。
管道的原理
管道的本质就是文件。与文件的区别就是管道中的数据是不用写入到磁盘中的(持久化)。进程间通信都是内存级别的通信,如果还要将数据写入到内存,那么通信的效率就会大大下降。
如何做到让不同的进程看到同一份资源的呢?fork 创建子进程,让子进程继承父进程的与进程管理相关的内核数据结构,这样就能够让具有血缘关系的进程进行进程间通信,常用于父子进程。
匿名管道
匿名管道就是没有名字的管道,可以通过系统调用 pipe 来创建匿名管道。pipe 函数的参数是 int pipefd[2],它是输出型参数,通过 pipefd 数组可以拿到系统为我们创建的匿名管道文件。pipefd[0] 是读端,pipefd[1] 是写端(巧记:0 像嘴巴,用来读书;1 像钢笔,用来写字)。如果管道创建成功,返回值为 0;如果管道创建失败,返回值为 -1,并设置相应的错误码。
Makefile 文件
mypipe:mypipe.cc
g++ $^ -o$@-std=c++11 #-D DEBUG
.PHONY:clean
clean:
rm-f mypipe
注:.cc 后缀也是 C++ 文件的表示方法之一,-D 是命令行定义,可用于 Debug。如果一个变量只声明并没有被使用,在 Realease 版本下会有大量的告警。为了避免告警,可以将该变量强转为 void。assert 在 Realease 版本下不起作用。
#include<iostream>#include<string>#include<assert.h>#include<unistd.h>#include<sys/types.h>#include<sys/wait.h>#include<cstdio>#include<cstring>usingnamespace std;intmain(){// 1. 创建管道// pipefd[0]:读端(0像嘴巴,读书)// pipefd[1]:写端(1像钢笔,写字)int pipefd[2]={0};int n =pipe(pipefd);assert(n !=-1);(void)n;// 避免Realease编译时出现大量告警// 条件编译可以搭配命令行定义来进行debug#ifdefDEBUG
cout <<"pipefd[0]:"<< pipefd[0]<< endl;// 3
cout <<"pipefd[1]:"<< pipefd[1]<< endl;// 4#endif// 2. 创建子进程// fork创建子进程失败返回-1
pid_t id =fork();assert(id !=-1);if(id ==0){// 关闭子进程不需要的fd,子进程进行读取close(pipefd[1]);char buffer[1024];// 缓冲区while(true){
ssize_t s =read(pipefd[0], buffer,sizeof(buffer)-1);if(s >0){
buffer[s]=0;
cout <<"child process["<<getpid()<<"]"<<"get a message, Fathe#"<< buffer << endl;}}// close(pipefd[0]); exit(0);// 进程退出,文件描述符会被关掉,不代表文件被关掉}// 关闭父进程不需要的fd,父进程进行写入close(pipefd[0]);
string message ="我是父进程,我正在给你发消息";char send_buffer[1024];int count =0;while(true){// 构造变化的字符串snprintf(send_buffer,sizeof(send_buffer),"%s[%d] : %d", message.c_str(),getpid(), count++);write(pipefd[1], send_buffer,strlen(send_buffer));sleep(1);}
pid_t ret =waitpid(id,nullptr,0);// 阻塞等待assert(ret >0);(void)ret;close(pipefd[1]);return0;}
注:不能定义全局缓冲区 buffer 来通信,因为有写时拷贝的存在会保证父子进程信息的独立,所以就无法通过全局的 buffer 来进行通信。
管道的特点
- 管道是用来进行具有血缘关系的进程进行进程间通信,常用于父子进程。
- 匿名管道需要在创建子进程之前创建,因为只有这样才能复制到管道的操作句柄,与具有亲缘关系的进程实现访问同一个管道通信。
- 管道本质是内核中的一块缓冲区,多个进程通过访问同一块缓冲区实现通信。
- 显示器也是一个文件,父子进程同时向显示器写入的时候,没有一个进程等另一个进程的情况,也就是说缺乏访问控制。而管道是为了让进程间协同,其提供了访问控制。 - 写快,读满,将管道文件写满了就不能再写了- 写满,读快,管道文件中没有数据的时候,读端必须等写端进行数据写入- 写关,读 0,标识读到了管道文件的结尾- 读关,写继续写,操作系统会终止写进程。
- 管道提供的是面向流式的通信服务(面向字节流),需要定制协议来进行数据区分。
- 管道是基于文件的,文件的生命周期是随进程的,那么管道的生命周期也是随进程的。
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是单向通信的,就是半双工通信的一种特殊情况,数据只能向一个方向流动。需要双方通信时,需要建立起两个管道。半双工通信就是要么在收数据,要么在发数据,不能同时在收数据和发数据(比如两个人在交流时,一个人在说,另一个人在听);而全双工通信是同时进行收数据和发数据(比如两个人吵架的时候,相互问候对方,一个人既在问候对方又在听对方的问候)。
- 当要写入的数据量不大于 PIPE_BUF 时,Linux 将保证写入的原子性。
- 当要写入的数据量大于 PIPE_BUF 时,Linux 将不再保证写入的原子性。
- 指事务的不可分割性,一个事务的所有操作要么不间断地全部被执行,要么一个也没有执行。
写关读 0 的情况
#include<iostream>#include<string>#include<assert.h>#include<unistd.h>#include<sys/types.h>#include<sys/wait.h>#include<cstdio>#include<cstring>usingnamespace std;intmain(){// 1. 创建管道// pipefd[0]:读端(0像嘴巴,读书)// pipefd[1]:写端(1像钢笔,写字)int pipefd[2]={0};int n =pipe(pipefd);assert(n !=-1);(void)n;// 避免Realease编译时出现大量告警// 条件编译可以搭配命令行定义来进行debug#ifdefDEBUG
cout <<"pipefd[0]:"<< pipefd[0]<< endl;// 3
cout <<"pipefd[1]:"<< pipefd[1]<< endl;// 4#endif// 2. 创建子进程// fork创建子进程失败返回-1
pid_t id =fork();assert(id !=-1);if(id ==0){// 关闭子进程不需要的fd,子进程进行读取close(pipefd[1]);char buffer[1024];while(true){// 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等// 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾!
ssize_t s =read(pipefd[0], buffer,sizeof(buffer)-1);if(s >0){
buffer[s]=0;
cout <<"child process["<<getpid()<<"]"<<"get a message, Fathe#"<< buffer << endl;}elseif(s ==0){
cout <<"writer quit(father), me quit too!!!"<< endl;break;}}// close(pipefd[0]); exit(0);// 进程退出,文件描述符会被关掉,不代表文件被关掉}// 关闭父进程不需要的fd,父进程进行写入close(pipefd[0]);
string message ="我是父进程,我正在给你发消息";char send_buffer[1024];int count =0;while(true){// 构造变化的字符串snprintf(send_buffer,sizeof(send_buffer),"%s[%d] : %d", message.c_str(),getpid(), count++);write(pipefd[1], send_buffer,strlen(send_buffer));sleep(1);if(count ==5){
cout <<"writer quit(father)"<< endl;break;}}close(pipefd[1]);
pid_t ret =waitpid(id,nullptr,0);// 阻塞等待assert(ret !=-1);(void)ret;return0;}
读关,写继续写,操作系统终止写进程
#include<iostream>#include<string>#include<assert.h>#include<unistd.h>#include<sys/types.h>#include<sys/wait.h>#include<cstdio>#include<cstring>usingnamespace std;intmain(){// 1. 创建管道// pipefd[0]:读端(0像嘴巴,读书)// pipefd[1]:写端(1像钢笔,写字)int pipefd[2]={0};int n =pipe(pipefd);assert(n !=-1);(void)n;// 避免Realease编译时出现大量告警// 条件编译可以搭配命令行定义来进行debug#ifdefDEBUG
cout <<"pipefd[0]:"<< pipefd[0]<< endl;// 3
cout <<"pipefd[1]:"<< pipefd[1]<< endl;// 4#endif// 2. 创建子进程// fork创建子进程失败返回-1
pid_t id =fork();assert(id !=-1);if(id ==0){// 关闭子进程不需要的fd,子进程进行读取close(pipefd[1]);char buffer[1024];int count =0;while(true){// 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等// 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾!
ssize_t s =read(pipefd[0], buffer,sizeof(buffer)-1);if(s >0){
buffer[s]=0;++count;
cout <<"child process["<<getpid()<<"]"<<"get a message, Fathe#"<< buffer << endl;}else{
cout <<"writer quit(father), me quit too!!!"<< endl;break;}// 验证读提前退出,写继续写,操作系统终止写进程的情况if(count ==5){
cout <<"child quit!"<< endl;break;}}close(pipefd[0]);exit(0);// 进程退出,文件描述符会被关掉,不代表文件被关掉}// 关闭父进程不需要的fd,父进程进行写入close(pipefd[0]);
string message ="我是父进程,我正在给你发消息";char send_buffer[1024];int count =0;while(true){// 构造变化的字符串snprintf(send_buffer,sizeof(send_buffer),"%s[%d] : %d", message.c_str(),getpid(), count++);write(pipefd[1], send_buffer,strlen(send_buffer));sleep(1);if(count ==10){
cout <<"writer quit(father)"<< endl;break;}}close(pipefd[1]);
pid_t ret =waitpid(id,nullptr,0);// 阻塞等待assert(ret >0);(void)ret;return0;}
读快写满和读慢写快的两种情况,大家可以自己尝试一下!
mini版进程池的实现
实现思路:首先先定义一些任务并将这些任务加载。然后创建管道文件和子进程,将子进程的写端关闭并等待父进程派发任务(父进程向管道文件中写入数据就是某个子进程派发任务)。如果父进程没有给子进程派发任务的话,子进程只能阻塞等待(对应写满读快的情况)。注:该进程池是单机的负载均衡。
// hpp为后缀的文件既有函数的声明又有函数的定义// Task.hpp#pragmaonce#include<iostream>#include<string>#include<vector>#include<unistd.h>#include<functional>#include<map>// 包装器:定义一种函数类型,其返回值为void,没有参数// using func = std::function<void()> ; // C++11的做法typedef std::function<void()> func;// callBacks存的是函数类型,也就是任务
std::vector<func> callBacks;// desc是任务的下标和任务的描述
std::map<int, std::string> desc;voidreadMySQL(){
std::cout <<"sub process["<<getpid()<<"] 执行访问数据的任务\n"<< std::endl;}voidexeculUrl(){
std::cout <<"sub process["<<getpid()<<"] 执行URL解析\n"<< std::endl;}voidcal(){
std::cout <<"sub process["<<getpid()<<"] 执行加密任务\n"<< std::endl;}voidsave(){
std::cout <<"sub process["<<getpid()<<"] 执行数据持久化任务\n"<< std::endl;}// 加载任务voidload(){
desc[callBacks.size()]="readMySQL: 读取数据库";
callBacks.push_back(readMySQL);
desc[callBacks.size()]="execulUrl: 进行URL解析";
callBacks.push_back(execulUrl);
desc[callBacks.size()]="cal: 进行加密计算";
callBacks.push_back(cal);
desc[callBacks.size()]="save: 进行数据的文件保存";
callBacks.push_back(save);}// 展示任务列表voidshowHandler(){for(constauto& kv : desc){
std::cout << kv.first <<"\t"<< kv.second << std::endl;}}// 返回任务的个数inthandlerSize(){return callBacks.size();}// ProcessPool.cc#include<iostream>#include<vector>#include<cstdlib>#include<ctime>#include<cassert>#include<unistd.h>#include<sys/types.h>#include<sys/wait.h>#include"Task.hpp"usingnamespace std;#definePROCESS_NUM5// 如果父进程没有给子进程派发任务,子进程就阻塞等待任务intwaitCommand(int waitFd,bool& quit){uint32_t command =0;
ssize_t s =read(waitFd,&command,sizeof(command));// 写端退出,读端读到0,则写端也要退出if(s ==0){
quit =true;return-1;}assert(s ==sizeof(uint32_t));// 要求必须读到4给字节return command;}voidsendAndWakeup(pid_t who,int fd,uint32_t command){write(fd,&command,sizeof(command));// 父进程通过fd唤醒子进程并给它派发任务desc[command]
cout <<"father process call child process["<< who <<"] execul "<< desc[command]<<" through "<< fd << endl;}intmain(){// 加载任务load();// pid_t是子进程的id, int是写端的fd
vector<pair<pid_t,int>> slots;// 先创建多个进程for(int i =0; i < PROCESS_NUM;++i){// 创建管道int pipefd[2]={0};int n =pipe(pipefd);assert(n ==0);// 判断管道是否创建成功(void)n;// 创建子进程
pid_t id =fork();assert(id !=-1);// 判断子进程是否创建成功// child processif(id ==0){// 关闭子进程的写端close(pipefd[1]);// 子进程等待父进程派发任务while(true){// false表示父进程的写端没有关闭bool quit =false;// 如果父进程不派发任务,子进程就阻塞等待int command =waitCommand(pipefd[0], quit);// 父进程的写端关闭,子进程的读端也要退出if(quit)break;// 执行对应的任务if(command >=0&& command <handlerSize())
callBacks[command]();else
cout <<"非法command: "<< command << endl;}
cout <<"sender quit, receiver quit too!!!"<< endl;close(pipefd[0]);exit(0);}// father process// 关闭父进程的读端,将子进程的id和父进程的写端保存到slots中close(pipefd[0]);
slots.push_back(make_pair(id, pipefd[1]));}// 父进程随机给子进程派发任务srand((unsignedint)(time(nullptr)^getpid()^2023222));// 让数据源更随机int count =0;// 父进程给子进程总共派发5个任务后,关闭父进程的所有写端while(true){// 随机选取一个任务int command =rand()%handlerSize();// 随机选取一个子进程,随机数方式的负载均衡int choice =rand()% slots.size();// 把任务派发给指定的进程sendAndWakeup(slots[command].first, slots[command].second, command);sleep(1);++count;if(count ==5){
cout <<"父进程的任务全部派发完成"<< endl;break;}// 下方的代码是用户指定做哪一个任务// int select;// int command;// cout << endl;// cout << "############################################" << endl;// cout << "# 1. show funcitons 2.send command #" << endl;// cout << "############################################" << endl;// cout << "Please Select> ";// cin >> select;// if (select == 1)// showHandler();// else if (select == 2)// {// cout << "Enter Your Command> ";// // 选择任务// cin >> command;// // 选择进程// int choice = rand() % slots.size();// // 把任务给指定的进程// sendAndWakeup(slots[choice].first, slots[choice].second, command);// sleep(1);// }// else// {// cout << "select error!" << endl;// continue;// }}// 关闭fd, 所有的子进程都会读到0,关闭读端并退出for(constauto&slot : slots){close(slot.second);}// 等待子进程for(constauto&slot : slots){waitpid(slot.first,nullptr,0);}return0;}
随机派发任务
用户派发指定任务
命名管道
匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。如果我们想在不相关的进程之间交换数据,可以使用 FIFO 文件来做这项工作,它经常被称为命名管道,命名管道是一种特殊类型的文件。
命令行创建管道文件
while:;doecho"hello world";sleep1;done> name_pipe #向命名管道写入数据cat< name_pipe #读取命名管道中的数据
命名管道就是有名字的管道文件,如上图所示。命名管道主要用于没有任何血缘关系的两个进程进行通信。创建命名管道文件的接口如下:
- int mkfifo(const char *pathname, mode_t mode);
- pathname 是命名管道所在的路径和命名管道的名字,如果是在当前路径下创建管道文件,只需要提供管道文件的名字即可。如果不是,需要指明管道文件所处的路径。
- mode 是管道文件的权限。
模拟客户端和服务端
# Makefile
.PHONY:all
all: server client
server:server.cxx
g++ $^ -o$@-std=c++11
client:client.cxx
g++ $^ -o$@-std=c++11
.PHONY:clean
clean:
rm-f client server
// Log.hpp#ifndef_LOG_H_#define_LOG_H_#include<iostream>#include<ctime>// 日志信息等级#defineDebug0#defineNotice1#defineWarning2#defineError3const std::string msg[]={"Debug","Notice","Warning","Error"};// 输入日志信息的函数
std::ostream&Log(std::string message,int level){
std::cout <<" | "<<(unsigned)time(nullptr)<<" | "<< msg[level]<<" | "<< message;return std::cout;}#endif// Common.hpp#ifndef_COMM_H_#define_COMM_H_#include<iostream>#include<string>#include<cstdio>#include<cstring>#include<unistd.h>#include<sys/types.h>#include<sys/stat.h>#include<sys/wait.h>#include<fcntl.h>#include"Log.hpp"usingnamespace std;#defineMODE0666// 权限#defineSIZE128// 缓冲区大小
string ipcPath ="./fifo.ipc";#endif// server.cxx#include"Comm.hpp"intmain(){// 1. 创建管道文件if(mkfifo(ipcPath.c_str(), MODE)<0){perror("mkfifo");exit(1);}Log("创建管道文件成功", Debug)<<"step 1"<< endl;// 2. 正常的文件操作int fd =open(ipcPath.c_str(), O_RDONLY);if(fd <0){perror("server open");exit(2);}Log("打开管道文件成功", Debug)<<"step 2"<< endl;// 3. 编写正常的通信代码char buffer[SIZE];while(true){memset(buffer,'\0',sizeof(buffer));
ssize_t s =read(fd, buffer,sizeof(buffer)-1);if(s >0){
cout <<"client say: "<< buffer << endl;}elseif(s ==0)// 客户端退出{// end of file
cerr <<"read end of file, client quit, server quit too!!!"<< endl;break;}else{perror("read error");exit(3);}}// 4. 关闭管道文件close(fd);Log("关闭管道文件成功", Debug)<<"step 3"<< endl;// 5. 删除管道文件unlink(ipcPath.c_str());Log("删除管道文件成功", Debug)<<"step 4"<< endl;return0;}// client.cxx#include"Comm.hpp"intmain(){// 1. 获取管道文件int fd =open(ipcPath.c_str(), O_WRONLY);if(fd <0){perror("client open");exit(1);}// 2. IPC过程
string buffer;while(true){
cout <<"Please Enter Message :>";getline(cin, buffer);write(fd, buffer.c_str(), buffer.size());}// 3. 关闭管道文件close(fd);return0;}
注:client 是客户端,客户端向管道文件写入数据,也就是给服务端发信息;server 是服务端,服务端读取管道文件的数据,接收客户端发过来的信息。管道文件只要在服务端创建接口,客户端不需要创建管道文件。Ctrl + Backspace 可以删除字符。
服务端有多个子进程竞争客户端发来的信息
# Makefile
.PHONY:all
all: multiServer client
multiServer:server.cxx
g++ $^ -o$@-std=c++11
client:client.cxx
g++ $^ -o$@-std=c++11
.PHONY:clean
clean:
rm-f client multiServer
// server.cxx#include"Comm.hpp"staticvoidgetMessage(int fd){char buffer[SIZE];while(true){memset(buffer,'\0',sizeof(buffer));
ssize_t s =read(fd, buffer,sizeof(buffer)-1);if(s >0){
cout <<"["<<getpid()<<"] "<<"client say> "<< buffer << endl;}elseif(s ==0)// 客户端退出{// end of file
cerr <<"["<<getpid()<<"] "<<"read end of file, client quit, server quit too!!!"<< endl;break;}else{perror("read error");exit(3);}}}intmain(){// 1. 创建管道文件if(mkfifo(ipcPath.c_str(), MODE)<0){perror("mkfifo");exit(1);}Log("创建管道文件成功", Debug)<<"step 1"<< endl;// 2. 正常的文件操作int fd =open(ipcPath.c_str(), O_RDONLY);if(fd <0){perror("server open");exit(2);}Log("打开管道文件成功", Debug)<<"step 2"<< endl;int nums =3;for(int i =0; i < nums;++i){
pid_t id =fork();if(id ==0){// 3. 编写正常的通信代码getMessage(fd);exit(1);}}for(int i =0; i < nums;++i){// -1表示等待任意一个子进程waitpid(-1,nullptr,0);}// 4. 关闭管道文件close(fd);Log("关闭管道文件成功", Debug)<<"step 3"<< endl;// 5. 删除管道文件unlink(ipcPath.c_str());Log("删除管道文件成功", Debug)<<"step 4"<< endl;return0;}
匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由 mkfifo 函数创建,打开用 open。
- FIFO(命名管道)与 pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
👉总结👈
本篇博客主要讲解了什么是进程间通信、进程间通信的目的、什么是管道、管道的原理、匿名管道、管道的特点、命名管道等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️
版权归原作者 阿亮joy. 所有, 如有侵权,请联系我们删除。