0


【Linux】:多线程(互斥 && 同步)

📃个人主页:island1314

🔥个人专栏:Linux—登神长阶

⛺️ 欢迎关注:👍点赞 👂🏽留言 😍收藏 💞 💞 💞


1. 线程互斥 🚀

1.1 进程线程间的互斥相关背景概念

  1. 临界资源:多线程执行流共享的资源就叫做临界资源
  2. 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  3. 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  4. 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.2 互斥量Mutex

首先我们来看个案例,关于抢票的,如下:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>

int ticket = 100;

void *routine(void *arg)
{
    char *id = (char*)arg;
    
    while(1)
    {
        if (ticket > 0){
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        }
        else{
            break;
        }
    }
    return nullptr;
}

int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, routine, (void *) "thread-1");
    pthread_create(&t2, NULL, routine, (void *) "thread-2");
    pthread_create(&t3, NULL, routine, (void *) "thread-3");
    pthread_create(&t4, NULL, routine, (void *) "thread-4");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);

    return 0;
}

// 输出结果:
thread-1 sells ticket:100
...
thread-2 sells ticket:0
thread-3 sells ticket:-1
thread-4 sells ticket:-1

****由上面结果可知,抢票抢到负数去了。因此多线程并发访问公共资源时可能会引发异常 ****

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互
  • 多个线程并发的操作共享变量,会带来一些问题

🎈 那么为什么会出现负数的情况呢?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程

  • usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段

  • –ticket 操作本身就不是⼀个原子操作- -- 操作并不是原子操作,而是对应三条汇编指令- load:将共享变量ticket从内存加载到寄存器中- update:更新寄存器里面的值,执行-1操作将新值,从寄存器- store:写回共享变量ticket的内存地址

寄存器不等于寄存器的内容,线程在执行的时候,将共享数据,加载到 CPU 寄存器的本质:把数据的内容,变成了自己的上下文,同时自己拷贝了一份数据

拿走数据,拿走上下文,每次通过上下文轮番刷新

对一个全局变量进行多线程并发--/++是否是安全的?(并发情况下,对变量的操作)---> 不安全

加深理解(举个例子)

每个进程都认为自己是 1,操作完第一步之后就被切走了,我在修改的时候你也修改了,例如:

假设此时有两个线程,它们都在尝试递减全局变量 tickets 的值。若没有适当的同步机制,可能会发生以下情形:

  1. 线程 a 和 线程 b 都检查 tickets 的值是否大于 0。
  2. 线程 a 和 线程 c 都发现 tickets 的值大于 0,因此都会尝试递减 tickets 的值。
  3. 线程 a 先递减 tickets 的值,例如从 100 减到 99。
  4. 线程 b 也递减 tickets 的值,但是由于它读取的是原始值 100,所以也会将 tickets 的值从 100 减到 99。
  5. 最终结果:尽管两个线程都执行了递减操作,但是 tickets 的值只减少了 1 而不是期望的 2

🎈 要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入区执行时,不允许其他线程进入该临界区
  • 并且临界区没有线程在执行,那么只能允许一个线程如果多个线程同时要求执行临界区的代码进入该临界区
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量

  • 互斥锁在任何时刻,只允许一个线程进行资源访问

1.3 互斥量函数

🐋 初始化互斥量有两种方法:

  • 如果定义的是全局或者静态的锁,可以只使用pthread_mutex_t 锁的名字=PTHREAD_MUTEX_INITIALIZER
  • 如果定义的这把锁是动态申请的,比如new或栈上开辟的,必须使用pthread_mutex_init函数来进行初始化。参数1就是你自己定义的锁,参数2是属性,直接设为nullptr即可。
① ⽅法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

② ⽅法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

参数:
    mutex:要初始化的互斥量
    attr:NULL

🐋 销毁互斥量

销毁互斥量需要注意:

  • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁,因为此时锁是静态或全局的,不需要 destroy,全局的或者静态的变量会随着进程的运行而一直存在,进程结束他也就自动释放了
  • 不要销毁⼀个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

🐋 互斥量加锁和解锁

🔥 一旦有了锁,我们就需要对临界区进行保护, 就需要加锁和解锁。要对某个区域加锁,就要调用pthread_mutex_lock函数来加锁,参数就是你定义的锁。要解锁,就用pthread_mutex_unlock函数。

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号

调用 pthread_ lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_ lock 调⽤会陷⼊阻塞(执行流被挂起),等待互斥量解锁。

因此我们可以对之前的代码做出一些修改,如下:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>

int ticket = 100;
pthread_mutex_t mutex;

void *routine(void *arg)
{
    char *id = (char*)arg;

    while(1)
    {
        pthread_mutex_lock(&mutex);
        if (ticket > 0){
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
            pthread_mutex_unlock(&mutex);
        }
        else{
            pthread_mutex_unlock(&mutex);
            break;
        }
    }
    return nullptr;
}

int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_mutex_init(&mutex, NULL);

    pthread_create(&t1, NULL, routine, (void *) "thread-1");
    pthread_create(&t2, NULL, routine, (void *) "thread-2");
    pthread_create(&t3, NULL, routine, (void *) "thread-3");
    pthread_create(&t4, NULL, routine, (void *) "thread-4");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);

    return 0;
}

// 此时就不会出现 负数情况

1.4 互斥量实现原理

  • 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
  • 为了实现互斥锁操作,大多数体系结构都提供了 swap 或 **exchange **指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期

1.5 互斥量封装

Mutex.hpp

#include <iostream>
#include <pthread.h>

namespace LockModule
{
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex &operator = (const Mutex&) = delete;
        Mutex()
        {
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n;
        }
        ~Mutex()
        {
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n;
        }
        void Lock()
        {
            int n = ::pthread_mutex_lock(&_lock);
            (void)n;
        }
        void Unlock()
        {
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n;
        }
    private:
        pthread_mutex_t _lock;
    };

    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx):_mtx(mtx)
        {
            _mtx.Lock();
        }
        ~LockGuard()
        {
            _mtx.Unlock();
        }
    private:
        Mutex &_mtx;
    };
}

Main..cc --- 测试文件

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include "Mutex.hpp"

using namespace LockModule;

int ticket = 0;
Mutex mtx;
pthread_mutex_t mutex;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *route(void *arg)
{
    char *id = (char*)arg;
    while (1)
    {
        // 使用一:
        mtx.Lock();
        if (ticket > 0){
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
            mtx.Unlock();
           
        }
        else{
            mtx.Unlock();
            break;
        }
        
        // 使用二:
        // 构建临时对象 以代码块的形式
        {
            LockGuard lockguard(mtx); // 临时对象,临界区
            if (ticket > 0){
                usleep(1000);
                printf("%s sells ticket:%d\n", id, ticket);
                ticket--;
            }
            else{
                break;
            }
        }
    }
    return nullptr;
}

int main()
{  
    pthread_t t1, t2, t3, t4;
    pthread_mutex_init(&mutex, NULL);

    pthread_create(&t1, NULL, route, (void*)"thread-1");
    pthread_create(&t2, NULL, route, (void*)"thread-2");
    pthread_create(&t3, NULL, route, (void*)"thread-3");
    pthread_create(&t4, NULL, route, (void*)"thread-4");

    int cnt = 10;
    while(true)
    {
        sleep(1);
        ticket += cnt;
        printf("主线程放票, ticket: %d\n", ticket);
        pthread_cond_signal(&cond);
    }

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);

    pthread_mutex_destroy(&mutex);

    return 0;
}

注意:RAII 风格的互斥锁, C++11也有,比如:

std::mutex mtx; 
std::lock_guard<std::mutex> guard(mtx);

2. 线程同步 📚

2.1 条件变量

  • 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了
  • 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中。这种情况就需要用到条件变量

🔥 举例:有A,B,C三个人,一个盘子。B拿出苹果放到盘子上,另外两人就可以到盘子上拿。为了在放苹果的时候,其他人不能来拿,就要加锁,盘子就是临界区。因为另外两人想拿苹果,就一直申请锁,导致B放不了苹果。此时就需要一个铃铛。A,C两人在外面排队,当B放好苹果后就摇铃铛,此时A和C就会根据排队的顺序依次进去拿苹果。

  • 上面的铃铛就是条件变量,人就是线程。摇铃铛后,可以规定是唤醒一个线程还是唤醒全部

2.2 同步概念与竟态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
  • 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

**2.3 条件变量函数 **

**① 初始化和销毁 **

初始化:
    int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
    cond:要初始化的条件变量
    attr:NULL

销毁:
    int pthread_cond_destroy(pthread_cond_t *cond) 

pthread_cond_t cond = PTHREAD_COND_INITIALIZER
  • 条件变量是 pthread_cond_t 的数据类型。它的使用跟前面互斥锁一样,可以定义成局部或者全局的。
  • 如果是全局或者静态的,可以直接使用 PTHREAD_COND_INITIALIZER 初始化。
  • 如果是局部的,就用pthread_cond_init 函数初始化,使用完了就destroy销毁掉

② 等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
    cond:要在这个条件变量上等待
    mutex:互斥量,后⾯详细解释
  • 线程条件不满足时,线程就要等待,要在指定的条件变量上等待

③ 唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
  • 等待完成后,就要进行唤醒。
  • **pthread_cond_signal 表示唤醒一个线程,pthread_cond_broadcast **表示唤醒所有线程。

④ 案例

#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *active(void *arg)
{
    std::string name = static_cast<const char*>(arg);
    while(true)
    {
        pthread_mutex_lock(&mutex);
        // 没有对于资源是否就绪的判定
        pthread_cond_wait(&cond, &mutex); // mutex??
        printf("%s is active!\n", name.c_str());
        pthread_mutex_unlock(&mutex);
    }
    
}

int main()
{
    pthread_t tid1, tid2, tid3;

    pthread_create(&tid1, nullptr, active, (void*)"thread-1");
    pthread_create(&tid2, nullptr, active, (void*)"thread-2");
    pthread_create(&tid3, nullptr, active, (void*)"thread-3");

    sleep(1);
    printf("Main thread ctrl begin...\n");

    while(true){
        printf("main wakeup thread...\n");
        pthread_cond_signal(&cond); // 一个一个唤醒
        // pthread_cond_broadcast(&cond); // 全部唤醒
        sleep(1);
    }

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);

    return 0;
}

// 输出
Main thread ctrl begin...
main wakeup thread...
thread-1 is active!
main wakeup thread...
thread-2 is active!
main wakeup thread...
thread-3 is active!
main wakeup thread...
thread-1 is active!

2.4 为什么 pthread_cond_wait 需要互斥量 ❓

  • 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程
  • 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据

按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了

如下代码:

// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
    pthread_mutex_unlock(&mutex);
    //解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
    pthread_cond_wait(&cond);
    pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);

由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 **pthread_cond_wait **将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait。所以解锁和等待必须是一个原子操作。

  • **int pthread cond_wait(pthread_cond t cond, pthread_mutex_t mutex); 进入该函数后,会去看条件量是否等于0,不等于就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样

**2.5 条件变量使用规范 **

等待条件

pthread_mutex_lock(&mutex);
while (条件为假)
    pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);

给条件发送信号代码​

pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

2.6 条件变量封装

Cond.hpp

下面代码用到了之前写的互斥量封装

#pragma once

#include <pthread.h>
#include <iostream>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace LockModule;

    class Cond
    {
    public:
        Cond()
        {
            int n = ::pthread_cond_init(&_cond, nullptr);
            (void)n;
        }
        void Wait(Mutex &mutex) // 让我们的线程释放原有的锁
        {
            int n = ::pthread_cond_wait(&_cond, mutex.LockPtr());
        }
        void Notify()
        {
            int n = ::pthread_cond_signal(&_cond);
            (void)n;
        }
        void NotifyAll()
        {
            int n = ::pthread_cond_broadcast(&_cond);
            (void)n;            
        }

        ~Cond()
        {
            int n = ::pthread_cond_destroy(&_cond);
        }
    private:
        pthread_cond_t _cond;
    };
}

🔥 上面实现了一个条件变量类

Cond

,它使用了 POSIX 线程库(

pthread

)提供的条件变量机制。条件变量允许线程在某些条件下等待并在条件满足时被唤醒,常用于实现线程间的同步和通信。该类依赖于一个自定义的

Mutex

类来确保线程安全

Cond 类是对 POSIX 条件变量的封装,提供了三种主要操作:

  • Wait(Mutex &mutex):让当前线程在条件变量 _cond 上等待,并且在等待时释放传入的 mutex 锁。
  • Notify():通知一个等待的线程,唤醒它。
  • NotifyAll():通知所有等待的线程,唤醒它们。

线程与条件变量的交互:

  • 当线程调用 Wait 时,它会释放 mutex 锁,并在条件变量 _cond 上等待直到被唤醒。
  • 当线程调用 Notify 或 NotifyAll 时,它会唤醒至少一个(或所有)等待条件变量的线程,使它们重新获得 mutex 锁并继续执行。

Mutex** 类依赖**:这个类中使用了 Mutex 类(在 LockModule 中定义)来提供对互斥锁的操作,mutex.LockPtr() 传递给 pthread_cond_wait 作为互斥锁。

线程同步和互斥:这个类的实现是线程安全的,确保了在多线程环境中通过条件变量来实现线程间的协调。例如,生产者消费者模型中,生产者可以在队列满时等待,消费者可以在队列空时等待,直到条件满足,线程才会继续执行(这个下面我们要用到,可以先看看)

有个问题:(注意注意)

🍉 为了让条件变量更具有通用性,建议封装的时候,不要在Cond类内部引用对应的封装互斥
量,要不然后面组合的时候,会因为代码耦合的问题难以初始化,因为⼀般而言 Mutex
Cond 基本是⼀起创建的

3. 生产消费者模型 🖊

💢 生产者-消费者模型(Producer-Consumer Model)是一种经典的多线程同步问题,它描述了两个线程(或进程)之间的协作:

  • 一个或多个生产者线程生成数据项,并将它们放入缓冲区中
  • 一个或多个消费者线程从缓冲区中取出数据项,并进行处理
  • 这个模型通常用于解决生产者和消费者在不同速度下工作时的同步和数据传输问题

3.1 为什么要使用生产消费者模型

💢 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点:

  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

** 为了方便记忆,这里有一个“321”原则 : **

  1. 一个交易场所(一段内存空间)
  2. 两种角色(生产、消费角色)
  3. 三种关系(生产和生产、消费和消费 、生产和消费)前两种是互斥关系,最后一种是互斥和同步的关系

3.2 基于 BlockingQueue 的生产者消费者模型

🔥 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

3.3 C++ queue 模拟生产消费者模型

BlockQueue.hpp -- 封装

#pragma once

#include <iostream>
#include <queue>
#include <unistd.h>

namespace BlockQueueModule
{
    static const int gcap = 10;

    template <typename T>
    class BlockQueue
    {
    private:
        // 判断对象为空为满,本身就是访问临界资源
        bool IsFull() {return _q.size() == _cap;}
        bool IsEmpty() {return _q.empty();}

    public:
        BlockQueue(int cap = gcap) :_cap(cap), _cwait_num(0), _pwait_num(0)
        {
            pthread_mutex_init(&_mutex, nullptr);
            pthread_cond_init(&_productor_cond, nullptr);
            pthread_cond_init(&_consumer_cond, nullptr);
        }

        void Equeue(const T &in)  // 生产者
        {
            pthread_mutex_lock(&_mutex);
            // 注意:这里并不是我们想要放数据就可以去放数据的,生产数据都是有条件的
            // 结论1: 在临界区中等待是必然的(目前)
            while(IsFull()) // 5. 不建议用 if, 对条件进行判定,防止伪唤醒
            {
                std::cout << "生产者进入等待..." << std::endl;

                // 2. 等待是,然后释放 _mutex
                _pwait_num++;
                pthread_cond_wait(&_productor_cond, &_mutex); // 在临界区中等待是必然的
                _pwait_num--;                
                // 3. 返回,线程被唤醒 && 重新申请并持有锁(它会在临界区醒来)

                std::cout << "生产者被唤醒..." << std::endl;
            }
            // 4. if(Full()) 不满足 || 线程被唤醒
            _q.push(in); // 生产

            // 肯定有数据
            if(_cwait_num)
            {
                std::cout << "叫醒消费者" << std::endl;
                pthread_cond_signal(&_consumer_cond);
            }

            pthread_mutex_unlock(&_mutex);
            // TODO
        }

        void Pop(T *out) // 消费者
        {
            pthread_mutex_lock(&_mutex);
            // 线程伪唤醒(Thread Spurious Wakeup) 是指在多线程程序中,某个线程在本应处于阻塞状态(如等待条件变量、信号量、互斥锁等)时,
            // 可能会无故被唤醒,而并非因为真正的条件满足。这种唤醒没有实际的意义,需要线程在唤醒后进行检查,以确保执行正确的操作
            while(IsEmpty()) // 5. 不建议用 if, 对条件进行判定,防止伪唤醒
            {
                std::cout << "消费者进入等待..." << std::endl;
                _cwait_num++;
                pthread_cond_wait(&_consumer_cond, &_mutex); // 伪唤醒:在条件不满足的情况被唤醒
                _cwait_num--;
                std::cout << "消费者被唤醒..." << std::endl;
            }
            // 4. if(Ispty()) 不满足 || 线程被唤醒
            *out = _q.front();
            _q.pop();

            // 肯定有空间
            if(_pwait_num)
            {
                std::cout << "叫醒生产者" << std::endl;
                pthread_cond_signal(&_productor_cond);
            }

            pthread_mutex_unlock(&_mutex);
        }

        ~BlockQueue()
        {
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_productor_cond);
            pthread_cond_destroy(&_consumer_cond);
        }
    private:
        std::queue<T> _q;  // 保存数据的容器,临界资源
        int _cap; // bq 最大容量
        pthread_mutex_t _mutex; // 互斥
        pthread_cond_t _productor_cond; // 生产者条件变量
        pthread_cond_t _consumer_cond; // 消费者条件变量

        int _cwait_num;
        int _pwait_num;
    };

}

🎐 上面实现了一个线程安全的阻塞队列(BlockQueue),使用了互斥锁(pthread_mutex_t)和条件变量(pthread_cond_t)来协调生产者和消费者的访问。下面将从类的结构、各个成员函数以及多线程同步机制等方面进行代码剖析:

****① 类结构和成员变量 ****

template <typename T>
class BlockQueue
{
private:
    std::queue<T> _q;  // 用于存放数据的容器,实际使用的队列类型为 std::queue
    int _cap; // 队列的最大容量
    pthread_mutex_t _mutex; // 互斥锁,用于保护共享数据的访问
    pthread_cond_t _productor_cond; // 生产者的条件变量
    pthread_cond_t _consumer_cond; // 消费者的条件变量

    int _cwait_num; // 当前等待消费的消费者线程数量
    int _pwait_num; // 当前等待生产的生产者线程数量
};
  • _q:一个标准的 std::queue 类型,作为数据的存储容器。它是一个典型的先进先出(FIFO)队列,线程安全地被生产者和消费者使用。
  • _cap:队列的最大容量,超过这个容量,生产者会阻塞;小于这个容量,消费者会阻塞。
  • _mutex:互斥锁,用于保证访问 _q 队列时的线程安全。
  • _productor_cond 和** _consumer_cond:**条件变量,用于生产者和消费者线程的协调。生产者在队列满时会等待,消费者在队列空时会等待。
  • _cwait_num_pwait_num:这两个变量用于统计当前正在等待的消费者和生产者线程的数量。它们的存在可以用于唤醒等待的线程

② 构造和析构函数

构造函数:BlockQueue(int cap) 用于初始化队列的容量(默认为 gcap),同时初始化互斥锁和条件变量。

  • pthread_mutex_init 初始化互斥锁 _mutex
  • pthread_cond_init 初始化生产者和消费者的条件变量

析构函数:在对象销毁时,销毁互斥锁和条件变量

③ 生产者入队列(Equeue) | 消费者出队列(Pop)

生产者线程同步机制

  • 生产者首先尝试获取 _mutex,进入临界区。
  • 如果队列已满,生产者调用 pthread_cond_wait 进入等待状态,此时会自动释放 _mutex 锁,直到被唤醒。
  • 唤醒后的生产者再次检查队列是否已满,如果队列不再满,则将数据推入队列。
  • 如果有消费者正在等待(即 _cwait_num > 0),则生产者会唤醒一个消费者。

消费者线程同步机制

  • 消费者首先尝试获取 _mutex,进入临界区。
  • 如果队列为空,消费者调用 pthread_cond_wait 进入等待状态,此时会自动释放 _mutex 锁,直到被唤醒。
  • 唤醒后的消费者再次检查队列是否为空,如果队列不再空,则从队列中取出数据并弹出。
  • 如果有生产者正在等待(即 _pwait_num > 0),则消费者会唤醒一个生产者

🦌 pthread_cond_wait 是一个阻塞调用,它在等待期间释放 mutex,等待时会一直阻塞直到被其他线程通过 **pthread_cond_signal **唤醒。

④ 伪唤醒问题

while (IsFull()) // 5. 不建议用 if, 对条件进行判定,防止伪唤醒
{
    std::cout << "生产者进入等待..." << std::endl;

    // 2. 等待是,然后释放 _mutex
    _pwait_num++;
    pthread_cond_wait(&_productor_cond, &_mutex); // 在临界区中等待是必然的
    _pwait_num--;
    // 3. 返回,线程被唤醒 && 重新申请并持有锁(它会在临界区醒来)

    std::cout << "生产者被唤醒..." << std::endl;
}

🎈 在 PopEqueue** **中都使用了 while 循环而不是 if 语句

  • 这是为了避免伪唤醒的问题。在多线程编程中,线程可能会因为条件变量的误触发而被唤醒,但条件并不满足。因此,每次唤醒后,都需要再次检查条件,确保线程的行为是正确的
  • 例如,在生产者或消费者线程被唤醒后,它们会重新检查队列是否为空或已满。如果此时队列的状态没有改变,线程会继续进入等待状态

**Main.cc -- 测试 **

#include "BlockQueue.hpp"

using namespace BlockQueueModule;

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *> (args);
    while(true)
    {
        int data;
        // 1. 从 bq 中拿到数据
        bq->Pop(&data);
        
        // 2. 做处理
        printf("Consumer, get a data: %d\n", data);
        sleep(1);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *> (args);
    int data = 10;
    while(true)
    {
        sleep(2);
        // 1. 从外部获取数据
        //data = 10; // 有数据

        // 2. 生产到 bq 中
        bq->Equeue(data);

        printf("producter 生产了一个数据: %d\n", data);
        data++;
    }
}

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(5);
    // 单生产, 单消费
    // 多生产, 多消费
    pthread_t c1, c2, p1, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    //pthread_create(&c2, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);
    //pthread_create(&p2, nullptr, Productor, bq);
    //pthread_create(&p3, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    //pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    //pthread_join(p2, nullptr);
    //pthread_join(p3, nullptr);
    
    delete bq;

    return 0;
}

当我们是单生产、单消费的时候,结果如下:

当然我们也可以进行多生产多消费,大家也可以自己试试,这里就不过多测试啦

多线程生产者消费者模型高效是因为:一个生产者在生产任务的时候,其他生产者在构建任务,一个消费者在获取任务的时候,其他消费者在处理任务。

为什么线程在等待的时候,都是在加锁和解锁之间等待?

  • 答:无论是生产者还是消费者,都必须先检查资源的状态。检查就是要访问,所以检查之前就要加锁,等待必须在临界区里进行等待,因为判断结果是在临界区里的

🥑 基于之前对 互斥量条件变量封装来完善我们的生产者消费者模型代码, ***BlockQueue.hpp ***修改如下:

#pragma once

#include <iostream>
#include <queue>
#include <unistd.h>
#include "Cond.hpp" // 引入条件变量
#include "Mutex.hpp" // 引入互斥锁

namespace BlockQueueModule
{

    // 版本 2 

    // 这两个很重要,需要先声明命名空间
    using namespace LockModule;
    using namespace CondModule;

    static const int gcap = 10;

    template <typename T>
    class BlockQueue
    {
    private:
        // 判断对象为空为满,本身就是访问临界资源
        bool IsFull() {return _q.size() == _cap;}
        bool IsEmpty() {return _q.empty();}

    public:
        BlockQueue(int cap = gcap) :_cap(cap), _cwait_num(0), _pwait_num(0)
        {}

        void Equeue(const T &in)  // 生产者
        {
            LockGuard lockguard(_mutex);
            while(IsFull()) // 5. 不建议用 if, 对条件进行判定,防止伪唤醒
            {
                std::cout << "生产者进入等待..." << std::endl;

                // 2. 等待是,然后释放 _mutex
                _pwait_num++;
                _productor_cond.Wait(_mutex); // 在临界区中等待是必然的
                _pwait_num--;                
                // 3. 返回,线程被唤醒 && 重新申请并持有锁(它会在临界区醒来)

                std::cout << "生产者被唤醒..." << std::endl;
            }
            // 4. if(Full()) 不满足 || 线程被唤醒
            _q.push(in); // 生产

            // 肯定有数据
            if(_cwait_num)
            {
                std::cout << "叫醒消费者" << std::endl;
                _consumer_cond.Notify();
            }
        }

        void Pop(T *out) // 消费者
        {
            LockGuard lockguard(_mutex);
            while(IsEmpty()) 
            {
                std::cout << "消费者进入等待..." << std::endl;
                _cwait_num++;
                _consumer_cond.Wait(_mutex); // 伪唤醒:在条件不满足的情况被唤醒
                _cwait_num--;
                std::cout << "消费者被唤醒..." << std::endl;
            }
            // 4. if(Ispty()) 不满足 || 线程被唤醒
            *out = _q.front();
            _q.pop();

            // 肯定有空间
            if(_pwait_num)
            {
                std::cout << "叫醒生产者" << std::endl;
                _productor_cond.Notify();
            }
        }

        ~BlockQueue()
        {}
        
    private:
        std::queue<T> _q;  // 保存数据的容器,临界资源
        int _cap; // bq 最大容量
        Mutex _mutex; // 互斥
        Cond _productor_cond; // 生产者条件变量
        Cond _consumer_cond; // 消费者条件变量

        int _cwait_num;
        int _pwait_num;
    };
}

我们不仅仅可以用生产消费者模型传递数据,还可以传递任务,如下:

Task.hpp

#pragma once

#include <iostream>
#include <unistd.h>

namespace TaskModule
{
    class Task
    {
    public:
        Task(){}
        Task(int a, int b): x(a), y(b)
        {}
        void Excute()
        {
            sleep(1); // 用 1 s 来进行模拟任务处理的时长131
            result = x + y; // 入库,访问缓存,访问网络,打印日志等
        }
        int X() {return x;}
        int Y() {return y;}
        int Result() {return result;}
    private:
        int x;
        int y;
        int result;
    };
}

Main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <functional>

void test()
{
    std::cout << "haha test" << std::endl;
}

void hello()
{
    std::cout << "hehe hello" << std::endl;
} //长传、下载、刷新、入库、同步等各种

using task_t = std::function<void()>;

using namespace BlockQueueModule;
using namespace TaskModule;

void *Consumer(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *> (args);
    while(true)
    {
        // Task t;
        // // 1. 从 bq 中拿到数据
        // bq->Pop(&t);
        

        // 2. 做处理
        //t.Excute();
        //printf("Consumer, 处理完一个任务: %d + %d = %d\n", t.X(), t.Y(), t.Result());

        task_t t;
        // 1. 从 bq 中拿到数据
        bq->Pop(&t);
        // 2. 做处理
        t();
        printf("Consumer, 处理完一个任务\n");
    }
}

void *Productor(void *args)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *> (args);
    int data = 10;
    while(true)
    {
        // // 1. 从外部获取数据
        // int x = rand() % 10 + 1;
        // int y = rand() % 20 + 1;
        
        // Task t(x, y); //构建任务

        // // 2. 生产到 bq 中
        // bq->Equeue(t);

        // printf("producter 生产了一个任务: %d + %d = ?\n", t.X(), t.Y());
        sleep(1);
        bq->Equeue(test);
        printf("producter 生产了一个任务\n");
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<task_t> *bq = new BlockQueue<task_t>(5);

    // 单生产, 单消费
    pthread_t c1, c2, p1, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq);
    pthread_create(&p1, nullptr, Productor, bq);

    pthread_join(c1, nullptr);
    pthread_join(p1, nullptr);
    
    delete bq;

    return 0;
}

那么看到这里,有个问题 ,互斥访问,高效从何谈起?

  • 也就是说作为生产者,把生产的任务放到任务队列里面,我在放别人就不能放,同样消费者拿任务也是互斥的,等于是访问整个交易资源,目前的代码就是串行访问的,那么如果是串行访问的,那么为啥生产消费者模型为啥还能这么高效呢?

现在我们有个问题 -- 》 生产者生产的数据从哪里来,消费者仅仅是把数据取走,不做其他事情嘛?

💢 在刚刚代码中,作为消费者 Pop 之后需要做处理的,所以我们在生产消费者效率中不要仅仅只考虑其内部的效率,而是应该考虑其整体效率 !!!

比如:

  1. 生产者内部没有数据,等待外部数据到来,获取数据的期间 其可能存在历史残留任务,那么此时消费者同时也可以从缓冲区中拿任务,并且处理任务,这样两者一个获取任务,一个处理任务,这两个线程此时就并行起来了
  2. 或者当我们的生产者生产了很多任务,但是消费者拿走,并且处理,此时消费者正在做处理任务的时候,生产者仍然还可以获取扔到任务队列里面,此时消费者也仍然在处理任务,此时两者也并行起来了

结论:生产消费者交易的时刻确实是串行的,但是交易的放任务、拿任务是占比时间最短的,占比更多的是生产者获取任务以及消费者处理任务,因此我们可以把整体看作是并行的,故称其效率是高的


4. 勉励

后面我会写关于 环形队列 以及 日志的相关内容,敬请期待呀

★,°*:.☆( ̄▽ ̄)/$:*.°★ 】那么本篇到此就结束啦,如果有不懂 和 发现问题的小伙伴可以在评论区说出来哦,同时我还会继续更新关于【Linux】的内容,请持续关注我 !!**


本文转载自: https://blog.csdn.net/island1314/article/details/144145832
版权归原作者 IsLand1314~ 所有, 如有侵权,请联系我们删除。

“【Linux】:多线程(互斥 && 同步)”的评论:

还没有评论