0


【Linux】进程间通信

👀樊梓慕:个人主页****

** 🎥个人专栏:《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C++》《Linux》《算法》**********

🌝每一个不曾起舞的日子,都是对生命的辜负


前言

由于进程间具有独立性,他们之间是不能直接访问获取甚至修改对方的数据的,但是在实际需求中,我们需要进程之间进行交互,所以必须开发某种方式使进程间可以建立联系。

在最开始,进程间通信最原始的方式就是利用管道,当然随之也有诸多不同的进程间通信的标准,接下来就让我们一起来学习下吧!


*欢迎大家📂收藏*📂以便未来做题时可以快速找到思路,巧妙的方法可以事半功倍。 **

=========================================================================

GITEE相关代码:****🌟樊飞 (fanfei_c) - Gitee.com🌟

=========================================================================


1.进程间通信

1.1进程间通信的目的

  • 数据传输:一个进程需要将它的数据发送给另一个进程。
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

总结:我们往往需要多个进程协同完成一些事情。

1.2进程间通信的方式

1.2.1一般规律

进程间通信的本质就是让不同的进程看到同一份资源,我们需要一块空间作为交换数据的『 空间』,但是为了保证进程间的独立性,这块『 空间』不能由通信双方任何一个提供,所以我们就需要一个第三方来提供,这个第三方一般就是操作系统。

1.2.2具体做法

操作系统提供的『 空间』有不同的样式,就决定了有不同的通信方式。

本篇文章后面提到的共享内存和信号量都是基于『 System V IPC』的通信方案,这种是本地主机的进程间通信,在当前其实也已经并不流行,现在一般使用网络的进程间通信『 POSIX IPC』方案,当然这里我们只是以学习为目的。

  • 管道(匿名、命名);
  • 共享内存;
  • 消息队列(本篇文章略);
  • 信号量;

进程间通信分类

管道

  • 匿名管道
  • 命名管道

System V IPC

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

POSIX IPC

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

2.管道

2.1命令行上的管道

管道是Unix中最古老的进程间通信的形式,我们把从一个进程连接到另一个进程的数据流称为一个“管道”。

例如,统计我们当前使用云服务器上的登录用户个数:

其中,who命令和wc命令都是两个可执行程序,当它们运行起来后就变成了两个进程,who进程通过标准输出将数据送到『 管道』当中,wc进程再通过标准输入从『 管道』当中读取数据,至此便完成了数据的传输。

补充:who命令用于查看当前云服务器的登录用户(一行显示一个用户),wc -l用于统计当前的行数。

2.2匿名管道

2.2.1匿名管道的原理

一个task_struct存有一个文件结构体指针*files,指向file结构体,file结构体中存在一份文件描述符表,表中存储着指向文件流的指针:

当父进程创建子进程时,子进程会生成一份自己的文件描述符表,文件描述符表中的内容都是拷贝自父进程,也就是说通过这样的方式,父子进程拥有了一份共享的数据区:

但是子进程的文件流与父进程的文件流相同,比如父进程是以读的方式打开的文件流,那相应的子进程与父进程相同也只能是以读方式打开,这也并不能达到我们进程间通信的目的。

其实我们可以这样操作,让父进程打开某个文件两次,并且分别以读和写的方式打开,然后子进程继承父进程,也拥有两个文件流,之后关闭父进程中其中一个文件流,再关闭子进程中的一个文件流,比如关闭的是父进程的读文件流和子进程的写文件流,此时父进程就是向该『 共享空间』 写的一方,而子进程就是从该『 共享空间』读的一方,通过这样的方式,我们就完成了进程间通信。

管道只能被设计成单向通信。

由于匿名管道这种方式是通过子进程继承父进程数据来达到的进程间通信的目的,所以匿名管道只支持带有『 血缘关系』的进程间通信。

以上这种方式很明显是基于文件的,让不同进程看到同一份资源的通信方式,这种通信方式就是管道。

2.2.2系统调用pipe()

为了支持管道通信,操作系统提供了一个系统调用:pipe();

pipe函数用于创建匿名管道,pipe函数的函数原型如下:

int pipe(int pipefd[2]);

pipe函数的参数是一个输出型参数,数组pipefd用于返回两个指向管道读端和写端的文件描述符:
数组元素含义pipefd[0]管道读端的文件描述符pipefd[1]管道写端的文件描述符
pipe函数调用成功时返回0,调用失败时返回-1。

通过上面的学习,我们来尝试一下进行父子进程间的通信:

验证代码:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

//写函数
void writer(int wfd)
{
    const char *str = "hello father, I am child";
    char buffer[128];
    int cnt = 0;
    pid_t pid = getpid();
    while(1)
    {
        //向buffer中写入字符串
        snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
        //向wfd描述符中写入字符串buffer
        write(wfd, buffer, strlen(buffer));
        cnt++;
        sleep(1);
    }
}

//读函数
void reader(int rfd)
{
    char buffer[1024];
    while(1)
    {
        //从rfd描述符中读取字符串到buffer
        ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
        (void)n;//去除未使用变量n的警告
        printf("father get a message: %s", buffer);
    }
}

int main()
{
    int pipefd[2];
    int n = pipe(pipefd);
    if(n < 0) return 1;
    printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0]/*read*/, pipefd[1]/*write*/); // 3, 4

    pid_t id = fork();
    if(id == 0)
    {
        //child: w
        close(pipefd[0]);

        writer(pipefd[1]);

        exit(0);
    }

    // father: r
    close(pipefd[1]);

    reader(pipefd[0]);
    wait(NULL);

    return 0;
}

验证结果:


2.2.3管道通信的4种特殊情况

  1. 写端进程不写,并且管道内部无数据,那么此时对应的读端进程就会阻塞等待,直到管道里面有数据。
  2. 读端进程不读,并且管道内部被写满,那么此时对应的写端进程就会阻塞等待,直到管道当中的数据被读端进程读取走。
  3. 写端进程将数据写完后将写端关闭,那么读端进程会将管道当中的数据读完,返回值为0,表示读结束。
  4. 读端进程不读并且关闭,而写端进程还在一直向管道写入数据,那么操作系统会通过『 信号13:SIGPIPE 』将写端进程杀掉。

2.2.4管道通信的4种特性

  1. 自带同步与互斥机制,两个进程必须要按照某种次序来对管道进行操作,同步的任务之间有明确的顺序关系;
  • 同步: 两个或两个以上的进程在运行过程中协同步调,按预定的先后次序运行。比如,A任务的运行依赖于B任务产生的数据。
  • 互斥: 一个公共资源同一时刻只能被一个进程使用,多个进程不能同时使用公共资源。
  1. 面向字节流,对于进程A写入管道当中的数据,进程B每次从管道读取的数据的多少是任意的,这种被称为流式服务;
  2. 管道的生命周期随进程,管道本质上是通过文件进行通信的,也就是说管道依赖于文件系统,那么当所有打开该文件的进程都退出后,该文件也就会被释放掉,所以说管道的生命周期随进程;
  3. 管道是半双工通信的,即单向通信。

在数据通信中,数据在线路上的传送方式可以分为以下三种:

  • 单工通信(Simplex Communication):单工模式的数据传输是单向的。通信双方中,一方固定为发送端,另一方固定为接收端。
  • 半双工通信(Half Duplex):半双工数据传输指数据可以在一个信号载体的两个方向上传输,但是不能同时传输。
  • 全双工通信(Full Duplex):全双工通信允许数据在两个方向上同时传输,它的能力相当于两个单工通信方式的结合。全双工可以同时(瞬时)进行信号的双向传输。

2.2.5管道的大小

前面我们提到,当管道满了的时候...

那么在linux中管道的大小是多少呢?

管道的最大容量是65536 字节=64 KB。

但是这里还有一个PIPE_BUF的概念。

当单次写入的数据量不大于PIPE_BUF时,linux将保证写入数据的『 原子性 』。

linux中PIPE_BUF的大小为4096 字节= 4 KB.


2.2.6进程池

操作系统随时随地都有可能收到用户创建进程的请求,那么操作系统为了节省开销,往往是一次申请创建多个进程,这样可以有效减少系统调用的次数。

放到现实场景里,比如我需要完成一项任务,如果操作系统在我提出请求后才开始创建进程是不是效率会很低下,所以一般操作系统都会维护一个进程池,可以随时满足不同任务的进程创建请求。

所以我们希望操作系统可以一次创建一批进程,那么基于刚才学习的匿名管道,我可不可以实现以下这样的功能呢?

一个父进程创建多个子进程,同时配备相同数量的管道,父进程通过管道向子进程发送指令,要求子进程完成某某任务。

注意:下附源码与部分源码讲解。

源码

processpool.cc

#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include "task.hpp"

using namespace std;

enum
{
    UsageError = 1,
    ArgError,
    PipeError
};

void Usage(const std::string &proc)
{
    cout << "Usage: " << proc << " subprocess-num" << endl;
}

//管道类
class Channel
{
public:
    Channel(int wfd, pid_t sub_id, const std::string &name)
        : _wfd(wfd), _sub_process_id(sub_id), _name(name)
    {
    }

    string name() { return _name; }
    int wfd() { return _wfd; }
    pid_t pid() { return _sub_process_id; }
    void Close() { close(_wfd); }
    ~Channel()
    {
    }

private:
    int _wfd; // 管道写端文件描述符表
    pid_t _sub_process_id;// 子进程pid
    string _name; //管道名称
};

//进程池类
class ProcessPool
{
public:
    ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num)
    {
    }

    //创建进程
    int CreateProcess(work_t work) // 回调函数
    {
        std::vector<int> fds; // 为了处理子进程拷贝父进程描述符表时引起的问题所搭建的容器
        for (int number = 0; number < _sub_process_num; number++)
        {
            int pipefd[2]{0};
            int n = pipe(pipefd);
            if (n < 0)
                return PipeError;
            pid_t id = fork();
            if (id == 0)
            {
                //处理子进程拷贝父进程描述符表时引起问题(下面详谈)
                if (!fds.empty())
                {
                    std::cout << "close w fd: ";
                    for (auto fd : fds)
                    {
                        close(fd);
                        std::cout << fd << " ";
                    }
                    std::cout << std::endl;
                }

                // sleep(1);
                // child -> r
                close(pipefd[1]);
                // 执行任务
                dup2(pipefd[0], 0);//重定向
                work(pipefd[0]);
                exit(0);
            }

            // sleep(2);
            string cname = "channel-" + to_string(number);
            // father
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1], id, cname));

            // 把父进程的wfd保存,创建后续其他子进程时,及时关闭当前管道的写端
            fds.push_back(pipefd[1]);
        }
        return 0;
    }

    //管道负载均衡,每次选取下一个管道轮询
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next++;
        next %= channels.size();
        return c;
    }

    //发送任务码
    void SendTaskCode(int index, uint32_t code)
    {
        cout << "send code: " << code << " to " << channels[index].name() << " sub prorcess id: " << channels[index].pid() << endl;
        write(channels[index].wfd(), &code, sizeof(code));
    }

    // 让子进程全部退出,只需要关闭所有的Channel w即可!
    void KillAll()
    {
        for (auto &channel : channels)
        {
            channel.Close();
            pid_t pid = channel.pid();
            
            //回收子进程
            pid_t rid = waitpid(pid, nullptr, 0);
            if (rid == pid)
            {
                std::cout << "wait sub process: " << pid << " success..." << std::endl;
            }
            std::cout << channel.name() << " close done" << " sub process quit now : " << channel.pid() << std::endl;
        }
    }

    ~ProcessPool()
    {
    }

private:
    int _sub_process_num; // 子进程数量
    vector<Channel> channels; // 管道容器
};

//进程池控制
void CtrlProcessPool(ProcessPool *processpool_ptr, int cnt)
{
    while (cnt)
    {
        // a. 选择一个进程和通道
        int channel = processpool_ptr->NextChannel();
        // cout << channel.name() << endl;

        // b. 你要选择一个任务
        uint32_t code = NextTask();

        // c. 发送任务
        processpool_ptr->SendTaskCode(channel, code);

        sleep(1);
        cnt--;
    }
}

// ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    //获取命令行参数:创建的子进程个数
    int sub_process_num = std::stoi(argv[1]);
    if (sub_process_num <= 0)
        return ArgError;

    srand((uint64_t)time(nullptr));

    // 1. 创建通信信道和子进程
    ProcessPool *processpool_ptr = new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);

    // 2. 控制子进程
    CtrlProcessPool(processpool_ptr, 10);

    std::cout << "task run done" << std::endl;
    // sleep(100);

    // 3. 回收子进程
    processpool_ptr->KillAll();

    delete processpool_ptr;
    return 0;
}

task.hpp

#pragma once

#include <iostream>
#include <unistd.h>

using namespace std;

typedef void(*work_t)(int);  //函数指针类型
typedef void(*task_t)(int,pid_t);  //函数指针类型

void PrintLog(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task\n" << endl;
}

void ReloadConf(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task\n" << endl;
}

void ConnectMysql(int fd, pid_t pid)
{
    cout << "sub process: " << pid << ", fd: " << fd<< ", task is : connect mysql task\n" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

uint32_t NextTask()
{
    return rand() % 3;
}

void worker(int fd)
{
    // 从0中读取任务即可!
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0, &command_code, sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code >= 3) continue;
            tasks[command_code](fd, getpid());
        }
        else if(n == 0)
        {
            std::cout << "sub process: " << getpid() << " quit now..." << std::endl;
            break;
        }
    }
    
}

子进程拷贝父进程导致管道有多个写端的问题解决

这里我们主要需要研究一下:子进程的回收与管道的关闭可能会引发的问题。

以上代码是处理完bug的结果,问题主要发生在创建多个子进程时,子进程都会完整地拷贝父进程的文件描述符表,这就会引发子进程的文件描述符表拷贝下来不需要的读写文件流,这样说比较抽象,我们画图来说明:

你会发现一个问题:当父进程创建子进程2时, 由于子进程2的文件描述符表是拷贝的父进程,那么也会一并将父进程文件描述符表中下标为4的流拷贝过来,导致子进程2中的文件描述符表也有指向管道1的流。

如果你确保子进程2不会向管道1写入数据的话,好像也没什么问题?

注意:主要问题发生在关闭管道和回收子进程上。

根据管道通信的4种特殊情况之一:我们如何让读端进程(子进程)退出呢?

只需要关闭我们管道的写端,那么读端进程就会关闭。

但是由于管道1的写端有两个:一个是父进程,一个是子进程2,那么父进程还能控制子进程的关闭么?

正常来讲:应该是父进程要关闭某个子进程,只需要关闭链接该子进程的管道的写端即可,现在的情况就是,就算父进程关闭了管道的写端,那么还有由于创建子进程拷贝的问题,后续的其他子进程都会有该管道的写端。

那么我们如何解决这个问题呢?我给出的源码在创建进程池函数中已经给出解决方案,在这里我单独拿出来做讲解:

//创建进程
int CreateProcess(work_t work) // 回调函数
{
    std::vector<int> fds; // 为了处理子进程拷贝父进程描述符表时引起的问题所搭建的容器
    for (int number = 0; number < _sub_process_num; number++)
    {
        int pipefd[2]{ 0 };
        int n = pipe(pipefd);
        if (n < 0)
            return PipeError;
        pid_t id = fork();
        if (id == 0)
        {
            //处理子进程拷贝父进程描述符表时引起问题(下面详谈)
            if (!fds.empty())
            {
                std::cout << "close w fd: ";
                for (auto fd : fds)
                {
                    close(fd);
                    std::cout << fd << " ";
                }
                std::cout << std::endl;
            }

            // sleep(1);
            // child -> r
            close(pipefd[1]);
            // 执行任务
            dup2(pipefd[0], 0);//重定向
            work(pipefd[0]);
            exit(0);
        }

        // sleep(2);
        string cname = "channel-" + to_string(number);
        // father
        close(pipefd[0]);
        channels.push_back(Channel(pipefd[1], id, cname));

        // 把父进程的wfd保存,创建后续其他子进程时,及时关闭当前管道的写端
        fds.push_back(pipefd[1]);
    }
    return 0;
}

根据注释,我们维护了一个vector容器,在每次创建子进程后,将父进程当前的wfd保存,放在上面图示中的例子就是下标为4的文件流,保存后,在创建新的子进程时,判断容器fds是否为空,如果不为空,证明可能会发生拷贝风险,所以我们及时将之前保存的下标为4的文件流关闭,通过维护这样一个vector,我们成功解决了这一问题。


2.3命名管道

命名管道与匿名管道的区别在于:命名管道可以让没有血缘关系的进程间通信,其他并无差别。

那么我们之前说过进程间通信的本质就是:进程间通信的本质就是让不同的进程看到同一份资源,

那么如何实现两个毫不相关进程之间的通信呢,很明显我们需要一个两个进程都可以看到的文件,命名管道就是一种特殊类型的文件,两个进程通过命名管道的路径文件名打开同一个管道文件,此时这两个进程也就看到了同一份资源,进而就可以进行通信了。

注意:

  1. 普通文件是很难做到通信的,即便做到通信也无法解决一些安全问题。
  2. 命名管道和匿名管道一样,都是内存文件,只不过命名管道在磁盘有一个简单的映像,但这个映像的大小永远为0,因为命名管道和匿名管道都不会将通信数据刷新到磁盘当中。

2.3.1使用命令创建命名管道

我们可以使用

mkfifo

命令创建一个命名管道。

mkfifo fifo

使用这个命名管道文件,就能实现两个进程之间的通信了。我们在一个进程(进程A)中用shell脚本每秒向命名管道写入一个字符串,在另一个进程(进程B)当中用cat命令从命名管道当中进行读取。

现象就是当进程A启动后,进程B会每秒从命名管道中读取一个字符串打印到显示器上。这就证明了这两个毫不相关的进程可以通过命名管道进行数据传输。

注意:之前我们说过,当管道的读端进程退出后,写端进程再向管道写入数据就没有意义了,此时写端进程会被操作系统杀掉,在这里就可以很好的得到验证:当我们终止掉读端进程后,因为写端执行的循环脚本是由命令行解释器bash执行的,所以此时bash就会被操作系统杀掉,我们的云服务器也就退出了。


2.3.2使用程序创建命名管道

在程序中创建命名管道使用mkfifo函数,mkfifo函数的函数原型如下:

int mkfifo(const char *pathname, mode_t mode);

**参数: **

  • 参数pathname:创建的管道的路径文件名。
  • 参数mode:创建命名管道文件的默认权限。

为了避免创建出来管道文件的权限值受到umask(文件默认掩码)的影响,可以利用umask函数主动文件默认掩码设置为0。

umask(0); //将文件掩码设置为0

tips:实际创建出来文件的权限为:mode&(~umask)。umask的默认值一般为0002,当我们设置mode值为0666时实际创建出来文件的权限为0664。

** 返回值**:

  • 命名管道创建成功,返回0。
  • 命名管道创建失败,返回-1。

2.3.3用命名管道实现server&client通信

实现服务端(server)和客户端(client)之间的通信之前,我们需要先让服务端运行起来,我们需要让服务端运行后创建一个命名管道文件,然后再以读的方式打开该命名管道文件,之后服务端就可以从该命名管道当中读取客户端发来的通信信息了。

我们实现一个Fifo类,让该类替我们处理有关管道的创建销毁的操作:

#ifndef __COMM_HPP__
#define __COMM_HPP__

#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>

using namespace std;

#define Mode 0666
#define Path "./fifo"

class Fifo
{
public:
    Fifo(const string &path) : _path(path)
    {
        umask(0);
        int n = mkfifo(_path.c_str(), Mode);
        if (n == 0)
        {
            cout << "mkfifo success" << endl;
        }
        else
        {
            cerr << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
        }
    }
    ~Fifo()
    {
        int n = unlink(_path.c_str());
        if (n == 0)
        {
            cout << "remove fifo file " << _path << " success" << endl;
        }
        else
        {
            cerr << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
        }
    }

private:
    string _path; // 文件路径+文件名
};

#endif

服务端的代码如下:

//server.c
#include "comm.h"

int main()
{
    Fifo fifo(Path);

    int rfd = open(Path, O_RDONLY);
    if (rfd < 0)
    {
        cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
        return 1;
    }
    // 如果我们的写端没打开,先读打开,open的时候就会阻塞,直到把写端打开,读open才会返回
    cout << "open success" << endl;

    char buffer[1024];
    while (true)
    {
        ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;//加上'\0'方便输出
            cout << "client say : " << buffer << endl;
        }
        else if (n == 0)
        {
            cout << "client quit, me too!!" << endl;//写端退出,读端也退出
            break;
        }
        else
        {
            cerr << "read failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
            break;
        }
    }

    close(rfd);
    return 0;
}

对于客户端来说,因为服务端运行起来后命名管道文件就已经被创建了,所以客户端只需以写的方式打开该命名管道文件,之后客户端就可以将通信信息写入到命名管道文件当中,进而实现和服务端的通信。

客户端的代码如下:

//client.c
#include "comm.h"

int main()
{
    int wfd = open(PATH, O_WRONLY); //以写的方式打开命名管道文件
    if (wfd < 0){
        cerr << "open failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
        return 1;
    }
    string inbuffer;
    while (true)
    {
        cout << "Please Enter Your Message# ";
        std::getline(cin, inbuffer);
        if(inbuffer == "quit") break;
        ssize_t n = write(wfd, inbuffer.c_str(), inbuffer.size());
        if (n < 0)
        {
            cerr << "write failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
            break;
        }
    }
    close(wfd); //通信完毕,关闭命名管道文件
    return 0;
}

3.system V进程间通信

system V进程间通信仍然是一种本地通信方案,不同的是,管道的实现其实是基于文件系统,不需要单独设计模块,而system V IPC是操作系统特地设计的一种通信方式,是操作系统内核中专门设计的通信方案。

system V IPC提供的通信方式有以下三种:

  • system V共享内存
  • system V消息队列(略)
  • system V信号量(略)

其中,system V共享内存和system V消息队列是以传送数据为目的的,而system V信号量是为了保证进程间的同步与互斥而设计的,虽然system V信号量和通信好像没有直接关系,但属于通信范畴。

3.1system V共享内存

3.1.1原理

system V共享内存的通信方案的本质仍然是让不同进程看到同一份资源,只不过这个资源现在变成了物理内存。

在物理内存当中申请一块内存空间,然后将这块内存空间分别与各个进程各自的页表之间建立映射,再在虚拟地址空间当中开辟空间并将虚拟地址填充到各自页表的对应位置,使得虚拟地址和物理地址之间建立起对应关系,至此这些进程便看到了同一份物理内存,这块物理内存就叫做共享内存。


系统中一定有很多个进程间进行通信,也就是说共享内存在内核中同时可以存在很多个,那么操作系统需要将这些共享内存管理起来:先描述,再组织。

用户级共享内存的数据结构如下:

struct shmid_ds {
    struct ipc_perm     shm_perm;   /* operation perms */
    int         shm_segsz;  /* size of segment (bytes) */
    __kernel_time_t     shm_atime;  /* last attach time */
    __kernel_time_t     shm_dtime;  /* last detach time */
    __kernel_time_t     shm_ctime;  /* last change time */
    __kernel_ipc_pid_t  shm_cpid;   /* pid of creator */
    __kernel_ipc_pid_t  shm_lpid;   /* pid of last operator */
    unsigned short      shm_nattch; /* no. of current attaches */
    unsigned short      shm_unused; /* compatibility */
    void            *shm_unused2;   /* ditto - used by DIPC */
    void            *shm_unused3;   /* unused */
};

当我们申请了一块共享内存后,为了让要实现通信的进程能够看到同一个共享内存,因此每一个共享内存被申请时都有一个key值,这个key值用于标识系统中共享内存的唯一性。

可以看到上面共享内存数据结构的第一个成员是『 shm_perm』,『 shm_perm』是一个『 ipc_perm』类型的结构体变量,每个共享内存的『 key』值存储在『 shm_perm』这个结构体变量当中,其中『 ipc_perm』结构体的定义如下:

struct ipc_perm{
    __kernel_key_t  key;
    __kernel_uid_t  uid;
    __kernel_gid_t  gid;
    __kernel_uid_t  cuid;
    __kernel_gid_t  cgid;
    __kernel_mode_t mode;
    unsigned short  seq;
};

这个key值我们放到接下来的系统调用函数中再做讲解。


3.1.2共享内存的建立与释放

共享内存的建立大致包括以下两个过程:

  1. 在物理内存当中申请共享内存空间。
  2. 将申请到的共享内存挂接到地址空间,即建立映射关系。

共享内存的释放大致包括以下两个过程:

  1. 将共享内存与地址空间去关联,即取消映射关系。
  2. 释放共享内存空间,即将物理内存归还给系统。

那么如何完成以上过程呢?我们发现操作的对象都是内核级数据结构,所以这里一定需要OS提供系统调用函数,接下来我们一起来学习下吧!


3.1.3系统调用函数

创建共享内存——shmget函数
int shmget(key_t key, size_t size, int shmflg);

shmget函数的参数说明:

  • key,表示待创建共享内存在系统当中的唯一标识。
  • size,表示待创建共享内存的大小,内核中共享内存的大小是以4KB为基本单位的,建议申请大小是4KB的整数倍。
  • shmflg,表示创建共享内存的方式。

shmget函数的返回值说明:

  • shmget调用成功,返回一个有效的共享内存标识符(用户层标识符shmid)。
  • shmget调用失败,返回-1。

参数key讲解

key值我们需要通过ftok函数获取:

key_t ftok(const char *pathname, int proj_id);

ftok函数的作用:将一个已存在的路径名pathname和一个整数标识符proj_id转换成一个key值,称为IPC键值,在使用shmget函数获取共享内存时,这个key值会被填充进维护共享内存的数据结构当中。

思考:这个key值为什么要让用户传入呢?

因为只要通信双方约定好参数,两个通信的进程就能通过ftok函数生成相同的key,就相当于他们看到了那个同一个内存,就能建立通信了。而反过来如果这个key是由内核来设定,那进程B如何知道进程A创建的共享内存的key呢,他们之间可还没有建立通信呢?

注意:pathname所指定的文件必须存在且可存取。

参数shmflg讲解
组合方式****作用IPC_CREAT如果内核中不存在键值与key相等的共享内存,则新建一个共享内存并返回该共享内存的句柄;如果存在这样的共享内存,则直接返回该共享内存的句柄IPC_CREAT | IPC_EXCL如果内核中不存在键值与key相等的共享内存,则新建一个共享内存并返回该共享内存的句柄;如果存在这样的共享内存,则出错返回,即如果创建成功,则一定是全新的共享内存IPC_CREAT | IPC_EXCL | 权限指定创建的共享内存权限,比如IPC_CREAT | IPC_EXCL | 0666

  • 文件的生命周期随进程;
  • 共享内存的生命周期随内核;

进程结束,如果我们不主动释放共享内存,它会一直存在,除非系统重启。


挂接共享内存——shmat函数
void *shmat(int shmid, const void *shmaddr, int shmflg);

shmat函数的参数说明:

  • shmid,表示待关联共享内存的用户级标识符。
  • shmaddr,指定共享内存映射到进程地址空间的某一地址,通常设置为NULL,即让内核自己决定一个合适的地址位置。
  • shmflg,表示关联共享内存时设置的某些属性。

shmat函数的返回值说明:

  • shmat调用成功,返回共享内存映射到进程地址空间中的起始地址。
  • shmat调用失败,返回(void*)-1。

其中,作为shmat函数的第三个参数传入的常用的选项有以下三个:
选项作用SHM_RDONLY关联共享内存后只进行读取操作SHM_RND若shmaddr不为NULL,则关联地址自动向下调整为SHMLBA的整数倍。公式:shmaddr-(shmaddr%SHMLBA)0默认为读写权限


拆卸共享内存——shmdt函数
int shmdt(const void *shmaddr);

取消共享内存与进程地址空间之间的关联。

shmdt函数的参数说明:

  • shmaddr,待拆卸的共享内存的起始地址,即调用shmat函数时得到的起始地址。

shmdt函数的返回值说明:

  • shmdt调用成功,返回0。
  • shmdt调用失败,返回-1。

释放共享内存——shmctl函数
int shmctl(int shmid, int cmd, struct shmid_ds *buf);

shmctl函数用于控制共享内存,可以根据传入的参数cmd的不同,执行不同的动作。

shmctl函数的参数说明:

  • shmid,表示所控制共享内存的用户级标识符。
  • cmd,表示具体的控制动作。
  • buf,用于获取或设置所控制共享内存的数据结构。

shmctl函数的返回值说明:

  • shmctl调用成功,返回0。
  • shmctl调用失败,返回-1。

其中,作为shmctl函数的第二个参数传入的常用的选项有以下三个:
选项作用IPC_STAT获取共享内存的信息,此时参数buf作为输出型参数IPC_SET在进程有足够权限的前提下,将当前共享内存的信息(shmid_ds结构体)设置为buf所指的数据结构中的信息IPC_RMID删除共享内存段


3.1.4命令行操作共享内存

查看共享内存信息——ipcs命令

单独使用ipcs命令时,会默认列出消息队列、共享内存以及信号量相关的信息,若只想查看它们之间某一个的相关信息,可以选择携带以下选项

  • -q:列出消息队列相关信息。
  • -m:列出共享内存相关信息。
  • -s:列出信号量相关信息。

例如,携带-m选项查看共享内存相关信息:

信息介绍:
标题含义key系统区别各个共享内存的唯一标识shmid共享内存的用户层id(句柄)owner共享内存的拥有者perms共享内存的权限bytes共享内存的大小nattch关联共享内存的进程数status共享内存的状态
注意: key是在内核层面上保证共享内存唯一性的方式,而shmid是在用户层面上保证共享内存的唯一性,在系统调用中你会发现基本上使用的都是shmid作为参数。

释放共享内存——ipcrm -m shmid命令

注意:这里命令行参数是shmid,不是key。

比如:


3.1.5用共享内存实现server&client通信

该程序重点在于学习使用共享内存的系统调用函数,并且由于共享内存不提供进程间的协同机制,所以协同工作需要用户提供,一般使用『 信号量』解决,但由于我们还没有学习,所以这里使用『 管道』解决。

源码

Comm.hpp

#pragma once

#include <iostream>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <string>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>

using namespace std;

const char *pathname = "/home/ff/linux/test_24_04_14 ";
const int proj_id = 0x66;

// 在内核中,共享内存的大小是以4KB为基本单位的. 你只能用你申请的大小。建议申请大小是n*4KB
const int defaultsize = 4096; // 单位是字节

// 转化成16进制数
std::string ToHex(key_t k)
{
  char buffer[1024];
  snprintf(buffer, sizeof(buffer), "0x%x", k);
  return buffer;
}

// 计算key值
key_t GetShmKeyOrDie()
{
  key_t k = ftok(pathname, proj_id);
  if (k < 0)
  {
    std::cerr << "ftok error, errno : " << errno << ", error string: " << strerror(errno) << std::endl;
    exit(1);
  }
  return k;
}

// 创建共享内存
int CreateShmOrDie(key_t key, int size, int flag)
{
  int shmid = shmget(key, size, flag);
  if (shmid < 0)
  {
    std::cerr << "shmget error, errno : " << errno << ", error string: " << strerror(errno) << std::endl;
    exit(2);
  }
  return shmid;
}

// 封装
int CreateShm(key_t key, int size)
{
  // IPC_CREAT: 不存在就创建,存在就获取
  // IPC_EXCL: 没有意义
  // IPC_CREAT | IPC_EXCL: 不存在就创建,存在就出错返回
  return CreateShmOrDie(key, size, IPC_CREAT | IPC_EXCL | 0666);
}

// 封装
int GetShm(key_t key, int size)
{
  return CreateShmOrDie(key, size, IPC_CREAT);
}

// 释放共享内存
void DeleteShm(int shmid)
{
  int n = shmctl(shmid, IPC_RMID, nullptr);
  if (n < 0)
  {
    std::cerr << "shmctl error" << std::endl;
  }
  else
  {
    std::cout << "shmctl delete shm success, shmid: " << shmid << std::endl;
  }
}

// 打印共享内存信息
void ShmDebug(int shmid)
{
  struct shmid_ds shmds;
  int n = shmctl(shmid, IPC_STAT, &shmds);
  if (n < 0)
  {
    std::cerr << "shmctl error" << std::endl;
    return;
  }
  std::cout << "shmds.shm_segsz: " << shmds.shm_segsz << std::endl;
  std::cout << "shmds.shm_nattch:" << shmds.shm_nattch << std::endl;
  std::cout << "shmds.shm_ctime:" << shmds.shm_ctime << std::endl;
  std::cout << "shmds.shm_perm.__key:" << ToHex(shmds.shm_perm.__key) << std::endl;
}

// 挂接共享内存
void *ShmAttach(int shmid)
{
  void *addr = shmat(shmid, nullptr, 0);
  if ((long long int)addr == -1)
  {
    std::cerr << "shmat error" << std::endl;
    return nullptr;
  }
  return addr;
}

// 拆卸共享内存
void ShmDetach(void *addr)
{
  int n = shmdt(addr);
  if (n < 0)
  {
    std::cerr << "shmdt error" << std::endl;
  }
}

服务端

#include "Comm.hpp"
#include "Fifo.hpp"
#include <unistd.h>

int main()
{
  // 1. 获取key
  key_t key = GetShmKeyOrDie();
  std::cout << "key: " << ToHex(key) << std::endl;
  // sleep(2);

  // 2. 创建共享内存
  int shmid = CreateShm(key, defaultsize);
  std::cout << "shmid: " << shmid << std::endl;
  // sleep(2);

  // ShmDebug(shmid);
  // 4. 将共享内存和进程进行挂接(关联)
  char *addr = (char *)ShmAttach(shmid);
  std::cout << "Attach shm success, addr: " << ToHex((uint64_t)addr) << std::endl;

  // 0. 利用管道实现进程间协同机制
  Fifo fifo;
  Sync syn;
  syn.OpenReadOrDie();

  // 可以进行通信了
  for (;;)
  {
    // Wait返回值:
    // 真:管道读端读取到数据,证明此时有进程向你发数据了
    // 假:管道读端读到末尾或者读取失败
    if (!syn.Wait())
      break;
    cout << "shm content: " << addr << std::endl;
  }

  // 拆卸共享内存,解除关联
  ShmDetach(addr);
  std::cout << "Detach shm success, addr: " << ToHex((uint64_t)addr) << std::endl;

  // 3. 释放共享内存
  DeleteShm(shmid);
  return 0;
}

**客户端 **

#include "Comm.hpp"
#include "Fifo.hpp"
#include <unistd.h>

int main()
{
  key_t key = GetShmKeyOrDie();
  std::cout << "key: " << ToHex(key) << std::endl;
  // sleep(2);

  int shmid = GetShm(key, defaultsize);
  std::cout << "shmid: " << shmid << std::endl;
  // sleep(2);

  char *addr = (char *)ShmAttach(shmid);
  std::cout << "Attach shm success, addr: " << ToHex((uint64_t)addr) << std::endl;
  // sleep(5);

  // 初始化共享内存数据
  memset(addr, 0, defaultsize);
  Sync syn;
  syn.OpenWriteOrDie();

  // 可以进行通信了
  for (char c = 'A'; c <= 'Z'; c++) // pipe, fifo, ->read/write->系统调用, shm -> 没有使用系统调用!!
  {
    addr[c - 'A'] = c;
    sleep(1);

    // 向管道写数据,唤醒读端可以读了
    syn.Wakeup();
  }

  // 拆卸共享内存
  ShmDetach(addr);
  std::cout << "Detach shm success, addr: " << ToHex((uint64_t)addr) << std::endl;
  sleep(5);

  return 0;
}

**管道提供进程间协同机制 **

#ifndef __COMM_HPP__
#define __COMM_HPP__

#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <cassert>

using namespace std;

#define Mode 0666
#define Path "./fifo"

class Fifo
{
public:
  Fifo(const string &path = Path) : _path(path)
  {
    umask(0);
    int n = mkfifo(_path.c_str(), Mode);
    if (n == 0)
    {
      cout << "mkfifo success" << endl;
    }
    else
    {
      cerr << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
    }
  }
  ~Fifo()
  {
    int n = unlink(_path.c_str());
    if (n == 0)
    {
      cout << "remove fifo file " << _path << " success" << endl;
    }
    else
    {
      cerr << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) << endl;
    }
  }

private:
  string _path; // 文件路径+文件名
};

// 利用管道提供进程间协同机制
class Sync
{
public:
  Sync() : rfd(-1), wfd(-1)
  {
  }
  void OpenReadOrDie()
  {
    rfd = open(Path, O_RDONLY);
    if (rfd < 0)
      exit(1);
  }
  void OpenWriteOrDie()
  {
    wfd = open(Path, O_WRONLY);
    if (wfd < 0)
      exit(1);
  }

  // 实现进程间协同机制,服务端用
  bool Wait()
  {
    bool ret = true;
    uint32_t c = 0;
    ssize_t n = read(rfd, &c, sizeof(uint32_t));
    if (n == sizeof(uint32_t))
    {
      std::cout << "server wakeup, begin read shm..." << std::endl;
    }
    else if (n == 0)
    {
      ret = false;
    }
    else
    {
      return false;
    }
    return ret;
  }

  // 实现进程间协同机制,客户端用
  void Wakeup()
  {
    uint32_t c = 0;
    ssize_t n = write(wfd, &c, sizeof(c));
    assert(n == sizeof(uint32_t));

    std::cout << "wakeup server..." << std::endl;
  }
  ~Sync() {}

private:
  int rfd;
  int wfd;
};

#endif

管道提供进程间协同机制的讲解

什么是进程间协同机制呢?

上面我们提到过管道的4种特殊情况,比如:

写端进程不写,并且管道内部无数据,那么此时对应的读端进程就会阻塞等待,直到管道里面有数据。

你会发现管道通信是可以让通信双方进程协同配合的,某一方进程会根据另一方进程的状态做出反应,这也就是所谓的『 协同』。

但是共享内存并不提供进程间协同机制,如果你不提供协同机制的话,就会出现不管客户端向服务端发没发数据,服务端都一直接收的情况:

管道通信支持进程间协同机制是操作系统内核实现的,共享内存并不支持协同机制,如果需要,用户级实现即可。

实现协同机制一般都采用信号量来解决,但由于我们还没有学习信号量,所以这里我们使用管道来实现。

其实原理很简单,就是进程在利用共享内存通信之前,利用管道的协同机制进行阻塞即可。

// 实现进程间协同机制,服务端用
bool Wait()
{
  bool ret = true;
  uint32_t c = 0;
  ssize_t n = read(rfd, &c, sizeof(uint32_t));
  if (n == sizeof(uint32_t))
  {
    std::cout << "server wakeup, begin read shm..." << std::endl;
  }
  else if (n == 0)
  {
    ret = false;
  }
  else
  {
    return false;
  }
  return ret;
}

// 实现进程间协同机制,客户端用
void Wakeup()
{
  uint32_t c = 0;
  ssize_t n = write(wfd, &c, sizeof(c));
  assert(n == sizeof(uint32_t));

  std::cout << "wakeup server..." << std::endl;
}

以上是很简单的对管道文件的读写操作,rfd代表管道的读端,wfd代表管道的写端,Wait()实现的就是从管道中读取数据到c,Wakeup()实现的就是向管道中写入数据,经过前面管道通信的学习我们知道只有当写端向读端写数据时,读端才读,所以我们可以将向管道写的操作(Wakeup())放到客户端每次向共享内存写入数据之后,即:

// 客户端
for (char c = 'A'; c <= 'Z'; c++) // pipe, fifo, ->read/write->系统调用, shm -> 没有使用系统调用!!
{
  addr[c - 'A'] = c;
  sleep(1);

  // 向管道写数据,唤醒读端可以读了
  syn.Wakeup();
}

在Wakeup()调用之前由于管道的读端没有检测到写端写数据,所以读端会阻塞等待写端写,即:

// 服务端
for (;;)
{
  // Wait返回值:
  // 真:管道读端读取到数据,证明此时有进程向你发数据了
  // 假:管道读端读到末尾或者读取失败
  if (!syn.Wait())
    break;
  cout << "shm content: " << addr << std::endl;
}

所以这样就实现了进程间的协同机制。


3.1.6system V共享内存通信方式与管道通信的对比

共享内存是所有进程间通信方式中最快的一种通信方式。

由共享内存的原理我们知道,这样的通信方式,因为有页表映射的存在,提取共享内存中的数据对于每个进程来说都相当于在自己的地址空间中取数据,而且通信过程不需要像管道一样每次通信都需要调用系统调用接口read、write。

但是共享内存并不提供进程间协同机制,而我们知道管道是自带同步与互斥机制的,所以这也算是共享内存的一个缺点。


3.2system V消息队列

消息队列实际上在当前已经被淘汰了,或者说system V的通信方案都已经落伍了,现在大部分都是网络通信,所以这里消息队列我并不打算细讲了。

消息队列实际上就是在系统内核当中创建了一个队列,队列当中的每个成员都是一个数据块,这些数据块都由类型和信息两部分构成,两个互相通信的进程通过某种方式看到同一个消息队列,这两个进程向对方发数据时,都在消息队列的队尾添加数据块,这两个进程获取数据块时,都在消息队列的队头取数据块。

其中消息队列当中的某一个数据块是由谁发送给谁的,取决于数据块的类型。

消息队列的用户级数据结构与共享内存的内核级数据结构非常相似,并且系统调用接口与共享内存也十分相似,大家可以自行了解。


3.3system V信号量

这里我们简单了解一下同步与互斥。

互斥:竞争临界资源时,同一时间只有一个进程使用该临界资源。

同步:在保证互斥的前提下,有顺序地持有临界资源,避免进程饥饿。

信号量我们主要放在线程部分再与大家探讨,这里直接跳过。


=========================================================================

如果你对该系列文章有兴趣的话,欢迎持续关注博主动态,博主会持续输出优质内容

🍎博主很需要大家的支持,你的支持是我创作的不竭动力🍎

🌟**~ 点赞收藏+关注 ~**🌟

=========================================================================

标签: linux 运维 服务器

本文转载自: https://blog.csdn.net/2301_77112634/article/details/136991224
版权归原作者 樊梓慕 所有, 如有侵权,请联系我们删除。

“【Linux】进程间通信”的评论:

还没有评论