0


Linux生产者消费者模型之阻塞队列

一、生产者消费者模型

1、例子引入

我们在日常生活中,一般都是通过超市,集市等场所,来购买日常用品,而不会直接向生产商进行购买。超市则会统一向各个生产商批发商品,然后售卖给人们。

如果我们直接去供货商那里买东西,那我们只会要很少的商品,供货商只能给你单独生产,对于供货商来说生产的成本就太大了,所以有了交易场所这个中间媒介的存在,其目的就是为了集中需求,分发产品。

消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,生产商还可以继续生产,提供给超市,当供货商不再生产的时候消费者还能从超市买得到!这样生产和消费就能进行解耦了。而我们把临时的存储商品的场所称为缓冲区。

2、生产者消费者模型

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接交易,而通过一个容器来进行交易(超市等交易场所),所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者直接要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区。这个容器实际上就是用来给生产者和消费者解耦的。

而从编码的角度,每一个消费者我们可以看作一个消费者线程,每一个生产者我们也可以看作一个生产者线程。所有的消费者和生产者都要访问同一个交易场所,于是交易场所就是一个临界资源!

在编码的角度下,模型特点如下:

1、三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系和同步关系)。
2、两种角色: 生产者和消费者。(通常由线程承担)
3、一个交易场所: 通常指的是内存中的一段缓冲区。

编码思路:我们主要的目标其实就是维护上面的三个特点:

1、生产者和生产者之间,消费者和消费者之间,生产者和消费者之间,这三个关系它们都是存在互斥的:因为交易场所可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。其中,所有的生产者和消费者都会竞争式的申请锁。

2、生产者和消费者之间还有同步关系:如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者就会停止生产,而如果让消费者一直消费,那么当容器当中的数据被消费完后,消费者就会停止消费。

消费者和生产者会竞争同一把锁。而必定会有一个线程的竞争力特别强,比如是某一个生产者线程,它在生产满资源后,它仍然能够去申请锁,访问临界资源,然后释放锁,这样其他的生产者线程申请不到锁,无法生产,且消费者线程也无法消费了,这样就会引起饥饿问题,是非常低效的。所以,我们应该让生产者和消费者访问该容器时具有一定的顺序性,这就要使用我们之前所讲到的条件变量了,这就是让线程之间同步。

二、基于BlockQueue的生产者消费者模型

阻塞队列(BlockQueue):常用于作为生产者消费者模型中交易场所的数据结构。

队列为空时,消费者无法取数据,直到阻塞队列被放入数据。队列为满时,生产者不能生产数据,直到有数据被取出。

1、单生产单消费

BlockQueue的实现:BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include<time.h>

using namespace std;
#define G_NUM 5

template <class T>
class BlockQueue
{
public:
    bool full()
    {
        return bq.size() == capacity;
    }

    bool empty()
    {
        return bq.size() == 0;
    }

    BlockQueue(const int &capacity_ = G_NUM)
        : capacity(capacity_)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mtx_);
        while (full())
            pthread_cond_wait(&p_cond_, &mtx_);

        bq.push(in);
        pthread_cond_signal(&c_cond_);

        pthread_mutex_unlock(&mtx_);
    }

    void pop(T *out)
    {
        pthread_mutex_lock(&mtx_);
        while (empty())
            pthread_cond_wait(&c_cond_, &mtx_);

        *out = bq.front();
        bq.pop();
        pthread_cond_signal(&p_cond_);

        pthread_mutex_unlock(&mtx_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

private:
    queue<T> bq;            // 阻塞队列
    int capacity;           // 容量
    pthread_mutex_t mtx_;   // 锁
    pthread_cond_t c_cond_; // 消费者条件变量(消费者等待的队列)是否为空
    pthread_cond_t p_cond_; // 生产者条件变量(生产者等待的队列)是否为满
};

在主函数中我们创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。ConProd.cc:

#include "BlockQueue.hpp"

void *consmer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    while (true)
    {
        int a = 0;
        bq->pop(&a);
        cout << "消费者:" << a << endl;
        sleep(1);
    }
    return nullptr;
}

void *productor(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    int a = 0;
    while (true)
    {
        bq->push(a);
        cout << "生产者:" << a << endl;
        a++;
    }
    return nullptr;
}

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, consmer, (void *)bq);
    pthread_create(&p, nullptr, productor, (void *)bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    return 0;
}

运行结果:

说明:

1、我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。

当阻塞队列满了的时候,生产者线程就应该在p_cond_条件变量下进行等待,直到阻塞队列中有空间时再将其唤醒,继续生产。

当阻塞队列为空的时候,消费者线程就应该在c_cond_条件变量下进行等待,直到阻塞队列中有新的数据时再将其唤醒,继续消费。

2、当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在c_cond_条件变量下进行等待,因此当生产者生产完数据后需要唤醒在c_cond_条件变量下等待的消费者线程。
同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在p_cond_条件变量下进行等待,因此当消费者消费完数据后需要唤醒在p_cond_条件变量下等待的生产者线程。

3、判断是否满足生产消费条件时不能用if,而应该用while:因为pthread_cond_wait函数有可能调用失败,意味着线程没有被成功阻塞,调用失败后该执行流就会继续往后执行。而此时如果队列为满(队列大小是一定的),在生产者函数中,你继续往后执行的话,就会越界访问。为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件。

2、多生产多消费并随机派发任务

由于我们将BlockQueue当中存储的数据进行了模板化,此时就可以让BlockQueue当中存储其他类型的数据。比如下面我们让其存储自定义类型的数据。

task.hpp:我们自行设定一个加法计算任务,该任务由多个生产者派发,交给多个消费者去执行。

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <time.h>

class task
{
public:
    task()
    {
    }
    task(int x, int y)
        : x_(x), y_(y)
    {
    }

    int operator()()
    {
        return x_ + y_;
    }

private:
    int x_;
    int y_;
};

ConProd.cc:

#include "BlockQueue.hpp"
#include "task.hpp"

void *consmer(void *arg)
{
    BlockQueue<task> *bq = (BlockQueue<task> *)arg;
    while (true)
    {
        task k;
        bq->pop(&k);
        cout << "消费者: x+y=" << k() << endl;
        sleep(1);
    }
    return nullptr;
}

void *productor(void *arg)
{
    BlockQueue<task> *bq = (BlockQueue<task> *)arg;
    while (true)
    {
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        task k(x, y);
        bq->push(k);
        cout << "生产者: x+y=?"
             << "  "
             << "x:" << x << "  "
             << "y:" << y << endl;
    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 12323);
    BlockQueue<task> *bq = new BlockQueue<task>();
    pthread_t c[2], p[3];
    for (int i = 0; i < 2; i++)
        pthread_create(c + i, nullptr, consmer, (void *)bq);
    for (int i = 0; i < 2; i++)
        pthread_create(p + i, nullptr, productor, (void *)bq);

    for (int i = 0; i < 2; i++)
        pthread_join(c[i], nullptr);
    for (int i = 0; i < 2; i++)
        pthread_join(p[i], nullptr);

    return 0;
}

运行结果:

三、总结

生产者消费者模型是高效的。其高效体现在一个线程拿出来任务可能正在做处理,它在做处理的同时,其他线程可以继续从队列中拿任务,继续处理,所以其高效是我们可以让多个线程并发的同时处理多个任务! 生产者线程也可以不断地并发地派发任务。


本文转载自: https://blog.csdn.net/zdlynj/article/details/136606391
版权归原作者 dbln 所有, 如有侵权,请联系我们删除。

“Linux生产者消费者模型之阻塞队列”的评论:

还没有评论