0


【Linux从入门到精通】多线程总结(线程池、线程安全问题和常见锁)详解

本篇文章主要是对线程池进行详解。同时引出了单例模式的线程池,也对线程安全问题进行了解释。其中包含了智能指针、STL容器、饿汉模式的线程安全。也对常见的锁:悲观锁(Pessimistic Locking)乐观锁(Optimistic Locking)互斥锁(Mutex Lock)读写锁(Read-Write Lock)自旋锁(Spin Lock)条件变量(Condition Variable)进行了讲解。重点对读写锁进行了讲解。希望本篇文章会对你有所帮助。

🙋‍♂️ 作者:@Ggggggtm 🙋‍♂️

👀 专栏:Linux从入门到精通 👀

💥 标题:线程池、线程安全与常见锁问题💥

** ❣️ 寄语:与其忙着诉苦,不如低头赶路,奋路前行,终将遇到一番好风景 **

一、线程池

1、1 什么是线程池

线程池是一种线程使用模式。在应用程序中,创建和销毁线程会带来性能上的开销,因此线程池的出现可以减少这种开销并提高程序的效率。

线程池内部维护了一个线程队列,其中包含了一定数量的可重复使用的线程。当有任务需要执行时,可以从线程池中获取一个空闲线程来执行任务,而不是每次都重新创建一个线程。任务执行完毕后,线程将被返回给线程池,以备下次任务执行

1、2 为什么要有线程池

** 线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价**。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

1、3 线程池demo代码

1、3、1 设计思路

线程池的情况较为特殊,是一开始就创建一定数量的线程。当有任务时,所有待命的线程去竞争这个任务。与之前不同的是,在处理任务时才创建线程。

其设计思路如下:

  1. 线程池的构造函数ThreadPool(int thread_num)可接受一个参数thread_num,用于指定线程池中线程的数量,默认为THREAD_NUM。在构造函数中,通过循环创建thread_num个Thread对象,并将它们保存在_threads数组中。
  2. 线程的创建与等待进行了封装,封装成了一个Thread类
  3. 每个Thread对象都拥有一个线程编号和静态的routine函数。routine函数是线程执行的入口点,它接受一个ThreadData参数,其中包含了当前线程的相关数据。在routine函数中,使用while(true)循环来不断从任务队列中获取任务,并执行任务的运算操作。获取任务的过程中,需要先获取互斥锁mutex,然后使用条件变量cond进行等待或唤醒
  4. ThreadPool类提供了一系列辅助函数,例如getMutex()用于获取互斥锁mutex的指针,isEmpty()用于判断任务队列是否为空,waitCond()用于进入等待状态,getTask()用于获取队列中的任务。目的就是为了在routine函数中可以轻松获取相关参数
  5. 在run()函数中,通过遍历_threads数组,依次启动每个Thread对象,使它们开始执行任务。
  6. pushTask(const T& task)函数用于将任务入队列。在入队过程中,首先获取互斥锁mutex,然后将任务task添加到_task_queue队列中,并通过pthread_cond_signal函数对条件变量cond进行信号通知,以唤醒等待的线程
  7. 析构函数~ThreadPool()用于销毁线程池。在析构函数中,首先遍历_threads数组,调用每个Thread对象的join()函数来等待线程的结束,并释放Thread对象的内存。最后,调用pthread_mutex_destroy函数和pthread_cond_destroy函数来销毁互斥锁mutex和条件变量cond。

1、3、2 demo代码

LockGuard.hpp(对互斥锁的封装)

#pragma once 

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
    {}
    void lock() 
    {
        pthread_mutex_lock(pmtx_);
    }
    void unlock()
    {
        pthread_mutex_unlock(pmtx_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *pmtx_;
};
 
// RAII风格的加锁方式
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mtx):mtx_(mtx)
    {
        mtx_.lock();
    }
    ~LockGuard()
    {
        mtx_.unlock();
    }
private:
    Mutex mtx_;
};

Task.hpp(所派发的任务)

#pragma once

typedef function<int(int, int)> fun_t;
 
class Task
{
 
public:
    Task(){}
    Task(int x, int y, fun_t func)
        :x_(x)
        , y_(y)
        , func_(func)
    {}
    int operator ()()
    {
        return func_(x_, y_);
    }
public:
    int x_;
    int y_;
    // int type;
    fun_t func_;
};

Thread.hpp(创建线程的封装)

#pragma once

// typedef std::function<void* (void*)> func_t; 

typedef void *(*func_t)(void *);    // 定义一个函数类型
// (要传递给)线程的信息
class ThreadData
{
public:
    void* _args;        // 线程参数
    std::string _name;    // 线程名称
};
// 线程类
class Thread
{
public:
    Thread(int num, func_t callback, void* args)
    : _func(callback)
    {
        char threadName[64];
        snprintf(threadName, sizeof(threadName), "Thread:[%d]", num);
        _name = threadName;
        
        _td._args = args;    // 给线程传递参数
        _td._name = _name;
    }
    ~Thread()
    {}
    // 创建线程
    void start()
    {
        pthread_create(&_tid, nullptr, _func, (void*)&_td);
    }

    void join()
    {
        pthread_join(_tid, nullptr);
    }

    std::string name()
    {
        return _name;
    }

private:
    ThreadData _td;        // 要传递给线程的信息
    std::string _name;    // 线程名称
    pthread_t _tid;        // 线程ID
    func_t _func;        // 线程函数
};

ThreadPool.hpp(线程池)

#include<string>
#include <vector>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <functional>

using namespace std;

#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Task.hpp"

#define THREAD_NUM 5
template<class T>

class ThreadPool
{
public:
    pthread_mutex_t* getMutex()
    {
        return &mutex;
    }

    bool isEmpty()
    {
        return _task_queue.empty();
    }

    void waitCond()
    {
        pthread_cond_wait(&cond,&mutex);
    }

    T getTask()
    {
        T n=_task_queue.front();
        _task_queue.pop();
        return n;
    }
public:
    static void* routine(void* args)
    {
        ThreadData* td = (ThreadData*)args;
        ThreadPool<T>* tp = (ThreadPool<T>*)td->_args;
        while(true)
        {
            T task;
            {
                LockGuard lockguard(tp->getMutex());
                while(tp->isEmpty())
                    tp->waitCond();
                
                task=tp->getTask();
            }
            cout<< "线程" << td->_name << "运算结果是 : " << task() << endl;
        }
    }

    ThreadPool(int thread_num = THREAD_NUM)
    : _num(thread_num)
    {
        for(int i = 1; i <= _num; i++)
        {
            // 参数列表对应着Thread的构造函数
            _threads.push_back(new Thread(i, routine, this));
        }

        pthread_mutex_init(&mutex,nullptr);
        pthread_cond_init(&cond,nullptr);
    }

    // 线程执行任务
    void run()
    {
        for(auto& it : _threads)
        {
            it->start();
            std::cout << "线程开始执行任务:"<<it->name() << std::endl;
        }
    }

    // void joins()
    // {
    //     for(auto& it:_threads)
    //     {
    //         it->join();
    //     }
    // }

    // 将任务入队列
    void pushTask(const T& task)
    {
        LockGuard lockguard(&mutex);
        _task_queue.push(task);
        pthread_cond_signal(&cond);
    }

    ~ThreadPool()
    {
        for(auto& it : _threads)
        {
            it->join();
            delete it;
        }

        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
    }
private:
    std::vector<Thread*> _threads;        // 保存线程的数组
    std::queue<T> _task_queue;            // 保存任务的队列
    int _num;                            // 线程的个数
    pthread_mutex_t mutex;
    pthread_cond_t cond;
};

TestMain.cpp

int myAdd(int x, int y)
{
    return x + y;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ 0x333 ^ getpid());
    ThreadPool<Task> *tp=new ThreadPool<Task>();

    tp->run();
    while(true)
    {
        int x=rand()%100+1;
        usleep(666);
        int y=rand()%88+1;
        Task t(x,y,myAdd);

        cout<<"制作任务完成 :"<< x << " + "<< y << " = ?"<<endl;
        tp->pushTask(t);

        sleep(1);
    }
    //td->joins();
    return 0;
}

1、4 懒汉方式的线程池 (线程安全版本)

我们知道懒汉方式时单例模式中的一种。我们下面给出懒汉方式的线程池伪代码:

template<class T>
class ThreadPool
{
private:
    // 将构造函数私有化,当然还有其拷贝构造和赋值重载
    ThreadPool(int thread_num = THREAD_NUM)
    : _num(thread_num)
    {
        for(int i = 1; i <= _num; i++)
        {
            // 参数列表对应着Thread的构造函数
            _threads.push_back(new Thread(i, routine, this));
        }

        pthread_mutex_init(&mutex,nullptr);
        pthread_cond_init(&cond,nullptr);
    }
    
    // 提供获取线程池指针的函数
    static ThreadPool<T>* getThreadPtr()
    {

        if(nullptr==thread_ptr)
        {
            thread_ptr=new ThreadPool<T>();
        }
        return thread_ptr;
    }
private:
    std::vector<Thread*> _threads;        // 保存线程的数组
    std::queue<T> _task_queue;            // 保存任务的队列
    int _num;                            // 线程的个数
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    static ThreadPool<T>* thread_ptr;
}
template<class T>
ThreadPool<T>* ThreadPool<T>::thread_ptr=nullptr;

我们在没有学习多线程之前看上述代码似乎并没有问题。实际上有多个执行流在执行时,就会出现问题。如果当前有多个线程同时想要申请线程池对象呢,可能就会不是单例了!!!所以我们也应该在获取线程池对象指针的函数中加锁进行保护。实际代码如下:

    static ThreadPool<T>* getThreadPtr()
    {
        //{
            //    LockGuard lockguard(&g_mutex);
            //    if(nullptr==thread_ptr)
            //    {
            //        thread_ptr=new ThreadPool<T>();
            //    }
        //}

        if(nullptr==thread_ptr)
        {
            LockGuard lockguard(&g_mutex);
            if(nullptr==thread_ptr)
            {
                thread_ptr=new ThreadPool<T>();
            }
        }
        return thread_ptr;
    }

上述代码加锁加的很巧妙。我们知道单例模式只能实例出一个对象。所以我们先判断其是否为空,也就是是否已经实例出对象了。然后再申请锁。这样不但保护了申请对象的安全,同时也减少了申请锁的次数。

二、线程安全

2、1 STL容器线程安全问题

STL中的容器是否是线程安全的呢?答案是:STL中的容器不是线程安全的

原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)。因此 STL 默认不是线程安全。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。

2、2 智能指针线程安全问题

对于** unique_ptr, 由于只是在当前代码块范围内生效,因此不涉及线程安全问题**。

对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,,基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数。

三、常见的几种锁

在多线程编程中,常见的几种锁包括悲观锁(Pessimistic Locking)乐观锁(Optimistic Locking)互斥锁(Mutex Lock)读写锁(Read-Write Lock)自旋锁(Spin Lock)条件变量(Condition Variable)。下面对它们进行详细解释:

  1. 悲观锁(Pessimistic Locking):- 悲观锁的基本思想是,对共享资源的访问持保守态度,认为并发操作可能会产生冲突,因此,在访问共享资源之前,先获取锁来确保独占访问。- 使用悲观锁时,当一个线程需要对共享资源进行读或写操作时,首先尝试获取锁。如果锁已被其他线程持有,则当前线程会被阻塞,直到锁被释放。- 悲观锁通常使用互斥锁(Mutex Lock)或读写锁(Read-Write Lock)等来实现。
  2. 乐观锁(Optimistic Locking):- 乐观锁的基本思想是,每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。- 使用乐观锁时,当一个线程需要对共享资源进行读或写操作时,先读取当前版本号或时间戳,并在执行操作前记录下来。然后,当要提交修改时,先检查版本号或时间戳是否与之前读取的一致,如果一致则提交成功,否则表示发生了冲突,需要进行回滚或重试操作。- 乐观锁通常使用版本号或时间戳等机制来实现,如数据库中的乐观并发控制。
  3. 互斥锁(Mutex Lock):- 互斥锁用于保护临界区,确保同一时间只有一个线程能够进入临界区进行操作,从而避免数据竞争。- 在获取互斥锁之前,如果锁已经被其他线程占用,则线程会被阻塞,并等待锁的释放;一旦获取到锁,线程可以进入临界区执行操作,执行完毕后释放锁供其他线程使用。- 互斥锁的实现可以是阻塞式的(Blocking Mutex),也可以是非阻塞式的(Non-blocking Mutex)。
  4. 读写锁(Read-Write Lock):- 读写锁用于在读多写少的情况下提高并发性能。它分为读共享模式和写独占模式- 读共享模式允许多个线程同时对共享资源进行读取操作,不互斥;而写独占模式则排斥地获取锁,一次只能有一个线程进行写入操作,且当前不能有读操作。- 当有线程持有读锁时,其他线程可以继续持有读锁而不会被阻塞;但当有线程持有写锁时,其他读写线程都会被阻塞等待。- 读写锁允许多个线程同时读取共享资源,从而提高并发性能。
  5. 自旋锁(Spin Lock):- 自旋锁是一种忙等的锁机制,用于保护临界区,并在获取锁失败时自旋等待锁的释放。- 当一个线程尝试获取自旋锁时,如果锁已被其他线程占用,则该线程不会被阻塞,而是通过循环不断地检查锁是否被释放。- 自旋锁适用于临界区的锁定时间较短且线程竞争不激烈的情况下,避免了线程切换的开销,但也可能导致CPU资源的浪费。- 什么情况下使用自旋锁呢?决定因素就是等待临界资源就绪的时间。如果等待临界资源就绪时间过长,一直在循环检测不就是一种浪费吗!!!如果临界区的代码执行很快,那么忙等待所消耗的时间可能比线程挂起与唤醒的时间更短,从而提高了性能。其次,自旋锁适用于并发竞争较小的情况。因为自旋锁是通过忙等待来获取锁,如果并发竞争激烈,那么会导致大量的线程在忙等待,浪费了大量的CPU资源。
  6. 条件变量(Condition Variable):- 条件变量用于在线程之间进行等待和通知,用来解决生产者-消费者等经典同步问题。- 线程可以通过条件变量等待某个条件成立,在条件不满足时将自己放入等待队列,等待其他线程发出通知唤醒自己。- 条件变量通常与互斥锁一起使用,等待前需要先加锁,唤醒后也会自动解锁。- 在满足条件的情况下,其他线程可以发送信号或广播(signal/broadcast)来唤醒等待的线程。

这些锁机制提供了不同的线程同步方式,应根据具体的多线程场景和需求选择合适的锁来保证并发操作的正确性和性能。

四、读者学者问题(读写锁)

4、1 简单理解读者学者问题

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。这也就是读者写者问题。

读者写者问题就是基于读写锁来实现的。我们可结合下图理解读者写着的读写操作:

4、2 读写锁常用接口介绍

常用的读写锁接口有以下几个:

  1. 初始化锁:int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr); 该函数用于初始化一个读写锁对象,参数rwlock为要初始化的读写锁对象的指针,attr为锁属性,一般使用默认值NULL即可。
  2. 销毁锁:int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 该函数用于销毁一个读写锁对象,释放相关资源。
  3. 加读锁:int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); 该函数用于对读写锁对象加读锁,如果有其他线程持有写锁,则当前线程会被阻塞,直到写锁释放。
  4. 加写锁:int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); 该函数用于对读写锁对象加写锁,如果有其他线程持有读锁或写锁,则当前线程会被阻塞,直到所有的读锁和写锁释放。
  5. 尝试加读锁:int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); 该函数尝试对读写锁对象加读锁,如果无法获取到锁(有其他线程持有写锁),则立即返回错误码。
  6. 尝试加写锁:int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); 该函数尝试对读写锁对象加写锁,如果无法获取到锁(有其他线程持有读锁或写锁),则立即返回错误码。
  7. 解锁:int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); 该函数用于释放读写锁对象的锁,如果是读锁,则允许其他线程继续获取读锁或写锁;如果是写锁,则允许其他线程获取读锁或写锁。

4、3 用互斥锁实现读写锁

当我们了解到读写锁后,那么怎么用互斥锁来实现一个读写锁的功能呢? 大概思路就是我们用一个表变量来统计读者的数量(也就是来记录读锁的个数)。当读者为0时,才可进行写操作。读者之间不互斥。给出伪代码如下:

int cnt = 0;

pthread_mutex_t rd_count_mtx;
pthread_mutex_t wt_mtx;

void read()
{
    pthread_mutex_lock(&rd_count_mtx);
    cnt++;
    if(cnt == 1) // 表示已经有读者,且只加一次锁就可以
        pthread_mutex_lock(&wt_mtx);
    pthread_mutex_unlock(&rd_count_mtx);

    // 进行读操作
    // ......

    pthread_mutex_lock(&rd_count_mtx);
    cnt--;
    if(cnt == 0) // 表示已经没有读者,可以进行写操作
        pthread_mutex_unlock(&wt_mtx);
    pthread_mutex_unlock(&rd_count_mtx);
}

本文转载自: https://blog.csdn.net/weixin_67596609/article/details/133148816
版权归原作者 Ggggggtm 所有, 如有侵权,请联系我们删除。

“【Linux从入门到精通】多线程总结(线程池、线程安全问题和常见锁)详解”的评论:

还没有评论