0


【Linux】生产者消费者模型——阻塞队列BlockQueue

作者:დ旧言~
座右铭:松树千年终是朽,槿花一日自为荣。

目标:理解【Linux】生产者消费者模型——阻塞队列BlockQueue。

毒鸡汤:有些事情,总是不明白,所以我不会坚持。早安!

专栏选自:Linux初阶

望小伙伴们点赞👍收藏✨加关注哟💕💕

​​

🌟前言

Linux有两个重要的模型,一个是生产者消费者模型——阻塞队列BlockQueue,另一个则是生产者消费者模型——环形队列RingQueue。今天我们学习其中一个模型:【Linux】生产者消费者模型——阻塞队列BlockQueue。

⭐主体

学习【Linux】生产者消费者模型——阻塞队列BlockQueue咱们按照下面的图解:

​🌙 生产者消费者模型


**💫 **生产者消费者模型的概念

概念:

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不进行直接通讯,而通过这个容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器中,消费者也不用找生产者要数据,而是直接从容器也就是阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

  • 这个阻塞队列就是用来给生产者和消费者解耦的
  • 如果缓冲区已经满了,则生产者线程阻塞;
  • 如果缓冲区为空,那么消费者线程阻塞。

图解:

**💫 **生产者消费者模型的特点

生产者消费者是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系:生产者和生产者(互斥关系),生产者和消费者(互斥关系),生产者和消费者(互斥关系,同步关系)
  • 两种角色:生产者和消费者(通常由进程或线程承担)
  • 一个交易场所:通常指的是内存中的一段缓冲区。

生产者和生产者,消费者和消费者,生产者和消费者,它们之间为什么会存在互斥关系?

  • 介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。
  • 其中,所有的生产者和消费者都会竞争式的申请锁,因此生产者和生产者,消费者和消费者,生产者和消费者之间都存在互斥关系。

生产者和消费者之间为什么会存在同步关系?

  • 如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。
  • 反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
  • 虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消。

注意:

互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。

**💫 **生产者消费者模型优点

  • 解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  • 支持并发:生产者直接调用消费者的某个方法,还有另一个弊端,由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直在等待。玩意消费者处理数据很慢,生产者就会白白浪费时间,使用了生产者/消费者模型后,生产者和消费者可以是两个独立的开发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据,基本上不用依赖消费者的处理速度。
  • 支持忙闲不均:缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造块的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造商速度慢下来,消费者再慢慢处理。

​🌙 基于Blockqueue的生产和消费模型

阻塞队列:阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

  • 阻塞队列为空时,从阻塞队列中获取元素的线程将被阻塞,直到阻塞队列被放入元素。
  • 阻塞队列已满时,往阻塞队列放入元素的线程将被阻塞,直到有元素被取出。

图解:

**💫 **单生产单消费计算


1.随机数

下面以单生产单消费为例子:

//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gmaxcap =5;
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int&maxcap = gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while(is_full())
        {
            pthread_cond_wait(&_pcond,&_mutex);//因为生产条件不满足无法生产,此时我们的生产者进行等待
        }
        _q.push(in);
        //pthread_cond_signal:这个函数可以放在临界区内部,也可以放在外部
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
        //pthread_cond_signal(&_ccond);
    }
    void pop(T*out)//输出型参数,*,输入输出型:&
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty(){return _q.empty();}
    bool is_full(){return _q.size()==_maxcap;}
private:
    std::queue<T> _q;
    int _maxcap;//队列上限
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者条件变量
    pthread_cond_t _ccond;//消费者条件变量
};

//mainCp.cc
void* consumer(void * bq_)
{
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        int data;
        bq->pop(&data);
        std::cout<<"消费数据: "<<data<<std::endl;
        sleep(1);
    }
    return nullptr;
}
void*productor(void*bq_)
{
    BlockQueue<int>*bq = static_cast<BlockQueue<int>*>(bq_);
    while(true)
    {
        int data = rand()%10+1;
        bq->push(data);
        std::cout<<"生产数据: "<<data<<std::endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<int> *bq = new  BlockQueue<int>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    return 0;
}

总结分析:

pthread_cond_wait函数的第二个参数必须是我们正在使用的互斥锁,满了就会进行等待,如果像之前一样把锁拿走,那么其他线程就无法访问共享资源。

  • a.pthread_cond_wait:该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起
  • b.pthread_cond_wait:该函数在被唤醒返回的时候,会自动的重新获取你传入的锁

pthread_cond_signal伪唤醒:判断的问题:假设生产者有10个,消费者只有一个,消费一下数据,如果是pthread_cond_broadcast是把10个线程同时唤醒,可是只需要生产一个数据,而同时把10个线程唤醒而如果是if判断的时候push就会出问题了。

如果生产者生产慢,消费者消费快:生产一个消费一个,而且消费的都是最新的数据。

如果生产者生产快,消费者消费慢:稳定后,消费一个生产一个。

2.计算器任务Task

Task.hpp:包含func_t的回调函数,这个函数就是进行数据计算的回调函数:

#pragma once
#include <iostream>
#include <functional>
#include <cstdio>
class Task
{
    using func_t = std::function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
    :_x(x),_y(y),_op(op),_callback(func)
    {}
    std::string operator()()
    {
        int result = _callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gmaxcap =500;
template <class T>
class BlockQueue
{
public:
    BlockQueue(const int&maxcap = gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while(is_full())
        {
            pthread_cond_wait(&_pcond,&_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    void pop(T*out)
    {
        pthread_mutex_lock(&_mutex);
        if(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty(){return _q.empty();}
    bool is_full(){return _q.size()==_maxcap;}
private:
    std::queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;
    pthread_cond_t _ccond;
};

Main.cc:

#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
const std::string oper = "+-*/%"; 
int mymath(int x,int y,char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        break;
    }
    return result;
}
void* consumer(void * bq_)
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        Task t;
        bq->pop(&t);
        std::cout<<"消费任务: "<<t()<<std::endl;
    }
    return nullptr;
}
void*productor(void*bq_)
{
    BlockQueue<Task>*bq = static_cast<BlockQueue<Task>*>(bq_);
    while(true)
    {
        int x = rand()%100+1;
        int y = rand()%10;
        int operCode = rand()%oper.size();
        Task t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"生产任务: "<<t.toTaskString()<<std::endl;  
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<Task> *bq = new  BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

3.存储任务

定义结构体BlockQueues封装计算任务的阻塞队列和存储任务的阻塞队列,创建生产者线程,消费者线程,存储线程执行各自的方法:

blockqueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gmaxcap = 500;
template <class T>
class BlockQueue
{   
public:
    BlockQueue(const int &maxcap = gmaxcap):_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&_mutex);
        while(is_full())
        {
            pthread_cond_wait(&_pcond, &_mutex); 
        }
        _q.push(in);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T *out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty())
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        pthread_cond_signal(&_pcond); 
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size() == _maxcap;
    }
private:
    std::queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond; 
    pthread_cond_t _ccond;
};

Task.hpp:

#pragma once

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

class CalTask
{
    using func_t = std::function<int(int,int,char)>;
    // typedef std::function<int(int,int)> func_t;
public:
    CalTask()
    {}
    CalTask(int x, int y, char op, func_t func)
    :_x(x), _y(y), _op(op), _callback(func)
    {}
    std::string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }
    std::string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

const std::string oper = "+-*/%";

int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            std::cerr << "div zero error!" << std::endl;
            result = -1;
        }
        else
            result = x / y;
    }
        break;
    case '%':
    {
        if (y == 0)
        {
            std::cerr << "mod zero error!" << std::endl;
            result = -1;
        }
        else
            result = x % y;
    }
        break;
    default:
        // do nothing
        break;
    }

    return result;
}

class SaveTask
{
    typedef std::function<void(const std::string&)> func_t;
public:
    SaveTask()
    {}
    SaveTask(const std::string &message, func_t func)
    : _message(message), _func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    std::string _message;
    func_t _func;
};

void Save(const std::string &message)
{
    const std::string target = "./log.txt";
    FILE *fp = fopen(target.c_str(), "a+");
    if(!fp)
    {
        std::cerr << "fopen error" << std::endl;
        return;
    }
    fputs(message.c_str(), fp);
    fputs("\n", fp);
    fclose(fp);
}

MianCp.cc:

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <ctime>
#include "blockqueue.hpp"
#include "Task.hpp"
template<class C,class K>
struct BlockQueues
{
    BlockQueue<C> *c_bq;
    BlockQueue<K> *s_bq;
};
void* productor(void*args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    while(true)
    {
        int x = rand()%10+1;
        int y = rand()%5+1;
        int operCode = rand()%oper.size();
        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"productor thread,生产计算任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}
void* consumer(void*args)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(args))->c_bq;
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        CalTask t;
        bq->pop(&t);
        std::string result = t();
        std::cout<<"cal thread,完成计算任务: "<<result<<" ... done"<<std::endl;

        SaveTask save(result,Save);
        save_bq->push(save);
        std::cout<<"cal thread,推送存储任务完成..."<<std::endl;
    }
    return nullptr;
}
void*saver(void*args)
{
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(args))->s_bq;
    while(true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        std::cout<<"save thread,保存任务完成..."<<std::endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned int)time(nullptr)^getpid());
    BlockQueues<CalTask,SaveTask> bqs;
    bqs.c_bq = new BlockQueue<CalTask>();
    bqs.s_bq = new BlockQueue<SaveTask>();
    pthread_t c,p,s;
    pthread_create(&p,nullptr,productor,&bqs);
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

**💫 **多生产多消费

只需要稍微改一改MainCp.cc即可完成多生产多消费,其他文件代码不需要更改

MainCp.cc:

#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "blockqueue.hpp"
#include "Task.hpp"
template<class C,class S>
class BlockQueues
{
public:
    BlockQueue<C>*c_bq;
    BlockQueue<S>*s_bq;
}; 
void*productor(void*bqs_)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(bqs_))->c_bq;
    while(true)
    {
        int x = rand()%100+1;
        int y = rand()%10;
        int operCode = rand()%oper.size();
        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        std::cout<<"productor thread,生产计算任务: "<<t.toTaskString()<<std::endl;
        sleep(1);
    }
    return nullptr;
}
void* consumer(void * bqs_)
{
    BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(bqs_))->c_bq;
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(bqs_))->s_bq;

    while(true)
    {
        CalTask t;
        bq->pop(&t);
        std::string result = t();
        std::cout<<"cal thread,完成计算任务: "<<result<<" ... done"<<std::endl;

        // SaveTask save(result,Save);
        // save_bq->push(save);
        // std::cout<<"cal thread,推送存储任务完成..."<<std::endl;
        // sleep(1);
    }
    return nullptr;
}
void* saver(void* bqs_)
{
    BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask,SaveTask>*>(bqs_))->s_bq;
    while(true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        std::cout<<"save thread,保存任务完成..."<<std::endl;
    }
    return nullptr;
};
int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueues<CalTask,SaveTask> bqs;
    bqs.c_bq = new  BlockQueue<CalTask>();
    bqs.s_bq = new  BlockQueue<SaveTask>();
    pthread_t c[2],p[3];
    pthread_create(p,nullptr,productor,&bqs);
    pthread_create(p+1,nullptr,productor,&bqs);
    pthread_create(p+2,nullptr,productor,&bqs);
    pthread_create(c,nullptr,consumer,&bqs);
    pthread_create(c+1,nullptr,consumer,&bqs);
  
    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr); 
    pthread_join(p[2],nullptr);

    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

​🌙 总结

生产者消费模型高效在哪里?

高效体现在一个线程拿出来任务可能正在做计算,它在做计算的同时,其他线程可以继续从队列中拿,继续做运算,高效并不是体现在从队列中拿数据高效!而是我们可以让一个、多个线程并发的同时计算多个任务!在计算多个任务的同时,并不影响其他线程,继续从队列里拿任务的过程。也就是说,生产者消费者模型的高效:可以在生产之前与消费之后让线程并行执行,不要认为生产者消费模式仅仅只是把任务生产到队列的过程就是生产过程,生产过程:1.拿任务、需要费点劲2.拿到后再放到队列里面整个一体,整个生产的过程;整个消费的过程:不是把任务拿到线程的上下文中就完了,拿到之后还要进行计算或存储这些工作才是消费的过程在生产前和和消费后我们多个线程是可以并发的。

🌟结束语** **

   今天内容就到这里啦,时间过得很快,大家沉下心来好好学习,会有一定的收获的,大家多多坚持,嘻嘻,成功路上注定孤独,因为坚持的人不多。那请大家举起自己的小手给博主一键三连,有你们的支持是我最大的动力💞💞💞,回见。

​​​

标签: linux

本文转载自: https://blog.csdn.net/AAlykk/article/details/139379579
版权归原作者 დ旧言~ 所有, 如有侵权,请联系我们删除。

“【Linux】生产者消费者模型——阻塞队列BlockQueue”的评论:

还没有评论