0


初识Linux · 编写生产消费模型(2)


前言:

前文我们介绍了基于阻塞队列实现生产消费模型,使用阻塞队列实现生产消费模型中,我们学习到了pthread_cond_wait的第二个参数的重要性,不仅会解锁,此时锁被其他人持有,当条件满足的时候,就重新竞争锁,所以在pthread_cond_wait函数这里是不会存在死锁的。

第二个重要的条件是while,首先不说函数等待失败,直接造成错误的情况,如果多个消费者同时在等待,但是只有一把锁,这种情况就需要重新判断,所以使用的while,以上是前文两个比较重要的讨论。

但是纵观全文,我们发现基于阻塞队列实现生产消费模型的时候,是必须要对条件是否满足进行判断的,于是就有人思考,可不可以让其他东西代替我们判断条件是否满足呢?毕竟我们这里代码量小,不容易出错,对于代码量的情况来说的话,就不好说了,所以我们应该借助什么东西呢?

答案是信号量,对于信号量存在的PV操作,以及信号量是将整个资源划分成一个一个的小部分,我们在进程间通信部分已经介绍了,这里我们不再花费更多的时间介绍信号量,但是我们有必要认识一下信号量的接口,基于信号量编写生产消费模型我们可以尝试使用环形队列,我们使用vector来模拟。

更多细节就直接进入主题吧!


RingQueue编写生产消费模型

认识接口

对于信号量的接口,Ubuntu系统也可以可以man到的,我们需要用到的接口有sem_init,sem_wait和sem_post,sem_destroy,这是在POSIX里面的信号量的操作,而不是system V的操作,两者要分清楚,那么对应的头文件是semaphore.h文件:

  1. int sem_init(sem_t *sem, int pshared, unsigned int value);
  2. int sem_wait(sem_t *sem);
  3. int sem_post(sem_t *sem);
  4. int sem_destroy(sem_t *sem);

其中稍微复杂一点就是sem_init函数了,其他的多简单,有了线程学习之前的基础,我们基本上可以直接使用了。sem_init函数的第一个参数是sem_t类型的,和phtread_t的一样,第二个参数我们直接设置为0,这个参数决定的是线程间共享信号量还是进程间共享信号量,0代表的线程间共享,第三个参数就是申请多少个信号量。

对于信号量来说,在System -V里面,我们简短的介绍了pv操作,对于P操作相当于信号量--,对于V操作相当于信号量++,而因为是对临界资源的访问,所以这两个操作应该是原子的。

对于以上函数的返回值都是成功返回0,失败返回-1,并且错误码被设置。

可是为什么使用以上的函数就可以不用判断条件是否满足了呢?

  1. sem_wait()函数- 功能:相当于P操作,用于等待信号量变为正数(即请求资源)。如果信号量的值为0,则调用线程将被阻塞,直到信号量的值大于0为止。- 参数:包括指向信号量对象的指针(sem)。- 返回值:成功时返回0,失败时返回-1并设置errno。
  2. sem_post()函数- 功能:相当于V操作,用于释放信号量(即释放资源)。信号量的值将增加1。如果有任何线程在等待该信号量,则其中一个线程将被唤醒。- 参数:包括指向信号量对象的指针(sem)。- 返回值:成功时返回0,失败时返回-1并设置errno。

因为PV操作的函数,会自动判断信号量,或者说条件是否满足,如果满足,那么就操作线程。

以上是对于sem_*函数的介绍。

开始编写

同前文的blockqueue一样,我们先来确定成员变量应该有谁?

首先,我们既然是基于环形队列和信号量编写的,那么生产者和消费者的位置,我们应该知道吧?那么就应该有两个变量用来表示位置。对于位置来说,后续操作肯定是免不了%操作的,虽然有PV操作,但是我们应该防止越界。

其次,信号量的变量肯定要有吧?在构造函数和析构函数的时候初始化 + 析构就可以了。可是我们应该引入几个信号量呢?在最开始生产者生产的时候,消费者一个信号量都不能消费吧?那么这不就是初识信号量为0吗?当生产者进行了V操作之后,消费者的信号量+1(重点),消费者消费了同理,所以应该有两个信号量。

最后,对于锁来说,我们前文加锁是为了防止对于临界资源的访问出错,这里需要加吗?当然要加了,对于环形队列的访问难道不是临界资源吗?当然是了,所以同样需要锁,可是需要几把锁呢?一把锁吗?如果只用一把锁,那不就是基于阻塞队列编写的吗?(只能有一个人操作)这里既然有了信号量的加持,都有人帮我们判断条件了,我们不妨设计两把锁,因为生产者和消费者是并发进行的,要满足消费者和消费者,生产者和生产者之间的关系。即互斥。对于生产者和消费者之间,我们都不用担心互斥,信号量已经帮我们做了,我们只要保证同步即可。

  1. const int default_cap = 5;
  2. template<typename T>
  3. class RingQueue
  4. {
  5. public:
  6. private:
  7. std::vector<T> _ring_queue;
  8. int _max_cap;
  9. int _c_step;
  10. int _p_step;
  11. sem_t _data_sem;
  12. sem_t _space_sem;
  13. pthread_mutex_t _c_mutex;
  14. pthread_mutex_t _p_mutex;
  15. };

那么就是正常的构造函数和析构函数:

  1. const int default_cap = 5;
  2. template<typename T>
  3. class RingQueue
  4. {
  5. public:
  6. RingQueue(int max_cap = default_cap)
  7. : _max_cap(max_cap), _c_step(0), _p_step(0), _ring_queue(max_cap)
  8. {
  9. sem_init(&_data_sem, 0, 0);
  10. sem_init(&_space_sem, 0, _max_cap);
  11. pthread_mutex_init(&_c_mutex, nullptr);
  12. pthread_mutex_init(&_p_mutex, nullptr);
  13. }
  14. ~RingQueue()
  15. {
  16. sem_destroy(&_data_sem);
  17. sem_destroy(&_space_sem);
  18. pthread_mutex_destroy(&_c_mutex);
  19. pthread_mutex_destroy(&_p_mutex);
  20. }
  21. private:
  22. std::vector<T> _ring_queue;
  23. int _max_cap;
  24. int _c_step;
  25. int _p_step;
  26. sem_t _data_sem;
  27. sem_t _space_sem;
  28. pthread_mutex_t _c_mutex;
  29. pthread_mutex_t _p_mutex;
  30. };

有了析构和构造,我们现在只需要关心pop和push了,其中我们不妨简单封装一下PV操作的函数:

  1. private:
  2. void P(sem_t& sem)
  3. {
  4. sem_wait(&sem);
  5. }
  6. void V(sem_t& sem)
  7. {
  8. sem_post(&sem);
  9. }

对于push操作,因为访问了临界资源,所以一定要加锁,加锁之后,生产者的位置需要++,并且要保证++之后不会被超出队列的总长度,那么就要模运算,就没了:

  1. void Push(const T& in)
  2. {
  3. P(_space_sem);
  4. pthread_mutex_lock(&_p_mutex);
  5. _ring_queue[_p_step] = in;
  6. _p_step++;
  7. _p_step %= _max_cap;
  8. pthread_mutex_unlock(&_p_mutex);
  9. V(_data_sem);
  10. }

对于PV操作一定是二者都要同时进行的,不会只执行单独的一个,都是全部执行。

对于pop操作一样的:

  1. void Pop(T* out)
  2. {
  3. P(_data_sem);
  4. pthread_mutex_lock(&_c_mutex);
  5. *out = _ring_queue[_c_step];
  6. _c_step++;
  7. _c_step %= _max_cap;
  8. pthread_mutex_unlock(&_c_mutex);
  9. V(_space_sem);
  10. }

对于头文件RingQueue.hpp的编写就结束了,我们在主函数部分编写测试代码试试:

  1. void *Consumer(void *args)
  2. {
  3. RingQueue<int> *c = static_cast<RingQueue<int> *>(args);
  4. while (true)
  5. {
  6. int out = 0;
  7. c->Pop(&out);
  8. std::cout << "Consumer pop data-> " << out << std::endl;
  9. //sleep(1);
  10. }
  11. }
  12. void *Productor(void *args)
  13. {
  14. RingQueue<int> *q = static_cast<RingQueue<int> *>(args);
  15. while (true)
  16. {
  17. int in = rand() % 100 + 1;
  18. q->Push(in);
  19. std::cout << "Productor push data-> " << in << std::endl;
  20. // sleep(1);
  21. }
  22. }
  23. int main()
  24. {
  25. srand(time(nullptr) ^ getpid());
  26. pthread_t c, p;
  27. RingQueue<int> *rq = new RingQueue<int>;
  28. pthread_create(&c, nullptr, Consumer, (void *)rq);
  29. pthread_create(&p, nullptr, Productor, (void *)rq);
  30. pthread_join(c, nullptr);
  31. pthread_join(p, nullptr);
  32. return 0;
  33. }

以上是main函数的编写,实际上,这份代码也支持多生产多消费,毕竟锁在那里,那么以上就是环形队列编写生产消费模型的介绍。


感谢阅读!

标签: linux 运维 服务器

本文转载自: https://blog.csdn.net/2301_79697943/article/details/144345057
版权归原作者 _lazy. 所有, 如有侵权,请联系我们删除。

“初识Linux · 编写生产消费模型(2)”的评论:

还没有评论