1.POSIX信号量
1.1.复习信号量
还记得我们在进程间通信那里讲的systemV 信号量吗!可以去这里看看哦:http://t.csdnimg.cn/JbBpE
信号量(semaphore)是一种特殊的工具,主要用于实现 同步和互斥
**在并发编程中,多个线程或进程可能会尝试同时访问同一资源,这可能导致数据的不一致或冲突。****比如说两个进程访问一个共享内存,a进程想往里面写1kb大小的数据,写到一半,b进程想来读取这个共享内存的数据了,那么如果没有别的限制,b进程就直接读走了a已经写到共享内存里面的数据,这样子就导致a要传输的数据不完整了,这个是发到的数据和收到的数据不一致的问题,这种情况在管道是不存在的,因为管道是有同步和互斥机制的**
互斥:任何时刻,只允许一个执行流访问共享资源
为了解决这个问题,我们引入了信号量(Semaphore)的概念。**信号量是一个计数器,用于控制对共享资源的访问。信号量是描述临界资源数量的多少。在Linux中,信号量提供了一种机制,允许多个线程或进程安全地访问共享资源。**
- 信号量的工作原理
信号量的工作原理基于计数器的概念。
**在创建信号量时,我们需要为其设置一个初始值(这个可以自己设置),该值表示可以同时访问共享资源的线程或进程数量。每当一个线程或进程需要访问共享资源时,它会尝试获取信号量。**
如果信号量的值大于0,表示还有可用的资源,线程或进程可以继续执行并访问资源。同时,信号量的值会减1。
如果信号量的值为0,表示所有资源都被占用,线程或进程需要等待,直到其他线程或进程释放资源。
当线程或进程完成对共享资源的访问后,它会释放信号量,即将信号量的值加1。这表示又有可用的资源,等待的线程或进程可以继续执行。
** 每个进程想访问共享资源的一部分的时候,不是直接访问,而是先申请信号量,就像看电影的要先买票一样 **
- 把整个临界资源视作一部分的时候,信号量初始值为1,这个信号量也叫二元信号量(因为它只有0和1两种状态)
- 把整个临界资源视作2部分的时候,信号量初始值为2;
- 把……
- 理解互斥
我们把临界资源的信号量设置为1,这样子只有1个进程能访问到这个临界资源
- 我们现在说要访问临界资源,得先申请信号量,信号量也是临界资源,信号量保护我们的临界资源,那么我们的信号量谁来保护呢?
我们不能直接对信号量直接加减,因为确实是有点不安全,这个我们多线程来讲,为了安全,操作系统提供了PV操作,P操作就是申请信号量,而V操作就是释放信号量,这个就更安全了!!!这两个操作也是原子的,意思就是只有两种状态(做完了或者没做),中间做了什么不知道,中间的状态也不知道,这样子就安全了!!!
- 原子的:只知道最初的状态和最后的状态,中间的状态不知道
1.2.POSIX信号量
好了,复习了system信号量,现在来看看POSIX信号量
POSIX和System V都是可移植的操作系统接口标准,它们都定义了操作系统应该为应用程序提供的接口标准。
- POSIX信号量和System V信号量作用相同,都是用于同步和互斥操作,以达到无冲突的访问共享资源目的。
- System V版本的信号量只适用于实现进程间的通信,而POSIX版本的信号量主要用于实现线程之间的通信。
**也就是说 POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。 **
因为POSIX信号量主要用于实现线程间的同步,所以我们把它放到了多线程这里来讲。
注意:** 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。**
信号量的本质上为:计数器 + PCB等待队列 + 一堆接口(等待接口,唤醒接口)
我们来看看POSIX信号量的结构
计数器:本质上是对资源的计数
- 当执行流获取信号量成功之后,信号量当中的计数器会进行减1操作,当获取失败后,该执行流就会被放到该信号量的PCB等待队列中去。
- 当执行流释放信号量成功之后,信号量当中的计数器会进行加1操作。
又称为基于内存的信号量, 由于其没有名字, 没法通过open操作直接找到对应的信号量, 所以很难直接用于没有关联的两个进程之间。
1.3.POSIX信号量工作过程——PV操作
** 每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。**
注意:** 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。**我们来看看POSIX信号量的结构
信号量的操作包括等待(wait)和发送(post)。
- 等待操作(wait):执行流在申请资源时,如果信号量的值大于0,表示资源可用,进程或线程可以继续执行,并将信号量的值减1。如果信号量的值为0,表示资源不可用,这时申请信号量的进程或线程的task_struct会被放入该信号量的阻塞队列里面,直到信号量的值大于0。(P操作)
- 发送操作(post):释放资源时,将信号量的值加1,并唤醒等待该信号量的阻塞队列里面的进程或线程。这样,其他进程或线程就可以继续执行。(V操作)
也就是说:
信号量的PV操作:
- P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
- V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。
PV操作必须是原子操作
** 多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。**
**但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。**
** 注意: 内存当中变量的++、--操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++、--操作。**
申请信号量失败被挂起等待
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
什么时候会用到信号量?
- 多线程需要共享资源
- 共享资源可以被局部性访问
- 需要对共享资源的数量做计数统计,控制对共享资源的访问。、
1.4.POSIX信号量分类
POSIX信号量是一个sem_t类型的变量,但POSIX有两种信号量的实现机制:无名信号量和命名信号量。
无名信号量
** POSIX标准只是规定,无名信号量必须被放置在一段可以被多线程或者多进程所共享的内存区域中。**
无名信号量有一个共享属性,分为线程共享属性和进程共享属性两类。
- 一个线程共享属性的无名信号量必须被放置在一个可以被进程中所有线程所共享的内存区域中,比如全局变量。
- 一个进程共享属性的无名信号量必须被放置在共享内存区域,System V通过shmget接口实现共享内存,POSIX通过shm_open接口来实现共享内存区域。
无名信号量在使用之前必须通过sem_init接口进行初始化,之后可以通过sem_wait和sem_post接口进行操作,当不再使用无名信号量,或者放置无名信号的区域被释放之前,用户都应该通过sem_destroy接口来释放无名信号量。
** 有名信号量**
有名信号量通过一个名字来作为标识,名字的格式为"/somename",这个名字的长度为MAX_NAME - 4(NAME_MAX是一个宏定义),信号量名字以'/'为开始,以'\0'字符为结尾,并且中间字符串中不能再有'/'。
两个进程之间可以通过sem_open函数来操作同一个有名信号量,用户可以通过sem_open函数接口创建一个新的有名信号量或者打开一个已有的有名信号量。有名信号量被打开之后,进程或线程(有名信号量一般应用在进程之间的同步控制)可以通过sem_post或者sem_wait接口进行信号量操作。当一个进程不在使用有名信号量后,可以通过sem_close接口来关闭它,当系统中所有的进程都不在使用某个有名信号量后,可以通过sem_unlink接口将它从系统中移除。
总结一下
- 无名信号量只可以在共享内存的情况下,比如实现进程中各个线程之间的互斥和同步,因此无名信号量也被称作基于内存的信号量;
- 命名信号量通常用于不共享内存的情况下,比如进程间通信。
同时,在创建信号量时,根据信号量取值的不同,POSIX信号量还可以分为:
- 二值信号量:信号量的值只有0和1,这和互斥量很类似,若资源被锁住,信号量的值为0,若资源可用,则信号量的值为1;
- 计数信号量:信号量的值在0到一个大于1的限制值之间,该计数表示可用的资源的个数。
**无名信号量和命名信号量的区别 **
有名信号量和无名信号量的差异在于创建和销毁的形式上,但是其他工作一样。
- 无名信号量只能存在于内存中,要求使用信号量的进程必须能访问信号量所在的这一块内存,所以无名信号量只能应用在同一进程内的线程之间(共享进程的内存),或者不同进程中已经映射相同内存内容到它们的地址空间中的线程(即信号量所在内存被通信的进程共享)。意思是说无名信号量只能通过共享内存访问。
- 相反,有名信号量可以通过名字访问,因此可以被任何知道它们名字的进程中的线程使用。
也就是说
- **单个进程中使用 POSIX 信号量时,无名信号量更简单(我们本文就讲这个)。 **
- 多个进程间使用 POSIX 信号量时,有名信号量更简单。
2.POSIX无名信号量接口
其实有名信号量和无名信号量的接口差不多,只是创建和销毁有一点差别。
我们上面说过,** 单个进程中使用 POSIX 信号量时,无名信号量更简单,接下来我们就将学习无名信号的接口。**
我们得先创建我们的信号量——**信号量载体的类型是
sem_t
,我们可以根据这个类型自己定义信号量对象:**
#include<iostream>>
#include<semaphore.h>
using namespace std;
int main()
{
sem_t sem1;//信号量
}
2.1.初始化信号量
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
函数sem_init的作用是**在参数sem指向的地址上初始化一个无名信号量,其value值由参数value指定。 **
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
返回值说明:
- 初始化信号量成功返回0,失败返回-1。
特别注意:参数2pshared指定此无名信号量实在进程之间被共享使用,还是在线程之间被共享使用。
- 如果参数pshared等于0,那么此信号量只能在一个进程内的线程之间共享使用,所以,它应该被实现在可以被所有线程都能访问到的地址上,如全局变量或者动态分配在堆上。
- 如果参数pshared是一个非0值,那么此信号量被多个进程共享使用,它应该被实现在共享内存里,或者父子进程之间使用。
注意:如果使用此函数去初始化一个已经被初始化的信号量,此函数的行为是未被定义的。
2.2.销毁信号量
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
函数sem_destroy的作用是销毁由参数sem指向的无名信号量。
** 只有用sem_init初始化的无名信号量才可以用此函数来销毁,并且程序开发人员也应该使用此函数去销毁这个无名信号量。 **
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1。
2.3.等待信号量(申请信号量)
这个就是我们说的P操作
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
其实wait函数不只一个
sem_wait系列操作,即我们所说的PV操作中的P操作,目的是锁住一个信号量。
(1) sem_wait函数将由参数sem指定的信号量减一,即上锁操作。如果信号量的值大于0,那么执行减一操作,并且函数立即返回。如果信号量当前的值为0,那么调用进程会被加入这个信号量的阻塞队列,一直阻塞直到信号量变成大于0(由其它进程执行了sem_post操作)或者被信号中断此调用。 (2)sem_trywait函数同sem_wait函数的作用一样,不同是如果不能立即执行加一操作,则调用进程不会堵塞而是返回一个错误,errno会被设置成EAGAIN。 (3)sem_timedwait函数同sem_wait函数的作用一样,不同是如果不能立即执行加一操作,则调用进程会堵塞一定的时间段,这个时间段由函数参数abs_timeout指定。如果在指定的时间内信号量仍不能被锁住,则函数返回超时错误,errno会被设置成ETIMEDOUT。如果信号量的减一操作可以被立即执行,则此函数永远都不会返回超时错误,并且参数abs_timeout的有效性也不会被检查。
2.4.发布信号量(释放信号量)
与sem_wait相对应的函数就是sem_post,即我们PV操作里面的V操作。
**此函数将sem指向的信号量解锁(加一操作),加一操作后如果信号量的值变成大于0,那么另外一个因为调用sem_wait函数而被堵塞的进程或者线程将会被唤醒并且去执行对信号量的加锁操作。**
发布信号量的函数叫做sem_post,该函数的函数原型如下:
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
3.二元信号量模拟实现互斥功能
** 信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。**
** 信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。**
例如,下面我们实现一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
我们在主线程当中创建四个新线程,让这四个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数,其中我们用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,**在下面的代码中我们并没有对tickets进行任何保护操作。**
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
int tickets = 2000;
void* TicketGrabbing(void* arg)
{
std::string name = (char*)arg;
while (true){
if (tickets > 0){
usleep(1000);
std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
}
else{
break;
}
}
std::cout << name << " quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
运行代码后可以看到,线程打印输出剩余票数时出现了票数剩余为负数的情况,这是不符合我们预期的。
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
class Sem{
public:
Sem(int num)
{
sem_init(&_sem, 0, num);//创建1个Sem对象的时候就已经把信号量初始化好了
}
~Sem()
{
sem_destroy(&_sem);//销毁一个Sem对象的时候顺便把信号量销毁了
}
void P()//申请信号量
{
sem_wait(&_sem);//P操作
}
void V()//发布信号量
{
sem_post(&_sem);//V操作
}
private:
sem_t _sem;//创建1个信号量
};
Sem sem(1); //二元信号量,创建sem对象的时候就已经把信号量初始化好了
int tickets = 2000;
void* TicketGrabbing(void* arg)
{
std::string name = (char*)arg;
while (true){
sem.P();//当前线程申请信号量,如果申请成功就往下走,不然就得待在信号量的阻塞队列里
if (tickets > 0){
usleep(1000);
std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl;
sem.V();//这个线程抢票完毕,发布信号量,可以让别人去抢了
}
else{
sem.V();//抢票失败,发布信号量,可以让别人去抢了
break;
}
}
std::cout << name << " quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
运行代码后就不会出现剩余票数为负的情况了,因为此时同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题。
4.基于环形队列的生产消费模型
4.1.环形队列
环形队列是一种线性数据结构,其操作表现基于先进先出(FIFO)的原则,并通过使用固定大小的数组和两个指针来有效地管理队列的入队和出队操作。在环形队列中,当队尾指针到达数组的末尾时,它不会溢出,而是会循环回到数组的开头,从而形成一个循环。类似地,当队头指针到达数组的开头时,它也会循环到数组的末尾。
** 环形队列的主要特点是它有一个固定的容量,当队列满时,就不能再添加新元素;**当队列空时,就不能再移除元素。通过维护两个指针(通常称为队头指针和队尾指针),我们可以实现循环队列的高效操作。队头指针指向队列中第一个元素的下一个位置(如果队列不为空),而队尾指针指向队列中最后一个元素的位置。
那么如何对环形队列进行判满就成了一个问题
策略一:**多开一个空间,
head
、
tail
位于同一块空间中时,表示当前队列为空;在进行插入、获取数据时,都是对下一块空间中的数据进行操作,因为多开了一块空间,当待生产的数据落在
head
指向的空间时,就表示已经满了**
策略二:**参考阻塞队列,搞一个计数器,当计数器的值为
0
时,表示当前为空,当计数器的值为容量时,表示队列为满**
这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器
在 环形队列 中,生产者 和 消费者 关心的资源不一样:**生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据**
除非两者相遇,其他情况下生产者、消费者可以并发运行(同时访问环形队列)
**两者错位时正常进行生产消费就好了,但两者相遇时需要特殊处理,也就是处理 空、满 两种情况,这就是 环形队列 的运转模式**
4.2.环形队列的运转模式
这里可以引入一个小游戏,来辅助理解 环形队列 的运转模式
假设存在一个大圆桌,上面摆放了一圈空盘子,可以往上面放苹果,也可以取上面的苹果
张三和李四打算展开一场 苹果追逐赛,张三作为 追逐方,目标是移动并获取盘子中的苹果,李四作为 被追逐方,目标是往盘子中放苹果,并向下一个空盘子移动
注意:这里的移动指顺时针移动,不能跳格,这是游戏核心规则
游戏基本规则:
- 当两者相遇,且圆桌中没有苹果时,被追逐方(李四)先跑,对方(张三)阻塞
- 当两者相遇,且圆桌中全是苹果时,追逐方(张三)先跑,对方(李四)阻塞
- 被追逐方(李四)不能套圈追逐方(张三)
- 同时追逐方(张三)也不能超过被追逐方(李四)
ok,现在游戏开始,张三和李四处于同一块空间中(起点),此时两人处于一种特殊情况中,不能同时进行 苹果拾取/苹果放置,由于是刚开始,作为 被追逐方 的李四理应先走,否则两者就都阻塞了(张三追上李四时的情况与刚开始的情况一致)
所以可以得出结论:环形队列为空时,生产者需要先生产数据,消费者阻塞
李四先跑,边跑边放苹果,此时因为张三还没有追上李四,所以张三也是边跑边拾取苹果,两者展开了激烈的追逐赛(高效率)
在追逐过程中,张三李四都能同时对圆桌中的格子进行操作,这是非常高效的,环形队列不为空、不为满时,生产者、消费者可以同时进行并发操作
游戏进行到白热化阶段,法外狂徒张三一不注意摔了一跤,导致拾取苹果的速度不断减慢,李四见状火力全开,不断放置苹果,很快张三就被李四追上了,此时场上已经摆满了苹果,规定一个盘子只能放置一个苹果,李四无法在放置苹果,只能阻塞等待张三进行苹果拾取
场上摆满苹果的情况对应着 环形队列为满的情况,生产者不能再生产,消费者需要进行消费
ok,游戏到这里就可以结束了,因为已经足够总结出 环形队列 的运作模式了
- 被追逐方(李四) -> 生产者
- 追逐方(张三) ->** 消费者**
- 大圆桌 -> 环形队列
- 空盘 -> 无数据,可生产
- 苹果 -> 有数据,可消费
运作模式
- 环形队列为空时:消费者阻塞,只能由生产者进行生产,生产完商品后,消费者可以消费商品
- 环形队列为满时:生产者阻塞,只能由消费者进行消费,消费完商品后,生产者可以生产商品
- 其他情况:生产者、消费者并发运行,各干各的事,互不影响
张三和李四也就只能在 满、空 时相遇了
忘记张三和李四的小游戏,将 环形队列 的运行模式带入 「生产者消费者模型」
**可以使用 「信号量」 标识资源的使用情况,但生产者和消费者关注的资源并不相同,所以需要使用两个 「信号量」 来进行操作**
- 生产者信号量:标识当前有多少可用空间
- 消费者信号量:标识当前有多少数据
如果说搞两个 条件变量 是 阻塞队列 的精髓,那么搞两个** 信号量 就是 环形队列 的精髓,显然,刚开始的时候,生产者信号量初始值为环形队列的大小,消费者信号量初始值为 0**
无论是生产者还是消费者,只有申请到自己的 「信号量」 资源后,才进行 生产 / 消费
比如上图中的
pro_sem
就表示 **生产者还可以进行
3
次生产**,
con_sem
表示 **消费者还可以消费
5
次**
生产者、消费者对于 「信号量」 的申请可以这样理解
// 生产者
void Producer()
{
// 申请信号量(空位 - 1)
sem_wait(&pro_sem);
// 生产商品
// ...
// 释放信号量(商品 + 1)
sem_post(&con_sem);
}
// 消费者
void Consumer()
{
// 申请信号量(商品 - 1)
sem_wait(&con_sem);
// 消费商品
// ...
// 释放信号量(空位 + 1)
sem_post(&pro_sem);
}
生产者和消费者指向同一个位置时保证线程安全,其他情况保证并发度
至于怎么落实到代码中,需要接着往下看
4.3、单生产单消费模型
首先来实现简单点的单生产、单消费版 「生产者消费者模型」
起手先创建一个 环形队列 头文件
创建
RingQueue.hpp
头文件
#pragma once
#include <vector>
#include <semaphore.h>
namespace Yohifo
{
#define DEF_CAP 10
template<class T>
class RingQueue
{
public:
RingQueue(size_t cap = DEF_CAP)
:_cap(cap), _pro_step(0), _con_step(0)
{
_queue.resize(_cap);
// 初始化信号量
sem_init(&_pro_sem, 0, _cap);
sem_init(&_con_sem, 0, 0);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_pro_sem);
sem_destroy(&_con_sem);
}
// 生产商品
void Push(const T &inData)
{
// 申请信号量
P(&_pro_sem);
// 生产
_queue[_pro_step++] = inData;
_pro_step %= _cap;
// 释放信号量
V(&_con_sem);
}
// 消费商品
void Pop(T *outData)
{
// 申请信号量
P(&_con_sem);
// 消费
*outData = _queue[_con_step++];
_con_step %= _cap;
// 释放信号量
V(&_pro_sem);
}
private:
void P(sem_t *sem)
{
sem_wait(sem);
}
void V(sem_t *sem)
{
sem_post(sem);
}
private:
std::vector<T> _queue;
size_t _cap;
sem_t _pro_sem;
sem_t _con_sem;
size_t _pro_step; // 生产者下标
size_t _con_step; // 消费者下标
};
}
细节:
- 生产者的信号量初始值为
DEF_CAP
- 消费者的信号量初始值为
0
- 生产者、消费者的起始下标都为
0
- ** 在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?**
通过两个 信号量,当两个 信号量 都不为 0 时,双方可以并发操作,这是 环形队列 最大的特点;
- 当 生产者信号量为 0 时,生产者陷入阻塞等待,等待消费者消费;
- 同理当 消费者信号量为 0 时,消费者也会阻塞住,在这里阻塞就是 互斥 的体现。
当对方完成 生产 / 消费 后,自己会解除阻塞状态,而这就是 同步
目前代码没问题(单生产单消费场景中)
创建
cp.cc
源文件
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include "RingQueue.hpp"
void* Producer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 生产者慢一点
sleep(1);
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
rq->Push(num);
std::cout << "Producer : " << num << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 消费者慢一点
sleep(1);
// 1.从阻塞队列中获取商品
int num;
rq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer : " << num << std::endl;
}
pthread_exit((void*)0);
}
int main()
{
// 种 种子
srand((size_t)time(nullptr));
// 创建一个阻塞队列
Yohifo::RingQueue<int>* rq = new Yohifo::RingQueue<int>;
// 创建两个线程(生产者、消费者)
pthread_t pro, con;
pthread_create(&pro, nullptr, Producer, rq);
pthread_create(&con, nullptr, Consumer, rq);
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete rq;
return 0;
}
编译并运行程序。为了使结果更加清晰,分别展示 生产者每隔一秒生产一次、消费者每隔一秒消费一次的结果
生产者消费者步调一致
由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的,也是一起成对出现的。
生产者生产的快,消费者消费的慢
我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费。
void* Producer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
rq->Push(num);
std::cout << "Producer : " << num << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 消费者慢一点
sleep(1);
// 1.从阻塞队列中获取商品
int num;
rq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer : " << num << std::endl;
}
pthread_exit((void*)0);
}
此时由于生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经为0了,于是生产者只能在_con_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对_con_sem进行了V操作,生产者才会被唤醒进而继续进行生产。
但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
生产者生产的慢,消费者消费的快
当然我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。
void* Producer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
sleep(1);
// 1.生产商品(通过某种渠道获取数据)
int num = rand() % 10;
// 2.将商品推送至阻塞队列中
rq->Push(num);
std::cout << "Producer : " << num << std::endl;
}
pthread_exit((void*)0);
}
void* Consumer(void *args)
{
Yohifo::RingQueue<int>* rq = static_cast<Yohifo::RingQueue<int>*>(args);
while(true)
{
// 1.从阻塞队列中获取商品
int num;
rq->Pop(&num);
// 2.消费商品(结合某种具体业务进行处理)
std::cout << "Consumer : " << num << std::endl;
}
pthread_exit((void*)0);
}
虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对_con_sem进行了V操作,消费者才会被唤醒进而进行消费。
但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
接下来可以实现 多生产多消费场景 中的 CP 模型了,**多生产多消费无非就是增加了 消费者与消费者、生产者与生产者 间的 互斥 关系,加锁就行了,现在问题是加几把锁?**
答案是 两把,**因为当前的 生产者和消费者 关注的资源不一样,一个关注剩余空间,另一个关注是否有商品,一把锁是无法锁住两份不同资源的,所以需要给 生产者、消费者 各配一把锁**
- 阻塞队列 中为什么只需要一把锁?
因为阻塞队列中的共享资源是一整个队列,生产者和消费者访问的是同一份资源,所以一把锁就够了
RingQueue.hpp
头文件(完全版本)
#pragma once
#include <vector>
#include <mutex>
#include <semaphore.h>
namespace Yohifo
{
#define DEF_CAP 10
template<class T>
class RingQueue
{
public:
RingQueue(size_t cap = DEF_CAP)
:_cap(cap), _pro_step(0), _con_step(0)
{
_queue.resize(_cap);
// 初始化信号量
sem_init(&_pro_sem, 0, _cap);
sem_init(&_con_sem, 0, 0);
// 初始化互斥锁
pthread_mutex_init(&_pro_mtx, nullptr);
pthread_mutex_init(&_con_mtx, nullptr);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_pro_sem);
sem_destroy(&_con_sem);
// 销毁互斥锁
pthread_mutex_destroy(&_pro_mtx);
pthread_mutex_destroy(&_con_mtx);
}
// 生产商品
void Push(const T &inData)
{
// 申请信号量
P(&_pro_sem);
Lock(&_pro_mtx);
// 生产
_queue[_pro_step++] = inData;
_pro_step %= _cap;
UnLock(&_pro_mtx);
// 释放信号量
V(&_con_sem);
}
// 消费商品
void Pop(T *outData)
{
// 申请信号量
P(&_con_sem);
Lock(&_con_mtx);
// 消费
*outData = _queue[_con_step++];
_con_step %= _cap;
UnLock(&_con_mtx);
// 释放信号量
V(&_pro_sem);
}
private:
void P(sem_t *sem)
{
sem_wait(sem);
}
void V(sem_t *sem)
{
sem_post(sem);
}
void Lock(pthread_mutex_t *lock)
{
pthread_mutex_lock(lock);
}
void UnLock(pthread_mutex_t *lock)
{
pthread_mutex_unlock(lock);
}
private:
std::vector<T> _queue;
size_t _cap;
sem_t _pro_sem;
sem_t _con_sem;
size_t _pro_step; // 生产者下标
size_t _con_step; // 消费者下标
pthread_mutex_t _pro_mtx;
pthread_mutex_t _con_mtx;
};
}
- 细节: 加锁行为放在信号量申请成功之后,可以提高并发度
** 在 环形队列 中,可以在申请 「信号量」 前进行加锁,也可以在申请 「信号量」 后进行加锁,这里比较推荐的是 在申请 「信号量」 后加锁,**因为 「信号量」 的操作是原子的,可以确保线程安全,也就不需要加锁保护;也就是可以并发申请 「信号量」,再串行化访问临界资源
这样子就大功告成了。
5.阻塞队列VS环形队列
** 首先要明白 「生产者消费者模型」 高效的地方从来都不是往缓冲区中放数据、从缓冲区中拿数据**
对缓冲区的操作对于计算机说就是小 case,需要关注的点在于 **获取数据和消费数据**,这是比较耗费时间的,
- 阻塞队列 至多支持获取 一次数据获取 或 一次数据消费,在代码中的具体体现就是 所有线程都在使用一把锁,并且每次只能 push、pop 一个数据;
- 而 环形队列 就不一样了,生产者、消费者 可以通过 条件变量 知晓数据获取、数据消费次数,并且由于数据获取、消费操作没有加锁,支持并发,因此效率十分高
环形队列 中允许 N 个生产者线程一起进行数据获取,也允许 N 个消费者线程一起进行数据消费,简单任务处理感知不明显,但复杂任务就不一样了,这就有点像同时下载多份资源,是可以提高效率的
但是环形队列也不是十全十美的!要不然也就不用学习阻塞队列了
特征 阻塞队列(互斥锁实现)环形队列(信号量实现)内部同步机制 使用互斥锁或类似的锁机制来实现线程安全使用信号量来实现线程安全阻塞操作支持阻塞操作,当队列为空或已满时,线程可以等待也支持阻塞操作,当队列为空或已满时,线程可以等待数据覆盖通常不会覆盖已有元素,新元素添加时需要等待队列有空间有界的,当队列已满时,添加新元素会覆盖最早的元素实现复杂度实现可能较为复杂,需要处理锁的获取和释放 实现相对较简单,需要管理信号量线程安全 通过锁来保证线程安全,容易引入死锁问题通过信号量来保证线程安全,不易引入死锁问题添加和删除操作时间复杂度 O(1)(在队列未满或非空时)O(1)(常数时间,除非队列已满或为空)应用场景多线程数据传递,任务调度,广播通知等循环缓存,数据轮询,循环任务调度等
至此,生产者消费者完结!
版权归原作者 掘根 所有, 如有侵权,请联系我们删除。