阅读本篇文章前推荐优先阅读:
【Linux】多线程(中)-CSDN博客https://blog.csdn.net/Eristic0618/article/details/143433347?spm=1001.2014.3001.5501
一、生产者消费者模型
1.1 概念
生产者消费者模型的产生是为了解决数据的生产者和消费者的强耦合问题,方案是提供一个额外的容器让生产者和消费者之间进行通讯。
其思路在于,生产者产出数据后不直接递交给消费者进行处理,而是托管到容器中;消费者也不会直接向生产者请求数据,而是从容器中获取。
在生活中,一个经典的生产者消费者模型就是超市了,客户们就是消费者,供货商们就是生产者,而超市就是容器。如果没有超市,客户只能和供货商直接对接,但单个客户的需求量对于供货商的生产计划而言实在是太少了,生产和消费的效率不平衡。超市就相当于一个大号的缓存,既能够承担供货商高效的生产效率,又能够接受客户零碎的消费需求,并且生产和消费不必同时进行,做到了将生产和消费动作很好的解耦,支持生产和消费的忙闲不均。
在程序中,生产者和消费者就是一个个线程,数据存放到特定结构的内存空间中。于是这个内存空间一定会被多线程并发访问,是共享资源,所以我们的模型中一定要引入互斥锁保证其线程安全!
生产者消费者模型中的“321原则”,分为三种关系、两个角色和一个交易场所,必须遵守
其中三种关系分为:
- 生产者与生产者:互斥
- 消费者与消费者:互斥
- 生产者与消费者:互斥和同步
两个角色:生产者和消费者
一个交易场所:特定结构的内存空间
1.2 基于阻塞队列
阻塞队列(Blocking Queue)是一种常用于实现生产者消费者模型的数据结构,其特点在于当队列为空时,消费者从队列获取数据的动作将被阻塞,直到队列中有数据被放入;当队列已满时生产者向队列中存放数据的动作将被阻塞,直到队列中有空间
我们可以使用互斥锁和条件变量来实现基于阻塞队列的生产者消费者模型
//BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
template <class T>
class BlockQueue
{
static const int NUM = 5;
public:
BlockQueue(int cap = NUM)
:capacity_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_, nullptr);
pthread_cond_init(&p_, nullptr);
}
const T& pop()
{
pthread_mutex_lock(&mutex_); //加锁
while(q_.size() == 0) //阻塞队列为空(为什么要用while而不是if?)
{
pthread_cond_wait(&c_, &mutex_); //消费者等待
}
T out = q_.front(); //消费
q_.pop();
pthread_cond_signal(&p_); //唤醒生产者
pthread_mutex_unlock(&mutex_); //解锁
return out;
}
void Push(const T& in)
{
pthread_mutex_lock(&mutex_); //加锁
while(q_.size() == capacity_) //阻塞队列已满
{
pthread_cond_wait(&p_, &mutex_); //生产者等待
}
q_.push(in); //生产
pthread_cond_signal(&c_); //唤醒消费者
pthread_mutex_unlock(&mutex_); //解锁
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_);
pthread_cond_destroy(&p_);
}
private:
std::queue<T> q_;
int capacity_; //阻塞队列容量
pthread_mutex_t mutex_; //互斥锁
pthread_cond_t c_;
pthread_cond_t p_;
};
可以看到在Pop和Push函数中,我们判断队列为空或已满时使用的是while循环而不是if,为什么?
答案:防止线程被伪唤醒的情况
假设队列容量已满,消费者在消费完毕后可能在唤醒的时候一次性唤醒了多个生产者,被唤醒的这几个生产者先对锁竞争,第一个竞争到锁的线程开始生产,生产完毕后释放锁。接着消费者和剩余被唤醒的生产者共同竞争这把锁,如果下一个竞争到锁的还是生产者,则可能导致生产了超出队列容量的数据,导致错误。
使用while循环,就可以做到在线程被伪唤醒的情况下重新将其加入条件变量中
在main.cc中调用BlockQueue.hpp,实现一个多生产多消费的情景
#include "BlockQueue.hpp"
pthread_mutex_t mutex; //互斥锁
void* Consumer(void* args) //消费者例程
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
sleep(1); //可以让消费的速度慢一些
int data = bq->pop(); //消费数据
std::cout << "consuming a data: " << data << std::endl;
}
}
void* Productor(void* args) //生产者例程
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
static int data = 0; //模拟数据
while (true)
{
pthread_mutex_lock(&mutex);
data++; //临界资源
bq->Push(data); //生产数据
std::cout << "produce a data: " << data << std::endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++) //创建多个消费者线程
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; i++) //创建多个生产者线程
{
pthread_create(p + i, nullptr, Productor, bq);
}
//线程等待
for (int i = 0;i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
程序运行结果如下:
因为我们的代码中选择让消费者的速度慢一些,对生产者的速度不作限制,因此可以看到一旦消费者消费一个数据,生产者就立刻生产数据补上
在实际情况中,不一定只有队列为空才让消费者等待,队列为满让生产者等待,而是可以添加一个水位线,限定数据存量的上限和下限
为何生产者消费者模型是高效的?在业务中,生产者生产的数据往往需要花费时间获取,消费者在获取数据后也可能要对数据进行加工处理,也需要时间。生产者在获取数据时消费者可能在消费数据,生产者在生产数据时消费者可能正在加工处理数据,此时一个访问临界区,一个访问非临界区,二者互不干扰。
虽然生产和消费时线程之间是互斥的,但多生产多消费的意义在于让线程能够并发的完成数据的获取和后续数据的加工处理
1.3 POSIX信号量
在前面我们已经学习过了System V信号量
【Linux】进程间通信——System V消息队列和信号量_vos消息队列-CSDN博客https://blog.csdn.net/Eristic0618/article/details/142635584?spm=1001.2014.3001.5501POSIX信号量和System V信号量作用相同,但POSIX信号量可以用于线程间同步。
之前提到过,信号量的本质就是一把计数器,我们只要让信号量的值与资源的数量保持一致,让线程竞争信号量,那么每一个竞争到信号量的线程就一定会分配到资源。也就是说,在申请信号量和释放信号量之间,无需再对资源是否就绪做判断了,因为持有信号量就意味着一定有一份资源属于该线程
这里不再赘述信号量相关的概念,只需要了解其API如何使用即可
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
其中sem是待初始化的POSIX信号量;pshared为0表示线程间共享,非0表示进程间共享;value为信号量初始值
销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t *sem);
等待信号量
#include <semaphore.h>
int sem_wait(sem_t *sem);
线程调用sem_wait函数后会将信号量sem的值减1
发布信号量
#include <semaphore.h>
int sem_post(sem_t *sem);
资源使用完毕后,线程可以调用sem_post函数将信号量sem的值加1,代表归还一份资源
1.4 基于环形队列和POSIX信号量
关于环形队列的性质我们简单提一嘴:
环形队列通常采用数组模拟,通过模运算实现逻辑上的循环结构
其缺点是起始状态下队头和队尾在同一位置,不便于判断队列为空还是为满,因此通常空出一个位置来区分二者的状态。
但现在我们有POSIX信号量,通过信号量的值就能知道队列中资源的数量,从而进行判满或判空
信号量用来统计剩余资源的数量,但对于生产者和消费者而言,二者需要的资源是同一种吗?
- 生产者需要的资源:容器中剩余的空间
- 消费者需要的资源:容器中剩余的数据
因此,使用环形队列和POSIX信号量实现生产消费模型时,我们需要一个消费信号量和一个生产信号量。消费信号量用于统计队列中的数据个数,生产信号量用于统计队列中的剩余空间数量
- 生产者进行一次生产:生产信号量-1(空间-1),消费信号量+1(数据+1)
- 消费者进行一次消费:消费信号量-1(数据-1),空间信号量+1(空间+1)
因此,不论是生产者进行生产,还是消费者进行消费,我们都要进行一次完整的PV操作,即请求自己的信号量(P)和释放对方的信号量(V)
环形队列代码如下:
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
template <class T>
class RingQueue
{
static const int NUM = 5;
private:
void P(sem_t &sem) //P操作:申请信号量
{
sem_wait(&sem);
}
void V(sem_t &sem) //V操作:归还信号量
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex) //上锁
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex) //解锁
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = NUM)
: ringqueue_(cap), capacity_(cap), Ci_(0), Pi_(0)
{
sem_init(&c_sem_, 0, 0);
sem_init(&p_sem_, 0, capacity_);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Pop(T* out) //消费
{
P(c_sem_); //申请消费信号量
Lock(c_mutex_); //加锁
*out = ringqueue_[Ci_];
Ci_++;
Ci_ %= capacity_; //保持循环性质
Unlock(c_mutex_); //解锁
V(p_sem_); //归还生产信号量
}
void Push(T in) //生产
{
P(p_sem_); //申请生产信号量
Lock(p_mutex_); //加锁
ringqueue_[Pi_] = in;
Pi_++;
Pi_ %= capacity_; //保持循环性质
Unlock(p_mutex_); //解锁
V(c_sem_); // 归还消费信号量
}
~RingQueue()
{
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
sem_destroy(&c_sem_);
sem_destroy(&p_sem_);
}
private:
std::vector<T> ringqueue_; // 环形队列
int capacity_; // 队列容量
int Ci_; //可消费的数据所在下标
int Pi_; //可生产数据的空间下标
sem_t c_sem_; //消费信号量:剩余可消费的数据
sem_t p_sem_; //生产信号量:剩余可生产的空间
pthread_mutex_t c_mutex_; //消费者互斥锁
pthread_mutex_t p_mutex_; //生产者互斥锁
};
在main.cc中调用RingQueue.hpp,实现一个多生产多消费的情景
#include "RingQueue.hpp"
#include <time.h>
#include <string>
class ThreadData //线程属性
{
public:
RingQueue<int> *rq_; //环形队列
std::string ThreadName_; //线程名
};
void* Consumer(void* args) //消费者例程
{
ThreadData *td = static_cast<ThreadData *>(args);
RingQueue<int> *rq = td->rq_;
while(true)
{
int data = 0;
rq->Pop(&data); //消费数据
std::cout << td->ThreadName_ << " consuming a data: " << data << std::endl;
sleep(1);
}
return nullptr;
}
void* Producer(void* args) //生产者例程
{
ThreadData *td = static_cast<ThreadData *>(args);
RingQueue<int> *rq = td->rq_;
while(true)
{
int data = rand() % 10; //随机值模拟数据
rq->Push(data); //生产数据
std::cout << td->ThreadName_ << " produce a data: " << data << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(0));
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c[5], p[3];
for (int i = 0; i < 5; i++)
{
ThreadData *td = new ThreadData();
td->rq_ = rq;
td->ThreadName_ = "Consumer " + std::to_string(i); //初始化线程属性
pthread_create(c + i, nullptr, Consumer, td); //创建消费者线程
}
for (int i = 0; i < 3; i++)
{
ThreadData *td = new ThreadData();
td->rq_ = rq;
td->ThreadName_ = "Producer " + std::to_string(i);
pthread_create(p + i, nullptr, Producer, td); //创建生产者线程
}
//线程等待
for (int i = 0; i < 5; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0;i < 3; i++)
{
pthread_join(p[i], nullptr);
}
return 0;
}
程序运行结果如下:
通过观察可以发现,3个生产者每秒能够生产3份数据,但消费者有5个,所以每秒只能有3个消费者申请到信号量并消费数据
二、线程池
2.1 概念
线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。线程池维护着多个线程,随时等待分配可并发执行的任务,能够保证内核的充分利用,避免了在处理短时间任务时创建和销毁线程的开销。
线程池的适用场景:
- 需要大量线程完成任务且任务所需时长短的场景,例如web服务器完成网页请求。但对于长时间任务,线程池的优势就不明显了,因为任务所需时间比线程创建时间长
- 对性能要求苛刻的应用
- 可能产生突发性大量请求的应用。在这种情况下如果不采用线程池,短时间内将产生大量线程,占用系统内存
2.2 代码
下面是线程池的代码:
//ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
struct ThreadInfo
{
pthread_t tid;
std::string threadname;
};
static const int ThreadNum = 5;
template <class T>
class ThreadPool
{
public:
void Lock() //加锁
{
pthread_mutex_lock(&mutex_);
}
void Unlock() //解锁
{
pthread_mutex_unlock(&mutex_);
}
void Wait() //线程等待
{
pthread_cond_wait(&cond_, &mutex_);
}
void Wakeup() //唤醒线程
{
pthread_cond_signal(&cond_);
}
std::string GetThreadName(pthread_t tid) //获取线程名
{
for(auto &e : threads_)
{
if(tid == e.tid)
return e.threadname;
}
return "None";
}
public:
ThreadPool(int num = ThreadNum)
:threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
static void* ThreadRoutine(void* args) //为何要加static?因为不加static的成员函数第一个参数是this指针,不符合线程例程函数的要求
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string threadname = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock(); //加锁
while(tp->datas_.size() == 0) //队列为空
{
tp->Wait(); //线程进入条件变量等待
}
T data = tp->Pop(); //取出数据
tp->Unlock(); //解锁
std::cout << threadname << " get a data: " << data << std::endl;
}
}
void Start() //启动线程池
{
for (int i = 0; i < threads_.size(); i++)
{
threads_[i].threadname = "Thread-" + std::to_string(i + 1); //设置线程名
pthread_create(&(threads_[i].tid), nullptr, ThreadRoutine, this); //创建线程并传入This指针
}
}
T Pop() //从队列中取出数据
{
T out = datas_.front();
datas_.pop();
return out;
}
void Push(T &in) //向队列中放入数据
{
Lock(); //加锁
datas_.push(in);
Wakeup(); //唤醒线程
Unlock(); //解锁
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::vector<ThreadInfo> threads_; //线程信息数组
std::queue<T> datas_; //存放数据的队列(可以改为存放任务等)
pthread_mutex_t mutex_; //互斥锁
pthread_cond_t cond_; //条件变量
};
在main.cc中调用线程池并进行测试:
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ThreadPool.hpp"
int main()
{
ThreadPool<int> *tp = new ThreadPool<int>(10); //创建10个线程的线程池
tp->Start(); //启动线程池
srand(time(0));
while(true)
{
int data = rand() % 10;
tp->Push(data);
std::cout << "main thread make a data: " << data << std::endl;
sleep(1);
}
return 0;
}
程序运行结果如下:
三、封装Linux线程库
Linux线程库中,我们需要调用pthread_create函数才能让线程开始执行例程函数
但在C++11的标准库中,我们可以在创建thread类对象时传入线程需要执行的函数,并且可以通过调用类成员函数完成线程等待之类的操作。我们能否通过对Linux线程库的封装来简单实现和C++thread类一样的效果呢?
//Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <time.h>
#include <pthread.h>
typedef void (*callback_t)();
static int num = 1;
class Thread
{
public:
static void* ThreadRoutine(void* args)
{
Thread *thread = static_cast<Thread *>(args);
thread->entery();
}
public:
Thread(callback_t cb)
:threadname_(""), start_timestamp_(0), isrunning_(false), cb_(cb)
{}
void run()
{
threadname_ = "Thread-" + std::to_string(num++);
start_timestamp_ = time(0);
isrunning_ = true;
pthread_create(&tid_, nullptr, ThreadRoutine, this);
}
void join()
{
pthread_join(tid_, nullptr);
}
void entery()
{
cb_();
}
std::string name()
{
return threadname_;
}
bool isrunning()
{
return isrunning_;
}
uint64_t starttime()
{
return start_timestamp_;
}
private:
pthread_t tid_;
std::string threadname_;
uint64_t start_timestamp_;
bool isrunning_;
callback_t cb_;
};
注:此处没有实现创建线程时传入函数参数的功能,有兴趣的同学可以自己研究一下
在main.cc中调用封装后的线程
#include <iostream>
#include <unistd.h>
#include "Thread.hpp"
void Print()
{
while(true)
{
std::cout << "I am an encapsulated thread" << std::endl;
sleep(1);
}
}
int main()
{
Thread t(Print);
t.run();
std::cout << "isrunning: " << t.isrunning() << std::endl;
std::cout << "starttime: " << t.starttime() << std::endl;
std::cout << "threadname: " << t.name() << std::endl;
t.join();
return 0;
}
程序运行结果如下:
四、单例模式
4.1 概念
单例模式是一种经典的、常用常考的设计模式。采用这种模式的类只能由类本身创建一个全局唯一的实例化对象,即用户自己不能对这个类进行实例化,而是调用其接口来获取这个类已经实例化的对象。只能有一个实例化对象的类就称为单例。
单例模式是一种创建型设计模式,它确保一个类只有一个实例,并提供了一个方法来获取该实例。
要点:
- 单例类只能有一个实例
- 单例类必须自己创建自己的唯一实例
- 用户只能通过单例类提供的方法来获取该实例
当需要控制实例数目,节省系统资源时,我们就可以采用单例模式。单例模式可以解决频繁创建和销毁全局使用的类实例的问题。单例模式还可以避免资源的多重占用,例如当某些资源只能由一个对象进行管理时,这种情况可以采用单例模式
为了确保用户不会自行实例化单例类,我们需要将单例类的构造函数的属性设置为私有
4.2 单例模式的实现方式
4.2.1 饿汉模式
对于一个单例类,如果在程序一开始运行时就实例化自己的对象,在需要使用时可以直接获取,即为饿汉模式。这种模式本身就是线程安全的,不需要加锁
#include <iostream>
class Singleton
{
public:
static Singleton* GetInstance() //获取单例类对象
{
return instance;
}
static void DestroyInstance() //销毁单例类对象
{
if(instance)
{
delete instance;
instance = nullptr;
}
}
private: //将构造、拷贝构造、赋值构造、析构设置为私有
Singleton()
{
std::cout << "Singleton()" << std::endl;
}
~Singleton()
{
std::cout << "~Singleton()" << std::endl;
}
Singleton(const Singleton &sig);
const Singleton operator=(const Singleton &sig);
private:
static Singleton *instance;
};
Singleton *Singleton::instance = new Singleton(); //全局唯一的单例类对象
因为类对象指针是静态的,且只有静态成员函数能访问静态成员变量,所以函数也得是静态的
在main.cc中调用这个饿汉式单例并进行测试:
#include <iostream>
#include "Singleton.hpp"
int main()
{
Singleton *ins = Singleton::GetInstance(); //获取单例类对象
ins->DestroyInstance(); //销毁单例类对象
return 0;
}
程序运行结果如下:
如果我们试图在源文件中自行实例化一个单例类对象呢?
可以看到因为构造函数已经私有化了,无法在外部调用
4.2.2 懒汉模式
懒汉模式与饿汉模式相对,其不会在程序一开始运行时就实例化类对象,而是等到用户需要使用该对象、第一次调用方法获取时才进行实例化。
懒汉模式的核心思想在于“延时加载”,从而能够优化程序的启动速度,但其本身无法保证线程安全
(1)线程不安全的懒汉模式
#include <iostream>
class Singleton
{
public:
static Singleton* GetInstance() //获取单例类对象
{
if(instance == nullptr) //第一次调用:为空指针,进行实例化
{
instance = new Singleton();
}
return instance; //后续直接返回已实例化的对象
}
static void DestroyInstance() //销毁单例类对象
{
if(instance)
{
delete instance;
instance = nullptr;
}
}
private: //将构造、拷贝构造、赋值构造、析构设置为私有
Singleton()
{
std::cout << "Singleton()" << std::endl;
}
~Singleton()
{
std::cout << "~Singleton()" << std::endl;
}
Singleton(const Singleton &sig);
const Singleton operator=(const Singleton &sig);
private:
static Singleton *instance;
};
Singleton *Singleton::instance = nullptr; //初始化为空指针
可以看到,对于懒汉模式,单例类对象的指针初始是为空的,当用户第一次获取类对象时才会进行实例化
但是这种普通的懒汉模式存在线程不安全的问题,即如果在第一次获取类对象时两个线程同时调用GetInstance方法,可能会创建出两份单例类的实例
因此我们可以通过静态局部变量或使用互斥锁来实现线程安全的懒汉模式
(2)线程安全的懒汉模式
①使用静态局部变量
使用静态局部变量实现懒汉模式的方式称为 Meyer’s Singleton
#include <iostream>
class Singleton
{
public:
static Singleton& GetInstance() //获取单例类对象
{
static Singleton instance;
return instance;
}
private: //将构造、拷贝构造、赋值构造、析构设置为私有
Singleton()
{
std::cout << "Singleton()" << std::endl;
}
~Singleton()
{
std::cout << "~Singleton()" << std::endl;
}
Singleton(const Singleton &sig);
const Singleton operator=(const Singleton &sig);
};
在C++11中,使用静态局部变量的方式天然就是线程安全的,具体详见:
c++ - Singleton instance declared as static variable of GetInstance method, is it thread-safe? - Stack Overflowhttps://stackoverflow.com/questions/449436/singleton-instance-declared-as-static-variable-of-getinstance-method-is-it-thre这种方式实现的懒汉式单例,我们直接调用Singleton::GetInstance()即可获取其对象
②使用互斥锁
#include <iostream>
class Singleton
{
public:
static Singleton* GetInstance() //获取单例类对象
{
if(instance == nullptr) //第一次调用:为空指针,进行实例化
{
pthread_mutex_lock(&mutex); //加锁
if(instance == nullptr) //双重判断——双检锁:避免重复实例化
{
instance = new Singleton();
}
pthread_mutex_unlock(&mutex); //解锁
}
return instance; //后续直接返回已实例化的对象
}
static void DestroyInstance() //销毁单例类对象
{
if(instance)
{
delete instance;
instance = nullptr;
}
}
private: //将构造、拷贝构造、赋值构造、析构设置为私有
Singleton()
{
std::cout << "Singleton()" << std::endl;
}
~Singleton()
{
std::cout << "~Singleton()" << std::endl;
}
Singleton(const Singleton &sig);
const Singleton operator=(const Singleton &sig);
private:
static Singleton *instance;
static pthread_mutex_t mutex;
};
Singleton *Singleton::instance = nullptr; //初始化为空指针
pthread_mutex_t Singleton::mutex;
为了避免在第一次获取单例类对象时出现多线程同时调用GetInstance方法导致重复实例化的情况,我们需要使用互斥锁来保证线程安全。
但单纯使用互斥锁是不够的,在临界区内部还需要进行第二次判断,以避免解锁后其他线程继续申请锁进行二次实例化,这种进行双重判断的锁称为双检锁
五、其他常见的锁
5.1 定义
- 公平锁:线程在申请锁前先进入队列排队,按照顺序获取锁
- 非公平锁:多线程竞争式获取锁,获取不到才进入队列,可能导致线程饥饿问题
- 自旋锁:获取不到锁的线程不会阻塞挂起,而是循环等待并不断判断是否能够获取锁
- 悲观锁:每次获取数据时,总担心数据会被其他线程修改,因此会在访问数据前先加锁
- 乐观锁:每次获取数据时,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是会在更新数据前判断是否有其他线程对数据进行过修改。常见方式:版本号机制和CAS操作
自旋和非阻塞轮询比较相似,就是周而复始的去申请锁。过去我们学习使用的互斥锁属于挂起等待锁,即线程申请锁失败就被挂起,但将线程挂起和唤醒也是需要时间的,所以使用互斥锁和自旋锁完全取决于其他线程执行临界区时的时长。如果线程执行临界区需要花很长时间,那么就使用互斥锁,如果很短,那么就使用自旋锁
我们可以使用pthread_mutex_trylock函数和循环实现简单的自旋锁
#include <pthread.h>
int pthread_mutex_trylock(pthread_mutex_t *mutex);
除此之外,还有Linux原生线程库提供的自旋锁
#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t *lock);
5.2 C++11Atomic和CAS操作
为了保证线程安全,我们可以使用互斥锁,但不断加锁解锁的过程也是有性能损耗的。如果想要无锁实现线程安全,可使用C++11中新增的原子操作类Atomic中的CAS方法
首先我们需要知道什么是CAS操作
Compare-and-Swap (CAS)是用于多线程以实现同步的原子指令,在修改数据前先判断其是否与预期值相等,如果相等才会将其修改为给定值
通过在修改前先比较的方式,我们就能保证在修改数据的过程中,不会有其他线程额外对数据进行篡改,从而进行安全的写入,保证读写的一致性
关于Atomic类,可以查阅文档:
cplusplus.com/reference/atomic/atomic/https://cplusplus.com/reference/atomic/atomic/atomic<T>提供的常用方法有:
- store:原子写
- load:原子读
- exchange:原子的对两个数据进行交换
- compare_exchange_weak和compare_exchange_strong:CAS操作
我们先看这两个CAS操作的声明:
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_weak (T& expected, T val, memory_order success, memory_order failure) noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order sync = memory_order_seq_cst) noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) volatile noexcept;
bool compare_exchange_strong (T& expected, T val, memory_order success, memory_order failure) noexcept;
主要参数为expected和val,它们的作用是,若this的值等于expected的值,则将this修改为val并返回true,否则返回false
其中,weak版本和strong的版本区别在于,weak版本的CAS操作允许出乎意料的false返回(例如this值和expected值一样时仍然返回false),比strong版本具有更高的性能
要创建一个原子变量,可以用这种方式:
std::atomic<int> ai(1);
int val = ai.load(); //原子的读取
ai.store(2); //原子的写入
我们可以看看C++文档中实现的线程安全的无锁链表
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
struct Node
{
int value;
Node *next;
};
std::atomic<Node *> list_head(nullptr);
void append(int val)
{
Node *oldHead = list_head;
Node *newNode = new Node{val, oldHead};
while (!list_head.compare_exchange_weak(oldHead, newNode))
newNode->next = oldHead;
}
int main()
{
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i)
threads.push_back(std::thread(append, i));
for (auto &th : threads)
th.join();
for (Node *it = list_head; it != nullptr; it = it->next)
std::cout << ' ' << it->value;
std::cout << '\n';
Node *it;
while (it = list_head)
{
list_head = it->next;
delete it;
}
return 0;
}
六、读者写者问题
6.1 概念
在多线程编程中,有一种情况十分常见,即有些共享数据修改的机会较少,读取的次数相比写入的次数要多得多。
例如存在一个文件,允许多个读者同时读取这个文件,但是不允许在读的过程中对文件进行写入,文件在进行写入时也不允许读取或有其他人同时写入
读者写者问题也有“321原则”,但与生产者消费者模型不同之处在于,读者与读者之间是共享关系,而不是互斥关系,因为读者不会像消费者一样拿走数据
在Linux原生线程库中也有读者写者问题需要用到的锁:读写锁
#include <pthread.h>
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
//读者申请读写锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
//写者申请读写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
//读者写者释放读写锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
6.2 读写策略
在读者多、写者少的情况下,读者竞争锁的能力更强,可能导致写者的饥饿问题,因此我们有时可能需要对读写采取特定的策略
(1)读者优先
读者优先的伪代码:
mutex_t rlock, wlock;
int reader_count = 0; //读者个数
void reader() //读者
{
lock(rlock); //加锁准备修改reader_count
if(reader_count == 0)
lock(wlock); //第一个读者申请写者锁,写者被阻塞
reader_count++;
unlock(rlock); //解锁
read(); //读者读
lock(rlock); //加锁准备修改reader_count
reader_count--;
if(reader_count == 0)
unlock(wlock);//最后一个读者读取完才释放写者锁
unlock(rlock); //解锁
}
void writer() //写者
{
lock(wlock); //加锁进行写入
write();
unlock(wlock);
}
第一个读者开始读取时,首先申请写者锁,让写者阻塞无法进行写入。读取的动作不需要互斥,因此不用加锁。
当reader_count为0时说明所有读者读取完毕,此时释放写者锁,让写者能够申请锁进行写入
而写者只需要简单的申请和释放锁,构成写者之间的互斥即可
(2)写者优先
写者优先的伪代码:
mutex_t S;
mutex_t rlock,wlock, wclock;
int readcount = 0, writecount = 0;
void reader() //读者
{
lock(S); //读者对S加锁准备申请读取
lock(rlock);//加锁准备修改readcount
if(readcount == 0)
lock(wlock); //第一个读者阻止后面的写者写入
readcount++;
unlock(rlock);
unlock(S);
read(); //读者读
lock(rlock); //加锁准备修改readcount
readcount--;
if(readcount == 0)
unlock(wlock);//最后一个读者读取完毕
unlock(rlock);
}
void writer() //写者
{
lock(wclock); //加锁准备修改writecount
if(writecount == 0)
lock(S); //第一个写者申请锁S,读者被阻塞
writecount++;
unlock(wclock);
lock(wlock);
write(); //写者写
unlock(wlock);
lock(wclock); //加锁准备修改writecount
writecount--;
if(writecount == 0)
unlock(S); //最后一个写者写入完毕释放S,读者才能读取
unlock(wclock);
}
在没有写者进行写入的情况下,读者会申请wlock让自己在读取的同时写者无法进行写入。
如果此时有写者想要进行写入,就会先申请S让后续读者阻塞,等待当前所有读者读取完毕后进行写入
上面的mutex_t换成二元信号量也是一样的效果,伪代码主要方便进行原理的说明
(3)读写公平
读写公平的伪代码:
mutex_t rlock, wlock, S;
int readcount = 0, writecount = 0;
void reader() //读者
{
lock(S);
lock(rlock);//加锁准备修改readcount
if(readcount == 0)
lock(wlock);//第一个读者申请wlock,写者阻塞
readcount++;
unlock(rlock);
unlock(S);
read(); //读者读
lock(rlock);
readcount--;
if(readcount == 0)
unlock(wlock);
unlock(rlock);
}
void writer() //写者
{
lock(S);
lock(wlock);
write(); //写者写
unlock(wlock);
unlock(S);
}
读者跟写者一起公平竞争S,申请到S的线程才有资格访问共享资源
完.
版权归原作者 阿瑾0618 所有, 如有侵权,请联系我们删除。