提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前言
世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!
提示:以下是本篇文章正文内容,下面案例可供参考
一、进程为什么要通信?
进程也是需要某种协同的,所以如何协同的前提条件 --- 通信 --- 数据是有类别的 --- 通知就绪的,单纯的要传递给我的数据,控制相关的信息...
事实:进程是具有独立性的。进程 = 内核数据结构 + 代码和数据
父子进程,父进程的数据会被子进程继承下去,父进程的数据交给子进程,子进程是只读不能修改,且不能一直传递信息,所以父子进程不属于通信。(能继承,但是不能一直继承,因为有写实拷贝)。
能传递信息和一直能传递信息,是属于两个概念。
二、进程间通信目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
三、进程如何通信?
- a、进程间通信,成本可能会稍微高一些!
- b、进程间通信的前提,先让不同的进程,看到同一份(操作系统)资源(“一段内存”)!
我们要的通信是一直可以通信。
每个进程开辟的空间都是属于自己的。
所以得让两个进程之间完成通信,需要加入第三方(OS)
如何让操作系统创建这份资源呢?
- 一定是某一个进程先需要通信,让OS创建一个共享资源;
- OS必须提供很多的系统调用接口。
OS创建的共享资源的不同,系统调用接口的不同 --- 进程间通信会有不同的种类!
联想父母吵架,父亲要孩子当第三方去调节。
进程通过OS系统调用接口,让OS在内存中创建一份公共资源
三、进程间通信的常见方式是什么?
进程间通信发展
- System V进程间通信
- POSIX进程间通信
- 管道
进程间通信分类
- 匿名管道pipe
- 命名管道
System V IPC
- System V 消息队列 --- ×
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
直接复用内核代码直接通信呢?命名管道、匿名管道
四、管道 --- 匿名管道
一个文件要被打开两次,struct file文件对象要被创建两个;比如:先以读的方式打开,那么分配的文件描述符是3;再以写的方式打开,分配的文件描述符是4.
struct file与操作有关
struct file文件对象当中包含这么几个重要的东西:
- 1、文件所有的属性inode(指针指向真正的inode),inode中加载文件的属性;
- 2、打开文件的方式,如:读、写等;
- 3、要有一张操作方法集合的表;
- 4、内核级文件缓冲区(加载文件的内容)
如果一个文件以读的方式打开一次,再以写的方式打开一次,那么打开第二次时,inode、flag、操作方法的集合、内核级的缓冲区是不需要再加载到内存中的。
父进程创建子进程:进程具有独立性,但是文件系统是不需要满足独立性的。
将进程控制块和文件描述符表拷贝一份给子进程。文件描述符表是浅拷贝。
所有的进程在命令行当中,都是bash的子进程,也就意味着只要bash打开了,所有的子进程都打开了。
文件描述符表后面的文件系统相关的内容,都是父进程让OS做的,例如:上面的就是调用open()系统调用接口让OS做的,在调用fork()创建子进程,让子进程拷贝一份进程控制块和文件描述符表。
让多个进程看到OS创建的同一份资源,叫做管道文件。
管道文件只允许单向通信 --- 简单(父 -> 子;子 -> 父)
例如:我们想要父进程进行读取,让子进程进行写入,历史上3号文件描述符是读的方式打开的文件,4号文件描述符是写的方式打开的;父进程只进行读,那么父进程就会关闭不需要的4号文件描述符;子进程只进行写,那么子进程就会关闭不需要的3号文件描述符;那么父子进程就可以看到同一块文件的内核级的缓冲区了;那么父进程就能通过3号文件描述符找到struct file对象,来访问文件的内核级的缓冲区进行读取;子进程就能通过4号文件描述符找到struct file对象,来访问文件的内核级的缓冲区进行写;那么双方就可以写入同一个管道文件了。
之前我们学到是进程写入文件当中的内容,要刷新到磁盘当中的;但是进程间的通信就不需要再刷新到磁盘当中了,我们要的是把一个进程的数据交给另一个进程;所以我们得重新设计通信接口。
man pipe 创建管道的系统调用(底层就是open,与open的区别是指定文件不需要带文件路径和文件名,所以这种创建的文件叫匿名文件或匿名管道)
#include <unistd.h>
功能:创建一匿名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组, 其中fd[0]表示读端, fd[1]表示写端
返回值 : 成功返回0,失败返回错误代码
int pipe(int pipefd[2])
输出型参数,将以读和以写的方式打开的文件,在文件描述符数组中,以文件描述符的形式带出来
这种方式打开的管道文件中间的文件系统这部分。
父子既然要关闭不需要的fd,为什么曾经要打开呢?可以不关闭吗?
假如:
- 父进程以读的方式打开文件,未来子进程继承之后,打开文件也是以读的方式打开,一个管道不能同时存在两个读;存在两个读的方式,不能通信了;
- 那么父进程以读写的方式打开文件,子进程也将有读写的方式,不过这样违反了管道文件的单向通信,因为当父进程在写入文件的话,子进程也有写的权限,子进程也会误写;
- 可以不关闭,但是容易发生误操作,建议关闭不需要的文件描述符。
文件描述符是以数组的形式打开的,也就注定了,一个进程能打开的文件描述符是有上限的。(文件描述符的数组的资源是有上限的)
为什么管道是单向通信的呢?
刚开始的人们设计管道的时候,初心就是复用代码,复用代码的根本就是为了简单,为了快速,为了减少成本;如果父进程往管道里写入内容,子进程也往管道里写入内容,父进程和子进程都可以往管道里读,这注定了会带来一个问题,父进程写的数据是要交给子进程的,子进程读的消息一定是父进程的,父子进程的信息混在管道里;所以要将父子进程在管道里的信息区分开来;因为初心是复用代码,为了简单,所以只让管道进行单向通信。
曾经任何一个文件将数据写在缓冲区里,在将数据刷新到文件里,本身就是一个单向通信。将进程中的信息通过管道通信到另一个进程中,复用的就是文件的基本特征的代码。
如果我想双向通信呢?两个管道
五、写一个管道通信的代码
#include <iostream>
#include <string>
#include <cerrno> // C++版本的errno.h
#include <cstring> // C++版本的string.h
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
// fork之后子进程是能拿到父进程的数据的 --- 通信吗?写时拷贝,对方都看不到
// char buffer[1024]; // 不行的
const int size = 1024;
// 获取动态信息
std::string getOtherMessage()
{
static int cnt = 0;// 计数器
std::string messageid = std::to_string(cnt); // stoi -> string 转换成 int
cnt++;
pid_t self_id = getpid();// 获取自己的pid
std::string stringpid = std::to_string(self_id);
std::string message = "messageid: ";
message += messageid;
message += " my pid is : ";
message += stringpid;
return message;
}
// 子进程进行写入
void SubProcessWrite(int wfd)
{
int pipesize = 0;// 管道文件对应的文件级缓冲区的大小
std::string message = "father, I am your son prcess!";// 固定的消息
char c = 'A';
while (true)
{
std::cerr << "+++++++++++++++++++++++++++++++++" << std::endl;
std::string info = message + getOtherMessage(); // 这条消息,就是我们子进程发给父进程的消息
// write:系统调用接口
write(wfd, info.c_str(), info.size()); // 写入管道的时候,没有写入\0, 有没有必要?没有必要
std::cerr << info << std::endl;
// sleep(1); // 子进程写慢一点
// write(wfd, &c, 1);
// 计算管道文件对应的文件级缓冲区的大小
// std::cout << "pipesize: " << ++pipesize << " write charator is : "<< c++ << std::endl;
// // if(c == 'G') break;
// sleep(1);
}
std::cout << "child quit ..." << std::endl;
}
// 父进程进行读取
void FatherProcessRead(int rfd)
{
char inbuffer[size]; // c99 , gnu g99
while (true)
{
sleep(2);
std::cout << "-------------------------------------------" << std::endl;
// sleep(500);
// ssize_t其实就是int;从rfd文件中读取字符串放入inbuffer数组中;返回值n是读取到的有效字符串的个数
ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1); // 是sizeof(inbuffer)不是strlen(inbuffer);
if (n > 0)
{
// 推荐做法:曾经不写'\0',后来再加'\0'
inbuffer[n] = 0; // == '\0',n:最后一个有效字符的下一个位置
std::cout << inbuffer << std::endl;
}
else if (n == 0)
{
// 如果read的返回值是0,表示写端直接关闭了,我们读到了文件的结尾
std::cout << "client quit, father get return val: " << n << " father quit too!" << std::endl;
break;
}
else if (n < 0)
{
std::cerr << "read error" << std::endl;
break;
}
// sleep(1);
// break;
}
}
int main()
{
// 1. 创建管道(必须的有两个元素的数组)
int pipefd[2];
int n = pipe(pipefd); // 输出型参数,rfd, wfd,未来读和写两个文件描述符会被放入pipefd数组中
if (n != 0)
{
std::cerr << "errno: " << errno << ": "
<< "errstring : " << strerror(errno) << std::endl;
return 1;
}
// pipefd[0]->0->r(嘴巴 - 读) pipefd[1]->1->w(笔->写)
// 0下标对应的永远是读端;1下标对应的永远是写端
std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;
sleep(1);
// 上面对应的也是父进程
// 2. 创建子进程
pid_t id = fork();
if (id == 0)
{
std::cout << "子进程关闭不需要的fd了, 准备发消息了" << std::endl;
sleep(1);
// 子进程 --- write
// 3. 关闭不需要的fd
close(pipefd[0]);
// if(fork() > 0) exit(0);
// 如果大于0,就是孙子进程,让原先的子进程退出;下来的代码由孙子进程执行;爷孙进程之间也能通信
SubProcessWrite(pipefd[1]);
close(pipefd[1]);
exit(0);
}
std::cout << "父进程关闭不需要的fd了, 准备收消息了" << std::endl;
sleep(1);
// 父进程 --- read
// 3. 关闭不需要的fd
close(pipefd[1]);
FatherProcessRead(pipefd[0]);
std::cout << "5s, father close rfd" << std::endl;
sleep(5);
close(pipefd[0]);
int status = 0;// 获取子进程的退出信息
pid_t rid = waitpid(id, &status, 0);
if (rid > 0)
{
// status是一个位图结构,子进程的低七位表示当前进程收到的信号;次低八位对应的是子进程的退出码
std::cout << "wait child process done, exit sig: " << (status & 0x7f) << std::endl;
std::cout << "wait child process done, exit code(ign): " << ((status >> 8) & 0xFF) << std::endl;
}
return 0;
}
管道的4中情况:
- 如果管道内部是空的 && write fd 没有关闭,读取条件不具备,读进程会被阻塞 -- wait -> 读取条件具备(写入数据);
- 管道被写满 && read fd不读且没有关闭,管道被写满,写进程会被阻塞(管道被写满 -- 写条件不具备) --- wait --写条件具备(读取数据);
- 管道一直在读 && 写端关闭了wfd,读端read返回值会读到0,表示读到了文件结尾;
- rfd直接关闭,写端wfd一直在进行写入?OS认为此时的管道是坏的管道(broken pipe),OS不做浪费时空的事情。因此OS会杀掉对应的进程,会给写端对应的目标进程发送信号:13)SIGPIPE。写端进程会被操作系统直接使用13号信号关掉。相当于进程出现了异常。
管道的5种特征:
- 匿名管道:只用来进行具有血缘关系的进程之间,进行通信,常用与父子进程之间通信;
- 管道内部,自带进程之间同步的机制(多执行流执行代码的时候,具有明显的顺序性);
- 管道文件的声明周期是随进程的;
- 管道文件在通信的时候,是面向字节流的,write的次数和读取的次数不是一一匹配的;
- 管道的通信模式,是一种特殊的半双工模式。
面向字节流:比如:子进程写端不断的在管道文件里写入数据,父进程读端隔两秒便在在管道文件里读取数据,读端会一次性读取非常多的数据,但是读端还要进行解析,因为要使用的是一条完整的消息,读到两秒处的消息不一定是完整的消息。(就像自来水管道一样,一端水不断流进管道,一端水不断流出管道)
- 半双工:你可以说话,我也可以说话,但是我们两个不能同时说话。管道通信属于一种特殊的半双工。
- 管道只能单向通信,所以是特殊的;半双工是双向的,只不过不能同时进行。
- 全双工:吵架的时候。
当shell在执行用管道链接起来的多个命令时,shell内部会把管道连接起来的命令各自变成一个进程,这几个进程是同时启动的。这几个进程之间的关系叫做兄弟关系,它们的父进程都是bash。
六、写一个进程池的代码
- 管道里面没有数据,子进程就在阻塞等待,等待任务的到来。
- 虽然父子进程可能因为写实拷贝,导致数据不一致,但是子进程在父进程拷贝下来的代码是同一份的。
- 父进程可以提前在代码中创建一些任务(函数),这些任务函数都放在一张函数指针数组表中,父进程只需要给每个管道输入不同的固定长度的4字节的数组下标,也叫任务码;每个子进程从管道中读取各自的任务码,从而执行不同的函数,实现进程之间的协同。
ProcessPool.cpp
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
// 子进程不断的从管道文件中读取内容
//void work(int rfd)
//{
// while (true)
// {
// int command = 0;// 个人理解,和scanf()的用法一致
// // 从管道文件的读端读取管道的内容,放入command变量中,读取的内容大小是command
// int n = read(rfd, &command, sizeof(command));
// // 写了4个字节,必须读取的也是4个字节
// if (n == sizeof(int))
// {
// std::cout << "pid is : " << getpid() << " handler task" << std::endl;
// ExcuteTask(command);
// }
// else if (n == 0)
// {
// std::cout << "sub process : " << getpid() << " quit" << std::endl;
// break;
// }
// }
//}
// master:用一个类描述管道
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string& name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }
void CloseChannel()
{
close(_wfd);// 关闭写端
}
void Wait() // 回收子进程
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;// 通过_wfd文件描述符向指定管道里写
pid_t _subprocessid;// 子进程id
std::string _name;// 管道的名字
};
// 形参类型和命名规范
// const &: 输入参数
// & : 输入输出型参数
// * : 输出型参数
// 创建信道和子进程
// task_t task: 回调函数(每一次创建一个子进程,所有的子进程都帮我们去调用设置的这个方法)
void CreateChannelAndSub(int num, std::vector<Channel>* channels, task_t task)
{
// BUG? --> fix bug
for (int i = 0; i < num; i++)
{
// 1. 创建管道
int pipefd[2] = { 0 };
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 2. 创建子进程
pid_t id = fork();
if (id == 0)
{
// child - read
close(pipefd[1]);
dup2(pipefd[0], 0);
// 将管道的读端,重定向到标准输入:本来是在文件描述符放的是读端的文件对象的地址,
// 现在将读端的文件对象的地址存放入文件描述符0里面
//work(pipefd[0]);
task();
close(pipefd[0]);
exit(0);
}
// 3.构建一个channel名称
std::string channel_name = "Channel-" + std::to_string(i);
// 父进程
close(pipefd[0]);
// a. 子进程的pid b. 父进程关心的管道的w端
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
// 选择一个信道和进程:0 1 2 3 4 channelnum
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
// 发送任务码
void SendTaskCommand(Channel& channel, int taskcommand)
{
// 通过文件描述符写端对应的管道里写,写任务码的内容
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
// 通过channel控制子进程(只做一次)
void ctrlProcessOnce(std::vector<Channel>& channels)
{
sleep(1);
// a. 选择一个任务
int taskcommand = SelectTask();
// b. 选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c. 发送任务
// 向指定的信道当中发送指定的任务码
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
// 通过channel控制子进程(控制指定的次数)
void ctrlProcess(std::vector<Channel>& channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
ctrlProcessOnce(channels);
}
}
else
{
while (true)
{
ctrlProcessOnce(channels);
}
}
}
// 回收管道和子进程.
void CleanUpChannel(std::vector<Channel>& channels)
{
// 所以我们可以倒着从尾部开始关闭
// int num = channels.size() -1;
// while(num >= 0)
// {
// channels[num].CloseChannel();
// channels[num--].Wait();
// }
// a. 关闭所有的写端
for (auto& channel : channels)
{
channel.CloseChannel();
//channel.Wait();
}
// 注意:b. 回收子进程
for (auto &channel : channels)
{
channel.Wait();
}
}
// ./processpool 5
int main(int argc, char* argv[])
{
// argc<2说明管道不够用
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
// num:管道的数目
int num = std::stoi(argv[1]);// 将字符串转换成整数
LoadTask();// 把所有的任务进行对应的装载
// 用vector将管道管理起来
std::vector<Channel> channels;
// 1. 创建信道和子进程
CreateChannelAndSub(num, &channels, work);// 日后子进程回调其它的函数,可以直接修改参数3
// 2. 通过channel控制子进程
ctrlProcess(channels, 5);
// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
CleanUpChannel(channels);
// sleep(100);
return 0;
}
Task.hpp
.hpp也是C++当中的一种头文件,允许声明和实现写在一个文件里
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3 // 任务码设为3
typedef void (*task_t)(); // task_t 函数指针类型
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout << "I am a download task" << std::endl;
}
void Flush()
{
std::cout << "I am a flush task" << std::endl;
}
// 函数指针数组
task_t tasks[TaskNum];
void LoadTask()
{
// 种一棵随机数种子,一方面是以时间为种子,另一方面以进程的pid为种子,还以17777为种子,让随机数变的更随机
srand(time(nullptr) ^ getpid() ^ 17777);
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
// 执行任务
void ExcuteTask(int number)
{
if (number < 0 || number > 2)
return;
tasks[number]();// 调用函数
}
// 随机选择一个任务的下标
int SelectTask()
{
return rand() % TaskNum;// 0、1、2
}
void work()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
void work1()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
void work2()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
我们要的是保留父进程的写端4号,子进程的读端3号。
我们刚开始是在父进程中在文件描述符中3号和4号,分别以读和写的方式打开文件,关闭不需要的4号读端;子进程1拷贝的是父进程的PCB和文件描述符表,所以子进程保留4号读端,关闭写端;父进程在创建子进程2,父进程以3号读端和5号写端打开文件,子进程会拷贝下来父进程的3号读端、5号写端和4号指向第一个管道文件的写端,接着父进程关闭3号读端,子进程关闭5号写端,保留下来的是3号读端和4号指向第一个管道文件的读端;那么依次类推下去,到父进程创建第10个子进程的时候,以写端方式指向第一个管道文件的就有10个。
如果关闭写端和回收子进程写在同一个循环当中的话,第一个管道文件只会被关闭一个写端,还剩9个写端,依次类推下去,只有最后一个子进程是成功回收的,上面的9个子进程会处于僵尸状态,等待写入信息,回收子进程失败。
方法一:
所以我们将关闭写端和回收子进程放入两个循环中,第一个循环关闭写端,每一个子进程文件描述符表中指向自己的管道文件都会关闭,最后一个子进程指向的管道文件的写端会全部关闭(只有一个);第二个循环,回收最后子进程成功,那么这个子进程的文件描述符表中的内容都会被释放,如此,倒数第二个管道文件的写端也将全部关闭,倒数第二个子进程回收成功;依次类推下去,相当于逆递归的方式回收之前所有的子进程。
方法二:
总结
好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。
版权归原作者 2301_79585944 所有, 如有侵权,请联系我们删除。