0


【Linux】生产者消费者模型代码实现和信号量

一定要先理解生产者消费者模型的原理~

文章目录


一、生产者消费者模型实现代码

下面我们实现基于阻塞队列的生产消费模型:

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

首先我们创建一个头文件用来放阻塞队列的类:

#include <iostream>
#include <queue>
template <class T>
class BlockQueue
{
public:
     BlockQueue()
     {

     }
     void push(const T& in)
     {

     }
     void pop(T* out)
     {
        
     }
     ~BlockQueue()
     {

     }

private:
     std::queue<T> _q;
     int _cap;   //代表队列的容量
     pthread_mutex_t _mutex;
     pthread_cond_t _cond;
};

这个类也很简单,只需要有一个普通队列,一个记录队列容量的变量,一个互斥锁(保护共享资源的安全),一个条件变量(维护线程的互斥与同步)。类中最重要的两个接口是push和pop,然后构造和析构函数必须要有。下面进入测试文件:

#include "blockQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

void* consumer(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int> *>(args);

    while (true)
    {
        int data = 0;
        //1.将数据从blockqueue中获取 ---- 获取到了数据
        bq->pop(&data);
        //2.结合某种业务逻辑,处理数据!
        std::cout<<"consumer data: "<<data<<std::endl;
    }
}

void* productor(void* args)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int> *>(args);

    while (true)
    {
        //先通过某种渠道获取数据
        int data = rand()%10+1;
        //然后将数据推送到blockqueue ----- 完成生产过程
        bq->push(data);
        std::cout<<"productor data: "<<data<<std::endl;
    }
}

int main()
{
    srand((uint64_t)time(nullptr)^getpid());
    BlockQueue<int>* bq = new BlockQueue<int>();
    //单生产单消费 -> 多生产和多消费
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

在测试文件中,首先我们要有一个阻塞队列对象,然后创建两个线程,一个代表生产者一个代表消费者。然后我们创建这两个线程并且将阻塞队列传进去,这样这两个线程就能看到同一个缓冲区了。最main函数返回前记得等待线程并且释放开辟的空间。在生产者函数中,由于我们需要生产资源,所以我们直接用一个随机数种子来代替,保证每次生成的都是随机数然后入队列并且打印生产者生产的数据。在消费者函数中因为我们要从队列拿取数据所以直接用一个变量接收pop出来的变量即可,拿出来后打印消费者。

下面我们实现对象内部接口:

 const int gcap = 5;
 BlockQueue(const int cap = gcap)
     :_cap(cap)
     {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&cond,nullptr);
     }
 ~BlockQueue()
     {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
     }

首先我们构造的时候要确定队列为空为满,所以我们直接定义一个全局变量,这个全局变量在构造的时候给cap用来初始化_cap.我们初始化的时候只需要将锁和条件变量初始化了即可。析构也同理,我们只需要将锁和条件变量销毁掉。

当然我们的阻塞队列在满的时候是不能生产的,这个时候我们应该将生产者放到条件变量里。而当阻塞队列为空的时候消费者是不能消费的,所以我们应该将消费者放到条件变量里,这样的话我们原先一个条件变量是不够的,所以我们直接多加一个条件变量:

下面我们实现push接口:

 void push(const T& in)
     {
        pthread_mutex_lock(&mutex);
        if (isfull())  //1.我们只能在临界区内部,判断临界资源是否就绪,注定了我们在当前一定是持有锁的
        {
            //2.要让线程去等待就不能持有锁
            //3.当线程被唤醒的时候,这个线程一定是从wait语句后面继续执行代码
            pthread_cond_wait(&_productorCond,&_mutex);
        }
        _q.push(in);
        pthread_mutex_unlock(&mutex);
     }

我们生产的时候一定是持有锁生产的(因为要保护临界资源),所以先加锁然后判断队列是否满了,满了我们就让线程去等待,不满的时候我们就让数据进入队列然后释放锁。

下面我们实现pop接口:

void pop(T* out)
     {
        pthread_mutex_lock(&_mutex);
        if (isempty())
        {
           pthread_cond_wait(&_consumerCond,&_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_mutex_unlock(&_mutex);
     }

pop接口与push一样,我们一定要在判断队列是否为空之前就需要持有锁,所以我们先加锁,然后判断队列是否为空,如果为空则说明现在消费者不能消费,我们直接让消费者去条件变量中等待即可。如果不为空,我们就将队头元素传给out,然后pop队列即可,最后记得解锁。

bool isfull() const
     {
         return _q.size()==_cap;
     }
     bool isempty() const
     {
         return _q.empty();
     }

判断为空为满的函数非常简单,这里就不解释了。

下面我们考虑什么时候该唤醒休眠的线程呢?其实很简单,我们如何知道队列是否有数据呢?只需要看生产者是否生产了,所以我们在生产者生产完后就可以唤醒消费者了。

同理,我们的消费者只要消费了我们就唤醒生产者继续生产:

下面我们运行起来:

运行起来我们发现打印速度太快了 ,所以我们让消费者开始消费前先sleep 1 秒,这样的话在这一秒期间实际上生产者就全部生产完毕了。:

从结果我们也可以看到是满足队列的特性的,生产者一次生产5个后消费者每消费一个生产者就生产一个。

下面我们再试试:让生产者生产前先sleep 1秒:

通过结果我们可以看到生产者每生产一个消费者就消费一个,下面我们需要修改代码的一个问题:

实际上我们在这里用if判断是有bug的,因为我们不仅仅可以一次唤醒一个线程,也可以唤醒多个线程,当我们唤醒多个线程的时候,如果只用一个if判断就会造成生产者生产的数据超过队列上限,因为是多个线程同时被唤醒生产。所以要解决这个问题我们换成while循环判断,这样即使线程被唤醒也依旧要去判断队列是否为空或者队列是否满的条件。

下面我们不再仅仅在阻塞队列中存放整数等简单类型,我们修改代码在里面放入任务让消费者去完成任务:

首先新建一个任务的头文件:

#include <iostream>

class Task
{
public:
    Task()
    {

    }
    Task(int x,int y,char op)
       :_x(x)
       ,_y(y)
       ,_op(op)
       ,_result(0)
       ,_exitCode(0)
    {

    }
    ~Task()
    {

    }
private:
    int _x;
    int _y;
    char _op;
    int _result;
    int _exitCode;
};

然后我们写出这个任务的大致结构,有两个操作数x和y,然后字符类型的op是+-*/%的运算符号,result是算出来的结果,exitCode是返回码,代码这个计算是否正确,比如正常情况下是0,当出现除0问题就是负数,下面我们直接写操作函数:

void operator()()
    {
       switch(_op)
       {
       case '+':
           _result = _x + _y;
           break;
       case '-':
           _result = _x - _y;
           break;
       case '*':
           _result = _x * _y;
           break;
       case '/':
           if (_y==0)
           {
              _exitCode = -1;
           }
           else 
           {
              _result = _x / _y;
           }
           break;
       case '%':
           if (_y==0)
           {
              _exitCode = -2;
           }
           else 
           {
              _result = _x % _y;
           }
           break;
       default:
           break;
       }
    }

这是一个仿函数,我们用仿函数的目的是到时候我们的类对象直接调用()就能完成运算,函数体里面是一个简单的switch语句,在遇到除0问题时我们会将返回码设置为-1或-2,然后退出。

 std::string formatArg()
    {
        return std::to_string(_x) +_op +std::to_string(_y) + "=";
    }
    std::string formatRes()
    {
        return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
    }

这两个函数中Arg是我们要把表达式打印出来,这个时候是没有结果的,Res是打印计算出来的结果并且会把返回码放在后面。

然后我们再修改一下主函数:

我们将刚刚的随机数种子替换为我们的Task类

生产者函数会随便生成两个操作数和一个运算符,然后交给我们的Task类去处理,最后打印相关信息即可。

下面我们就运行起来:

我们可以看到确实是生产者生产了任务,消费者去完成了任务。

下面我们试试我们写的这个代码能否完成多生产多消费呢?

我们这次不让消费者消费前先sleep了不然太慢了:

运行起来后我们发现是完全没有问题的,因为我们生产和消费公用的一把锁所以所有的生产和消费注定是互斥的。(注意:我们生产和消费访问的都是同一个队列,所以注定了他们只能使用同一把锁)

二、POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

之前我们讲进程间通信的时候提到过信号量,那时候我们对信号量的描述是:信号量(信号灯),本质上是一个计数器,计数器需要++和--操作,但是由于多线程并发访问的问题,所以我们的信号量++ --操作一定要是原子的。

那么我们今天的信号量该如何理解呢?我们今天所说的信号量实际上是描述临界资源中的资源数目的。

如图所示,我们可以将一大块临界资源看成一小块一小块的,然后每一个线程在访问对应的资源的时候,都需要先申请信号量,一旦申请成功就表示该线程允许使用该资源,如果申请不成功,那么就说明目前无法使用该资源。

信号量的工作机制:信号量机制类似于我们看电影买票,是一种资源的预订机制。

信号量已经是资源的计数器了,申请信号量成功,本身就表明资源可用,申请信号量失败表明资源不可用,本质就是把判断转化为信号量的申请行为。

下面我们认识一下信号量的相关接口:

初始化信号量 :

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

pshared:0表示线程间共享,非零表示进程间共享

value:信号量初始值

销毁信号量 :

int sem_destroy(sem_t *sem);

等待信号量 :

功能:等待信号量,会将信号量的值减1 (PV操作中的P操作)

int sem_wait(sem_t *sem); //P()

发布信号量 :

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。(PV操作中的V操作)

int sem_post(sem_t *sem);//V()

下面我们讲一下基于环形队列的生产消费模型,然后开始用代码实现这个模型,在实现的过程中就会用到我们信号量的相关接口:

首先环形队列我相信大家并不陌生,因为我们在将数据结构的时候专门讲过循环队列。重点在于:由于我们的尾指针是先放数据再指向下一个空位置,这就导致最后尾指针和头指针一定会走到一起,这样我们就无法判断队列为空还是为满,所以我们当时给出的方法是:多开一个空位置,当头指针和尾位置在同一个位置时,这就证明这个队列为空。当尾指针的下一个位置是头指针时表面队列为满。

下面我们构建CP问题:

第一个问题:生产者和消费者关心的"资源"是一样的吗?答案是不一样,生产者关心的是空间,消费者关系的是数据。

第二个问题:只要信号量不为0,就表示资源可以用,表示线程可以访问。

第三个问题:环形队列,只要我们访问不同的区域,生产和消费的行为可以同时进行吗?可以同时进行!比如说我们生产者正在7号位置生产资源,而消费者正在1号位置拿取资源,他们两个之间是互不影响的。

第四个问题:他们两个什么时候会访问同一个区域?当队列为空时,他们两个都在同一个位置,这个时候他们会访问同一个区域。那么这个时候他们是什么关系呢?答案是竞争关系,一个抢着在该位置放数据,另一个抢着拿数据,但是我们一定要保证让生产者先生产数据,因为生产者都不生产数据消费者还怎么拿呢。第二种情况:当队列已经满了,如果不制止生产者那么生产者就会在已经存放资源的位置继续生产资源,这种情况下生产者和消费者会访问同一个区域。那么这个时候我们让谁先运行呢?答案当然是消费者了,我们刚刚已经说了,当他们指向同一个位置的时候一定是竞争关系,但是我们必须让消费者先运行,否则生产者就覆盖了之前存放过还没消费的数据。

下面我们总结一下:1.只有为空或者为满的时候,CP才会指向同一个位置。2.除了第一种情况,在其他情况下CP可以并发运行。3.我们要保证游戏规则,同时也要保证空或者满的时候的策略问题。

那么我们如何用计算机语言维护上面的4个规则呢?

我们可以在刚开始的时候给生产者的信号量设置为N,N就是队列为满的时候的空间。因为生产者关心的资源是空间,消费者关心的资源是数据,所以我们刚开始给消费者设置信号量为0(因为刚开始消费者肯定还没拿到资源)。而对于生产者和消费者来讲,我们一开始肯定是执行P操作因为我们都要先申请信号量,只不过生产者申请的是空间信号量,消费者申请的是数据信号量。

当我们进行生产活动时,生产者生产一个资源后,我们是否要归还刚刚申请的空间呢?答案是不要,因为消费者还没有消费,如果归还了生产者再次申请这个空间那么数据就会把之前没消费的覆盖了。所以我们要让data++,因为我们生产了一个数据那么数据就多了一个,所以是V(data),只有这样消费者才能知道队列有数据那么消费者就可以申请信号量了。

当data变成1后,消费者就可以进行P操作了也就是申请资源了,看下图:

当我们的消费者消费后那么这个空间一定是要被归还的,所以我们要让空间信号量++,保证下次生产者申请空间信号量的时候可以申请成功。

以上就是基于环形队列的生产者消费者模型。


总结

我们下一篇会主要讲解如何实现基于环形队列的生产者消费者模型,而要进行实现理论知识肯定要掌握,所以大家在实现之前一定要想明白环形队列的生产者消费者模型的原理。

标签: c++ 后端 linux

本文转载自: https://blog.csdn.net/Sxy_wspsby/article/details/131022495
版权归原作者 朵猫猫. 所有, 如有侵权,请联系我们删除。

“【Linux】生产者消费者模型代码实现和信号量”的评论:

还没有评论