0


【Linux】管道

一、前言

为了满足各种需求,进程之间是需要通信的。进程间通信的主要目的包括如下几个方面:

  1. 数据传输:一个进程需要将它的数据发送给另一个进程
  2. 资源共享:多个进程之间共享同样的资源。
  3. 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  4. 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

因为进程具有独立性,这增加了通信的成本。要让两个不同的进程通信,前提条件是先让两个进程看到同一份“资源”,这份“资源”通常是由OS直接或间接提供的。

所以任何进程通信手段,无非都包含如下步骤:

  1. 想办法先让不同的进程看到同一份资源。
  2. 让一方写入,另一方读取,完成通信过程。
  3. 至于通信目的与后续工作,要结合具体场景具体分析。

进程间通信可以分为三类:

  1. 管道
  2. System V IPC
  3. POSIX IPC

这三类方法,都是在解决第一个步骤,即让不同的进程看到同一份“资源”。本篇博客针对于管道进行讲解。

二、管道

管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。

管道也是文件。在使用管道符 "|" 时, "|" 左边的进程以写的方式打开管道文件,将标准输出重定向到管道之中, "|" 右边的进程以读的方式打开管道文件,将标准输入重定向到管道之中。

1、匿名管道

1.1、基本原理

在文章《文件描述符》中,对已经打开的文件与进程的关系进行了较为详细的说明,文件描述符的 0、1、2 默认为标准输入、标准输出、标准错误。这些文件都有对应的缓冲区。

匿名管道文件是OS提供的内存文件,仅存在于内存,而并不需要将该文件的内容刷新到磁盘之中。OS通过某些方式,使用读方式和写方式分别打开这个匿名管道文件。

该进程通过 fork 创建一个子进程,子进程拷贝了父进程的PCB结构,包括 task_struct 与 struct files_struct 。因此子进程的文件描述符表中存储的指针也被拷贝下来了,指向同一批文件对象(创建子进程的时候,fork子进程,只会复制进程相关的数据结构对象,不会复制父进程曾经打开的文件对象)。这时,我们就做到了进程间通信的前提:让不同的进程看到同一份“资源”。

这种管道只支持单向通信,因此在进程通信的时候,需要确定数据的流向,分别关闭和保留父子进程文件描述符表中的读与写端。这是因为文件对象只有一个缓冲区,难以做到同时读写。

进程创建管道的具体过程如下:

1.2、代码实现

创建管道函数:

int pipe(int pipefd[2]);

pipe 系统调用函数的参数列表中有一个数组,是一个输出型参数。如果创建成功,函数返回值是 0 ,失败返回值为 -1 。因为系统调用接口的底层是使用C语言编写的,所以错误码 errno 会被设置。具体用法如下:

#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <cerrno>
#include <assert.h>
#include <string.h>
using namespace std;

int main()
{
    int pipefd[2] = {0};
    //1.创建管道
    int n = pipe(pipefd);
    if(n < 0)
    {
        cout << "pipe error, " << errno << ": " << strerror(errno) << endl;
        return 1;
    }
    cout << "pipefd[0]: " << pipefd[0] << endl; //读端
    cout << "pipefd[1]: " << pipefd[1] << endl; //写端
    
    //2.创建子进程
    pid_t id = fork();
    assert(id != -1);
    if(id == 0)
    {
        //子进程
    //3.关闭不需要的fd
        //这里让父进程读,子进程写
        close(pipefd[0]);

    //4.开始通信
        string namestr = "子进程";
        int cnt = 1;
        char buffer[1024];
        while(1)
        {
            snprintf(buffer, sizeof(buffer), "%s, 计数器:%d, PID: %d\n", namestr.c_str(), cnt++, getpid());
            write(pipefd[1], buffer, strlen(buffer));
            sleep(1);
        }

        exit(0);
    }
    //父进程
    close(pipefd[1]);

    char buffer[1024];
    while(1)
    {
        int n = read(pipefd[0], buffer, sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = '\0';
            cout << "父进程" << endl;
            cout << "子进程发送消息: " << buffer << endl;
        }
    }

    return 0;
}

编译并运行:

1.3、管道的特点

  1. 管道是单向通信的.
  2. 管道本质是文件。因为文件描述符的生命周期是随进程的,所以管道的生命周期也是随进程的。
  3. 管道通信,通常用来进行具有“血缘”关系的进程之间的通信。如父子进程间通信。
  4. pipe系统调用打开的管道,并不清楚它的名字,称之为匿名管道。
  5. 在管道通信中,写入的次数与读取的次数,不是严格匹配的。
  6. 管道具有一定的协同能力,让读端与写端按照一定的步骤进行通信: 1)如果读端读取完毕了所有的管道数据,此时如果写端不写,读端就只能等待。 2)如果写端将管道写满了,就无法再继续写入,等读端读取之后才能继续写。 3)如果关闭了写端,读端读取完毕管道数据后,再读,read就会返回 0 ,表明读到了文件结尾。 4)如果写端一直在写,并关闭了读端,那么OS会通过信号终止一直在写入的进程。因为OS不会维护无意义、低效率、浪费资源的事情。
  7. 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。

1.4、基于管道的简单设计

现在我们来实现由一个父进程通过管道向多个子进程写入特定消息,让子进程定向执行某种任务的代码:

//task.hpp
#pragma once

#include <iostream>
#include <vector>
#include <unistd.h>

typedef void (*fun_t)();

void Print()
{
    std::cout << "pid: " << getpid() << "打印任务正在执行" << std::endl;
}

void InsertMySQL()
{
    std::cout << "pid: " << getpid() << "数据库任务正在执行" << std::endl;
}

void NetRequest()
{
    std::cout << "pid: " << getpid() << "网络请求任务正在执行" << std::endl;
}

//约定每一个command都是4个字节
#define COMMAND_PRINT 0
#define COMMAND_MYSQL 1
#define COMMAND_REQEUST 2

class Task
{
public:
    Task()
    {
        funcs.push_back(Print);
        funcs.push_back(InsertMySQL);
        funcs.push_back(NetRequest);
    }
    void Execute(int command)
    {
        if(command >= 0 && command < funcs.size())
        {
            funcs[command];
        }
    }
    ~Task(){}

public:
    std::vector<fun_t> funcs;
};

//ctrlProcess.cc
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "task.hpp"
using namespace std;

const int gnum = 3;
Task t;

class EndPoint
{
private:
    static int number;
public:
    pid_t _child_id;
    int _write_fd;
    string processname;
public:
    EndPoint(int id, int fd):_child_id(id),_write_fd(fd)
    {
        char namebuffer[64];
        snprintf(namebuffer, sizeof(namebuffer), "process-%d[%d:%d]", number++, _child_id, _write_fd);
        processname = namebuffer;
    }
    const string& name() const
    {
        return processname;
    }

    ~EndPoint(){}
};
int EndPoint::number = 0;

//子进程要执行的方法
void WaitCommand()
{
    while(1)
    {
        char command = 0;
        int n = read(0, &command, sizeof(int));
        if(n == sizeof(int))
        {
            t.Execute(command);
        }
        else if(n == 0)
        {
            cout << "父进程让我退出,我的pid:" << getpid() << endl;
            break;
        }
        else
        {
            break;
        }
    }
}

void creatProcesses(vector<EndPoint>* end_points)
{
    for(int i = 0; i < gnum; ++i)
    {
        //创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        assert(n == 0);
        (void)n;

        //创建进程
        pid_t id = fork();
        assert(id != -1);
        if(id == 0)
        {
            //子进程
            close(pipefd[1]);
            //我们期望,所有的子进程读取“指令”,都从标准输入读取
            //输入重定向,也可以不重定向,只要在WaitCommand函数里传参fd就可以了
            dup2(pipefd[0], 0);
            //子进程开始等待获取命令
            WaitCommand();

            exit(0);
        }
        //父进程
        close(pipefd[0]);

        //将新的子进程和他的管道写端构造对象
        end_points->push_back(EndPoint(id, pipefd[1]));
    }
}

int ShowBoard()
{
    cout << "########################################" << endl;
    cout << "### 0.执行打印任务     1.执行数据库任务###" << endl;
    cout << "### 2.执行请求任务     3.退出         ###" << endl;
    cout << "########################################" << endl;
    cout << "请选择" << endl;
    int command = 0;
    cin >> command;
    return command;
}

void ctrlProcess(const vector<EndPoint>& end_points)
{
    int num = 0;
    int cnt = 0;
    while(1)
    {
        //选择任务
        int command = ShowBoard();
        if(command == 3) break;
        if(command < 0 || command > 2) continue;
        //选择进程
        int index = cnt++;
        cnt %= end_points.size();
        cout << "选择了进程: " << end_points[index].name() << " | 处理任务: " << command << endl;

        //下发任务
        write(end_points[index]._write_fd, &command, sizeof(command));
    }
}

// void WaitProcess(const vector<EndPoint>& end_points)
// {
//     //让子进程全部退出,通过关闭写端的方式
//     for(const auto& ep : end_points) close(ep._write_fd);
//     cout << "父进程让所有子进程退出" << endl;
//     sleep(5);

//     //父进程回收子进程的僵尸状态
//     for(const auto& ep : end_points) waitpid(ep._child_id, nullptr, 0);
//     cout << "父进程回收了所有的子进程" << endl;
//     sleep(1);
// }

void WaitProcess(const vector<EndPoint>& end_points)
{
    for(int end = end_points.size() - 1; end >= 0; --end)
    {
        cout << "父进程让我退出: " << end_points[end]._child_id << endl;
        close(end_points[end]._write_fd);

        waitpid(end_points[end]._child_id, nullptr, 0);
        cout << "父进程回收了子进程:" << end_points[end]._child_id << endl;
    }
    sleep(10);
}

int main()
{
    //先进行构建控制结构,父进程写入,子进程读取
    vector<EndPoint> end_points;
    creatProcesses(&end_points);
    ctrlProcess(end_points);

    //处理退出问题
    WaitProcess(end_points);

    return 0;
}

其中, WaitProcess 函数内,退出进程的循环要在 vector 中从后向前遍历:

这是因为我们使用的进程退出方式,是通过关闭父进程写端的文件对象,让OS杀死子进程实现的。而在我们创建子进程与管道的 creatProcesses 函数中,是通过循环一个一个创建的:

这种创建方式在创建第一组管道与子进程时不会有任何问题。但是在之后创建第二、第三组乃至更多时,由于它们的 task_struct 也是父进程的拷贝,就会导致后面子进程的文件描述符表里保留指向前几个管道的指针:

如果是从前向后依次关闭父进程的写端,那么因为第一个子进程对应的管道的写端不只有父进程一个,还有其他两个子进程,因此第一个子进程就不会被OS杀死,从而在下面执行 waitpid 函数时造成堵塞。

为了真正构建每一个管道的写端都只有父进程一个的构造,则可以改写 creatProcesses 函数的代码:

在创建时就直接关闭子进程对应的写端文件。

2、命名管道

  • 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
  • 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
  • 命名管道是一种特殊类型的文件

命名管道的打开规则:

  • 如果当前打开操作是为读而打开FIFO时 O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO O_NONBLOCK enable:立刻返回成功
  • 如果当前打开操作是为写而打开FIFO时 O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO O_NONBLOCK enable:立刻返回失败,错误码为ENXIO

创建命名管道指令:

mkfifo [OPTION]... NAME...

具体用法如下:

可以看到 fifo 是管道文件。

2.1、匿名管道与命名管道的区别

  • 匿名管道由pipe函数创建并打开。

  • 命名管道由mkfifo函数创建,打开用open。

  • FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。

    命名管道虽然有自己的 inode ,存在于磁盘上,但也仅仅只有 inode ,代表它存在,而没有自己的 datablock 。与匿名管道相同,命名管道也是内存级的文件,不存在刷盘操作。

我们通过创建命名管道,也可以让不同的进程通过文件路径 + 文件名找到同一个文件,并打开它,让不同的进程看到同一份资源,具备了进程间通信的前提。

2.2、代码实现命名管道通信

创建命名管道文件的系统调用:

int mkfifo(const char *pathname, mode_t mode);

mkfifo 函数的参数列表中, pathname 为创建文件的路径与文件名,如果不指定路径,默认为当前路径。 mode 为创建文件的权限。创建成功就返回 0 。创建失败返回 -1,并且错误码会被设置。

具体代码如下:

//makefile

.PHONY:all
all:server client

server:server.cc 
    g++ -o $@ $^ -std=c++11
client:client.cc
    g++ -o $@ $^ -std=c++11

.PHONY:clean
clean:
    rm -f server client

//comm.hpp

#pragma once
#include <iostream>
#include <string>

#define NUM 1024

const std::string fifoname = "./fifo";
uint32_t mode = 0666;

//server.cc

#include <iostream>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
using namespace std;

int main()
{
    //创建一个管道文件
    umask(0); //这个设置不会影响系统默认设置,只会影响当前进程
    int n = mkfifo(fifoname.c_str(), mode);
    if(n != 0)
    {
        cout << errno << " : " << strerror(errno) << endl;
        return 1;
    }
    cout << "creat fifo file success" << endl;

    //让服务器开启管道文件
    int rfd = open(fifoname.c_str(), O_RDONLY);
    if(rfd <= 0)
    {
        cout << errno << " : " << strerror(errno) << endl;
        return 2;
    }
    cout << "open fifo success, begin ipc" << endl;

    //正常通信
    char buffer[NUM];
    while(1)
    {
        buffer[0] = 0;
        ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
            cout << "client: " << buffer << endl;
        }
        else if(n == 0)
        {
            cout << "client quit, me too" << endl;
            break;
        }
        else
        {
            cout << errno << " : " << strerror(errno) << endl;
            break;
        }
    }

    //关闭文件
    close(rfd);

    unlink(fifoname.c_str()); //把管道文件删掉

    return 0;
}

//client.cc

#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
using namespace std;

int main()
{
    //不需要创建管道文件,只需要打开就可以了
    int wfd = open(fifoname.c_str(), O_WRONLY);
    if(wfd < 0)
    {
        cerr << errno << " : " << strerror(errno) << endl;
        return 1;
    }

    //正常通信
    char buffer[NUM];
    while(1)
    {
        cout << "输入数据:";
        char* msg = fgets(buffer, sizeof(buffer), stdin); //不用 -1
        assert(msg);
        (void)msg;

        if(strcasecmp(buffer, "quit") == 0) break;

        ssize_t n = write(wfd, buffer, strlen(buffer)); //不用 +1
        assert(n > 0);
        (void)n;
    }

    return 0;
}

运行观察现象:

与匿名管道不同,匿名管道在创建时,默认读写端都是打开的。而命名管道在打开文件阶段会被卡住,只有当我们把读端和写端都手动打开后,程序才能继续向下运行。

之所以在 server 端读取的数据中间有一行空行,是因为发送的时候会按下回车键,回车键也会被读取。为了解决这个问题,可以做如下更改:

观察结果:


关于管道的内容就讲到这里,希望同学们多多支持,如果有不对的地方欢迎大佬指正,谢谢!

标签: linux 服务器 运维

本文转载自: https://blog.csdn.net/weixin_74078718/article/details/130442260
版权归原作者 世间是否此山最高 所有, 如有侵权,请联系我们删除。

“【Linux】管道”的评论:

还没有评论