0


【Linux | C++ 】生产者消费者模型(Linux系统下C++ 代码模拟实现)

在这里插入图片描述

阅读导航

引言

多线程编程中的同步问题是一个普遍存在的难点,为了解决这些问题,开发者们设计出了各种同步机制,如条件变量、信号量、互斥锁等。生产者消费者模型是一个经典案例,它涉及到两类线程:生产者和消费者。本文将介绍如何使用条件变量来实现生产者消费者模型,帮助读者更好地理解多线程编程中的同步机制和技术。

一、生产者消费者问题

生产者线程负责生产数据或物品,并将它们放入一个共享缓冲区中。而消费者线程负责从缓冲区中获取这些数据或物品,并进行相应的处理。在这个过程中,需要保证生产者和消费者之间的正确协作和数据安全,以避免数据竞争和不可预测的结果。

为了解决这个问题,我们需要使用同步机制来协调两种类型的线程之间的操作。最常见的同步机制包括条件变量、信号量、互斥锁等。这些机制可以保证线程之间的正确协作和数据安全,避免数据竞争和死锁等问题的发生。
在这里插入图片描述

在生产者消费者问题中,同步机制的主要作用是保证缓冲区的数据安全和正确性。当缓冲区已满时,生产者线程需要等待一段时间,直到缓冲区有足够的空间可以放置新数据;而当缓冲区为空时,消费者线程需要等待一段时间,直到缓冲区有新数据可以获取。这种等待和通知的机制可以使用条件变量来实现。

🍁将生产者消费者模型比喻为超市的顾客和供货商

当我们将生产者消费者模型比喻为超市的顾客和供货商时,可以清晰地理解这一概念。假设超市是一个缓冲区,顾客是消费者,供货商是生产者。供货商不断地向超市提供新货物(产品),而顾客则从超市购买这些货物。在这个过程中,超市需要保证货物的充足和有序销售,而且顾客和供货商之间的操作需要协调。
在这个例子中,生产者不断地往超市里补充货物,当超市库存已满时,供货商需要等待一段时间,直到有空间放入新货物。而消费者则不断地从超市购买货物,当超市库存为空时,顾客需要等待新货物的到来。

⭕通过这个例子,我们可以清晰地看到生产者消费者模型中的关键概念:生产者负责生产物品并放入缓冲区,消费者负责从缓冲区获取物品并进行消费,而缓冲区则需要合理地协调生产者和消费者之间的操作,以避免过度生产或过度消费的情况发生。这种协调工作正是多线程编程中同步机制的核心应用之一

在这里插入图片描述

🚨注意在使用条件变量等同步机制时,需要保证线程之间的正确协作,避免死锁和饥饿等问题的发生。同时,还需要考虑性能优化等问题,以提高程序的效率和响应速度

二、C++ queue模拟阻塞队列的生产消费模型(伪代码)

以下是使用C++实现基于

std::queue

std::mutex

的生产者消费者模型的示例代码:

#include<iostream>#include<thread>#include<queue>#include<mutex>#include<condition_variable>

std::queue<int> dataQueue;
std::mutex mtx;
std::condition_variable cv;voidproducer(){for(int i =1; i <=10;++i){
        std::this_thread::sleep_for(std::chrono::milliseconds(500));// 模拟生产数据的耗时操作{
            std::lock_guard<std::mutex>lock(mtx);
            dataQueue.push(i);
            std::cout <<"Produced: "<< i << std::endl;}

        cv.notify_one();// 通知消费者线程有新数据可用}}voidconsumer(){while(true){
        std::unique_lock<std::mutex>lock(mtx);// 使用条件变量等待,直到有新数据可用
        cv.wait(lock,[]{return!dataQueue.empty();});int num = dataQueue.front();
        dataQueue.pop();
        std::cout <<"Consumed: "<< num << std::endl;

        lock.unlock();if(num ==10){break;// 结束消费者线程,当消费到数字10时退出}}}intmain(){
    std::thread producerThread(producer);
    std::thread consumerThread(consumer);

    producerThread.join();
    consumerThread.join();return0;}

在这个示例中,生产者线程将数字从1到10放入

std::queue

中,而消费者线程从

std::queue

中取出这些数字进行消费。通过使用

std::mutex

std::condition_variable

,我们实现了线程之间的同步和通信。

生产者线程使用

std::lock_guard<std::mutex>

锁住互斥量,并将数据放入队列后通知消费者线程。消费者线程在等待条件变量时会解锁互斥量,以允许其他线程访问数据队列。当有新数据可用时,消费者线程被唤醒,并继续处理数据。

三、RAII风格的加锁方式

1. 简介

RAII(Resource Acquisition Is Initialization)是一种C++编程风格,通过在对象的构造函数中获取资源,在析构函数中释放资源,从而实现资源的自动管理。在多线程编程中,RAII可以用于实现加锁和解锁的自动管理,确保锁的正确释放,避免忘记手动解锁而导致的死锁或资源泄漏。

2. 示例

#include<iostream>#include<thread>#include<mutex>classLockGuard{public:explicitLockGuard(std::mutex& mtx):mutex(mtx){
        mutex.lock();}~LockGuard(){
        mutex.unlock();}private:
    std::mutex& mutex;};

std::mutex mtx;voidsomeFunction(){
    LockGuard lock(mtx);// 在作用域中创建LockGuard对象,自动加锁// 执行需要加锁保护的操作
    std::cout <<"Critical section"<< std::endl;// 当LockGuard对象离开作用域时,会自动调用析构函数解锁}intmain(){
    std::thread thread1(someFunction);
    std::thread thread2(someFunction);

    thread1.join();
    thread2.join();return0;}

在这个示例中,我们定义了一个名为

LockGuard

的RAII类,它在构造函数中获取一个

std::mutex

的引用,并在析构函数中调用

unlock()

来解锁互斥量。在

someFunction()

中,我们通过创建

LockGuard

对象来实现加锁和解锁操作。当

LockGuard

对象离开作用域时,其析构函数会自动被调用,从而释放互斥量。

通过使用RAII风格的加锁方式,我们可以确保在进入临界区之前加锁,在离开临界区之后自动解锁,避免了手动控制加锁和解锁操作可能带来的错误。同时,由于RAII对象的生命周期与作用域相对应,因此可以确保在任何情况下都会正确释放资源,即使在函数发生异常或提前返回时也不例外。这种方式简化了代码,提高了程序的可靠性和可读性。

四、基于Linux操作系统使用C++代码,采用RAII风格的加锁方式模拟“生产者消费者模型”

⭕Makefile文件

cp:ConProd.ccg++-o$@$^-std=c++11-lpthread.PHONY:cleanclean:rm-fcp

⭕ . h 头文件

✅lockGuard.h

#pragmaonce#include<iostream>#include<pthread.h>classMutex{public:Mutex(pthread_mutex_t *mtx):pmtx_(mtx){}// 加锁操作voidlock(){
        std::cout <<"要进行加锁"<< std::endl;pthread_mutex_lock(pmtx_);}// 解锁操作voidunlock(){
        std::cout <<"要进行解锁"<< std::endl;pthread_mutex_unlock(pmtx_);}~Mutex(){}private:
    pthread_mutex_t *pmtx_;// 互斥锁指针};// RAII风格的加锁方式classlockGuard{public:lockGuard(pthread_mutex_t *mtx):mtx_(mtx){
        mtx_.lock();// 构造时进行加锁操作}~lockGuard(){
        mtx_.unlock();// 析构时进行解锁操作}private:
    Mutex mtx_;// 互斥锁对象};

✅BlockQueue.h

#pragmaonce#include<iostream>#include<queue>#include<mutex>#include<pthread.h>#include"lockGuard.h"constint gDefaultCap =5;// 队列默认容量template<classT>classBlockQueue{private:boolisQueueEmpty()// 判断队列是否为空{return bq_.size()==0;}boolisQueueFull()// 判断队列是否已满{return bq_.size()== capacity_;}public:BlockQueue(int capacity = gDefaultCap):capacity_(capacity){// 初始化互斥锁和条件变量pthread_mutex_init(&mtx_,nullptr);pthread_cond_init(&Empty_,nullptr);pthread_cond_init(&Full_,nullptr);}voidpush(const T &in)// 生产者线程调用此函数向队列中添加元素{
        lockGuard lockgrard(&mtx_);// 自动调用构造函数,对互斥锁进行加锁while(isQueueFull())// 如果队列已满,则阻塞当前线程,等待队列有空闲位置pthread_cond_wait(&Full_,&mtx_);

        bq_.push(in);// 将元素添加到队列尾部pthread_cond_signal(&Empty_);// 对等待在 Empty_ 上的线程发送信号,表示队列非空}voidpop(T *out)// 消费者线程调用此函数从队列中取出元素{
        lockGuard lockguard(&mtx_);// 自动调用构造函数,对互斥锁进行加锁while(isQueueEmpty())// 如果队列为空,则阻塞当前线程,等待队列有元素pthread_cond_wait(&Empty_,&mtx_);*out = bq_.front();// 取出队头元素
        bq_.pop();// 将元素从队列中删除pthread_cond_signal(&Full_);// 对等待在 Full_ 上的线程发送信号,表示队列未满}~BlockQueue(){// 销毁互斥锁和条件变量pthread_mutex_destroy(&mtx_);pthread_cond_destroy(&Empty_);pthread_cond_destroy(&Full_);}private:
    std::queue<T> bq_;// 阻塞队列int capacity_;// 容量上限
    pthread_mutex_t mtx_;// 通过互斥锁保证队列安全
    pthread_cond_t Empty_;// 用它来表示队列是否空的条件
    pthread_cond_t Full_;// 用它来表示队列是否满的条件};

✅Task.h

#pragmaonce#include<iostream>#include<functional>typedef std::function<int(int,int)> func_t;classTask{public:// 默认构造函数Task(){}// 构造函数,初始化任务的参数和可调用对象Task(int x,int y, func_t func):x_(x),y_(y),func_(func){}// 重载函数调用运算符,用于执行任务intoperator()(){returnfunc_(x_, y_);}public:int x_;// 任务的参数 xint y_;// 任务的参数 y
    func_t func_;// 可调用对象,接受两个整数并返回一个整数};

⭕ . cpp 文件

✅ConProd.cpp

#include"BlockQueue.h"#include"Task.h"#include<pthread.h>#include<unistd.h>#include<ctime>// 定义一个加法函数,用于任务的处理intmyAdd(int x,int y){return x + y;}// 消费者线程函数,从阻塞队列中获取任务并完成任务void*consumer(void*args){// 将参数转化为阻塞队列的指针
    BlockQueue<Task>*bqueue =(BlockQueue<Task>*)args;while(true){// 获取任务
        Task t;
        bqueue->pop(&t);// 完成任务,并输出结果
        std::cout <<pthread_self()<<" consumer: "<< t.x_ <<"+"<< t.y_ <<"="<<t()<< std::endl;}returnnullptr;}// 生产者线程函数,制作任务并将任务加入阻塞队列void*productor(void*args){// 将参数转化为阻塞队列的指针
    BlockQueue<Task>*bqueue =(BlockQueue<Task>*)args;while(true){// 制作任务int x =rand()%10+1;usleep(rand()%1000);int y =rand()%5+1;
        Task t(x, y, myAdd);// 生产任务,并输出提示信息
        bqueue->push(t);
        std::cout <<pthread_self()<<" productor: "<< t.x_ <<"+"<< t.y_ <<"=?"<< std::endl;// 限制生产者的速度,以便观察阻塞队列的功能sleep(1);}returnnullptr;}intmain(){// 随机数种子初始化srand((uint64_t)time(nullptr)^getpid()^0x32457);// 创建一个阻塞队列
    BlockQueue<Task>*bqueue =newBlockQueue<Task>();// 创建两个消费者线程和两个生产者线程
    pthread_t c[2],p[2];pthread_create(c,nullptr, consumer, bqueue);pthread_create(c +1,nullptr, consumer, bqueue);pthread_create(p,nullptr, productor, bqueue);pthread_create(p +1,nullptr, productor, bqueue);// 等待所有线程结束pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);// 释放阻塞队列内存delete bqueue;return0;}

温馨提示

感谢您对博主文章的关注与支持!如果您喜欢这篇文章,可以点赞、评论和分享给您的同学,这将对我提供巨大的鼓励和支持。另外,我计划在未来的更新中持续探讨与本文相关的内容。我会为您带来更多关于Linux以及C++编程技术问题的深入解析、应用案例和趣味玩法等。如果感兴趣的话可以关注博主的更新,不要错过任何精彩内容!

再次感谢您的支持和关注。我们期待与您建立更紧密的互动,共同探索Linux、C++、算法和编程的奥秘。祝您生活愉快,排便顺畅!
在这里插入图片描述

标签: linux c++ java

本文转载自: https://blog.csdn.net/m0_75215937/article/details/134840744
版权归原作者 Yawesh_best 所有, 如有侵权,请联系我们删除。

“【Linux | C++ 】生产者消费者模型(Linux系统下C++ 代码模拟实现)”的评论:

还没有评论