0


【Linux】多线程7——线程池

1.线程池的概念

1.1.池化技术

池化技术指的是提前准备一些资源,在需要时可以重复使用这些预先准备的资源。

在系统开发过程中,我们经常会用到池化技术。通俗的讲,池化技术就是:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。

这样做带来几个明显的好处:

  1. 资源重复使用, 减少了资源分配和释放过程中的系统消耗。比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。
  2. 可以对资源的整体使用做限制。这个好理解,相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。
  3. 池化技术分配对象池,通常会集中分配,这样有效避免了碎片化的问题。

池化技术简单点来说,就是提前保存大量的资源,以备不时之需。

池化技术有两个特点,提前创建和重复利用。

    由于在实际应用当做,分配内存、创建进程、线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作。因此,当程序中需要频繁的进行内存申请释放,进程、线程创建销毁等操作时,通常会使用内存池、进程池、线程池技术来提升程序的性能。

    对连接或线程的复用,并对复用的数量、时间等进行控制,从而使得系统的性能和资源消耗达到最优状态。

池化技术 的本质:空间换时间

1.2.线程池

    线程池的基本概念是,**在应用程序启动时创建一定数量的线程,并将它们保存在线程池中。当需要执行任务时,从线程池中获取一个空闲的线程,将任务分配给该线程执行。当任务执行完毕后,线程将返回到线程池,可以被其他任务复用。**

    **线程池的设计思想是为了避免频繁地创建和销毁线程的开销,以及控制并发执行的线程数量,从而提高系统的性能和资源利用率。**

    线程池(Thread Pool)是一种并发编程中常用的技术,用于管理和重用线程。它由线程池管理器、工作队列和线程池线程组成。

线程池的关键组成部分包括:

  1. 线程池管理器(ThreadPoolExecutor):负责创建、管理和控制线程池。它负责线程的创建、销毁和管理,以及线程池的状态监控和调度任务。
  2. 工作队列(BlockingQueue):用于存储待执行的任务。当线程池中的线程都在执行任务时,新的任务会被放入工作队列中等待执行。
  3. 线程池线程(Worker Thread):实际执行任务的线程。线程池中会维护一组线程,这些线程可以被重复使用,从而避免了频繁创建和销毁线程的开销。

1.3.线程池的优点

** 线程池** 的优点在于 高效、方便,线程在使用前就已经创建好了,使用时直接将任务交给线程完成。此外线程会被合理调度,确保 任务与线程 间能做到负载均衡

  1. 重用线程:线程池会在内部维护一组可重用的线程,避免了频繁地创建和销毁线程的开销,提高了线程的利用率。
  2. 控制并发度:线程池可以限制并发执行的线程数量,防止系统过载。通过调整线程池的大小,可以控制并发度,避免资源消耗过大。
  3. 提供线程管理和监控:线程池提供了一些管理和监控机制,例如线程池的创建、销毁、线程状态的监控等,方便开发人员进行线程的管理和调试。
  4. 提供任务队列:线程池通常会使用任务队列来存储待执行的任务,这样可以实现任务的缓冲和调度

线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等

线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率

1.4.线程池的应用场景

线程池 有以下几种应用场景:

  1. 存在大量且短小的任务请求,比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问
  2. 对性能要求苛刻,力求快速响应需求,比如游戏服务器,要求对玩家的操作做出快速响应
  3. 突发大量请求,但不至于使服务器产生过多的线程,短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题

2.线程池_V1(朴素版)——够用了

「朴素版」:实现最基本的线程池功能,直接使用系统提供的接口

所谓朴素版就是不加任何优化设计,只实现 线程池 最基础的功能,便于理解 线程池

创建 ThreadPool_v1.hpp 头文件

将 线程池 实现为一个类,提供接口供外部调用

首先要明白 线程池 的两大核心:一批线程 与 任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool_v1.hpp 的大体框架如下

  1. 创建一批线程,通过容器将它们管理起来
  2. 创建任务队列,存储就绪的任务
  3. 设置互斥锁
  4. 设置条件变量

互斥锁 的作用是 保证多个线程并访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步

注:为了方便实现,直接使用系统调用接口及容器,比如 pthread_t、vector、queue 等

ThreadPool_v1.hpp

#pragma once

#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>

#define THREAD_NUM 10 // 线程池线程的默认个数

template <class T>
class ThreadPool
{
public:
    ThreadPool(int num = THREAD_NUM) // 线程数量
        : _threads(num), _num(num)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 互斥锁、条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        // 其他信息初始化(当前不需要)
    }

    void start()
    {
        // 启动线程池
        // ...
    }

    // 提供给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 业务处理
        // ...
    }

private:
    std::vector<pthread_t> _threads; // 存放线程的容器
    int _num;                        // 线程数量
    std::queue<T> _tasks;            // 利用 STL 自动扩容的特性,无需担心容量
    pthread_mutex_t _mtx;            // 互斥锁
    pthread_cond_t _cond;            // 条件变量
};

这里需要补充一点,

  • 类内使用多线程就得需要将提供给线程的回调函数需要设置为静态,否则就会报错(参数不匹配)
  • 需要提前给 vector 扩容,避免后面使用时发生越界访问

接下来就继续设计了,

**init()的设计 — 位于

ThreadPool

类**

当前场景只需要初始化 互斥锁条件变量,在 构造函数 中完成就行了,所以这里的

init()

函数不需要补充,所以我们把它删掉即可


**start()的设计 — 位于

ThreadPool

类**

启动 线程池 需要先创建出一批线程,这里直接循环创建即可

void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, nullptr); 
}

**线程的回调函数

threadRoutine()的设计

— 位于

ThreadPool

类**

这里进行简单测试,打印当前线程的线程

ID

就行了,并且直接

detach

,主线程无需等待次线程运行结束

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    while (true)
    {
        std::cout << "Thread Running... " << pthread_self() << std::endl;
        sleep(1);
    }
}

**创建

main.cc

源文件,测试线程池的代码**

#include "ThreadPool_V1.hpp"
#include <memory>

int main()
{
    std::unique_ptr<ThreadPool<int>> ptr(ThreadPool<int>());

    ptr->init();
    ptr->start();

    // 还有后续动作

    return 0;
}

makefile

test:main.cc
    g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean
clean:
    rm -rf test

编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了

好了,接着设计


线程池 还需要提供一个重要的接口 **

pushTask()

**,将用户需要执行的业务装载至 任务队列 中,等待线程执行

**装载任务

pushTask()

— 位于

ThreadPool

类**

// 装载任务
void pushTask(const T& task)
{
    // 本质上就是在生产商品,需要加锁保护
    pthread_mutex_lock(&_mtx);
    _tasks.push(task);

    // 唤醒消费者进行消费
    pthread_cond_signal(&_cond);//线程都会阻塞在这里条件变量里面
    pthread_mutex_unlock(&_mtx);
}

** 装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费**

所以线程的回调函数需要从 任务队列 中获取任务,进行消费

  1. 检测是否有任务
  2. 有 -> 消费
  3. 没有 -> 等待

**线程回调函数

threadRoutine()

— 位于

ThreadPool

类**

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&_mtx);

        // 等待条件满足
        while(_tasks.empty())
            pthread_cond_wait(&_cond, &_mtx);

        T task = _tasks.front();
        _tasks.pop();

        // task(); // 进行消费(存疑)

        pthread_mutex_unlock(&_mtx);
    }
}

注意: **判断任务队列是否为空需要使用

while

,确保在多线程环境中不会出现问题**

因为 任务队列、互斥锁、条件变量 是类内成员,而这里的

threadRoutine()

函数是一个静态函数,并没有

this

指针以访问类内成员,可以采取传递

this

指针的方式解决问题

**启动线程池

start()

— 位于

ThreadPool

类**

void start()
{
    // 创建一批线程并启动
    for(int i = 0; i < _num; i++)
        pthread_create(&_threads[i], nullptr, threadRoutine, this); // 传递 this 指针
}
threadRoutine()

函数需要将参数

void*

转化为所在类对象的指针,并通过该指针访问类内成员

**线程回调函数

threadRoutine()

— 位于

ThreadPool

类**

// 提供给线程的回调函数
static void *threadRoutine(void *args)
{
    // 避免等待线程,直接剥离
    pthread_detach(pthread_self());

    auto ptr = static_cast<ThreadPool<T>*>(args);

    while (true)
    {
        // 任务队列是临界资源,需要保护
        pthread_mutex_lock(&ptr->_mtx);

        // 等待条件满足
        while(ptr->_tasks.empty())
            pthread_cond_wait(&ptr->_cond, &ptr->_mtx);

        T task = ptr->_tasks.front();
        ptr->_tasks.pop();

        //task(); // 进行消费(存疑)

        pthread_mutex_unlock(&ptr->_mtx);
    }
}

到这里也差不多了,我们整合一下我们的代码

ThreadPool_v1.hpp

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>

#define THREAD_NUM 10 // 线程池线程的默认个数

template <class T>
class ThreadPool
{
private:
    void lockQueue() // 加锁
    {
        pthread_mutex_lock(&_mtx);
    }

    void unlockQueue() // 解锁
    {
        pthread_mutex_unlock(&_mtx);
    }

    void threadWait() // 让当前线程进入阻塞队列等待
    {
        pthread_cond_wait(&_cond, &_mtx);
    }

    void threadWakeUp() // 唤醒第一个线程
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty() // 任务是不是为空
    {
        return _tasks.empty();
    }

public:
    ThreadPool(int num = THREAD_NUM) // 线程数量
        : _threads(num), _num(num)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 互斥锁、条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    T popTask() // 清除任务
    {
        T task = _tasks.front();
        _tasks.pop();

        return task;
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        lockQueue();
        _tasks.push(task);

        // 唤醒消费者进行消费
        threadWakeUp();
        unlockQueue();
    }

    void start()
    {
        // 创建一批线程并启动
        for (int i = 0; i < _num; i++)
            pthread_create(&_threads[i], nullptr, threadRoutine, nullptr);
    }

    // 提供给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 避免等待线程,直接剥离
        pthread_detach(pthread_self());

        auto ptr = static_cast<ThreadPool<T> *>(args);

        while (true)
        {
            // 任务队列是临界资源,需要保护
            ptr->lockQueue();

            // 等待条件满足
            while (ptr->isEmpty())
                ptr->threadWait(); // 进来了就先排队

            T task = ptr->popTask();

            ptr->unlockQueue(); // 解锁

            // 消费行为可以不用加锁(一个商品只会被一个线程消费,因为我们只唤醒了一个)
            task();
        }
    }

private:
    std::vector<pthread_t> _threads; // 存放线程的容器
    int _num;                        // 线程数量
    std::queue<T> _tasks;            // 利用 STL 自动扩容的特性,无需担心容量
    pthread_mutex_t _mtx;            // 互斥锁
    pthread_cond_t _cond;            // 条件变量
};
    细节: **轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率**

上面那个写的可能有点不太好看,下面重新写一个

ThreadPool_v1.hpp改良版本

#pragma once

#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>

#define NUM 5

//线程池
template<class T>
class ThreadPool
{
private:
    bool IsEmpty()
    {
        return _task_queue.size() == 0;
    }
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void Wait()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
public:
    ThreadPool(int num = NUM)
        : _thread_num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    //线程池中线程的执行例程
    static void* Routine(void* arg)
    {
        pthread_detach(pthread_self());
        ThreadPool* self = (ThreadPool*)arg;
        //不断从任务队列获取任务进行处理
        while (true){
            self->LockQueue();
            while (self->IsEmpty()){
                self->Wait();
            }
            T task;
            self->Pop(task);
            self->UnLockQueue();
            
            task.Run(); //处理任务
        }
    }
    void ThreadPoolInit()
    {
        pthread_t tid;
        for (int i = 0; i < _thread_num; i++){
            pthread_create(&tid, nullptr, Routine, this); //注意参数传入this指针
        }
    }
    //往任务队列塞任务(主线程调用)
    void Push(const T& task)
    {
        LockQueue();
        _task_queue.push(task);
        UnLockQueue();
        WakeUp();
    }
    //从任务队列获取任务(线程池中的线程调用)
    void Pop(T& task)
    {
        task = _task_queue.front();
        _task_queue.pop();
    }
private:
    std::queue<T> _task_queue; //任务队列
    int _thread_num; //线程池中线程的数量
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

补充一下

  • 1.为什么线程池中需要有互斥锁和条件变量?

** 线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。**

    线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此**线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒**,因此我们需要引入条件变量。

    当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。

注意:

  • 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
  • pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal函数唤醒正在等待的一个线程即可。
  • 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
  • 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
  • 2.为什么线程池中的线程执行例程需要设置为静态方法?

      使用pthread_create函数创建线程时,需要为创建的线程传入一个Routine(执行例程),该Routine只有一个参数类型为void*的参数,以及返回类型为void*的返回值。
    
      **而此时Routine作为类的成员函数,该函数的第一个参数是隐藏的this指针,因此这里的Routine函数,虽然看起来只有一个参数,而实际上它有两个参数,此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译。**
    
      静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,**因此我们需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数。**
    
      但是**在静态成员函数内部无法调用非静态成员函数**,而我们需要在Routine函数当中调用该类的某些非静态成员函数,比如Pop。因此**我们需要在创建线程时,向Routine函数传入的当前对象的this指针,此时我们就能够通过该this指针在Routine函数内部调用非静态成员函数了。**
    

接下来我们来测试一下

    **我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的,但无论该任务是什么类型的,在该任务类当中都必须包含一个Run方法,当我们处理该类型的任务时只需调用该Run方法即可。**

例如,下面我们实现一个计算任务类:

Task.hpp

#pragma once

#include <iostream>

//任务类
class Task
{
public:
    Task(int x = 0, int y = 0, char op = 0)
        : _x(x), _y(y), _op(op)
    {}
    ~Task()
    {}

    //处理任务的方法
    void Run()
    {
        int result = 0;
        switch (_op)
        {
        case '+':
            result = _x + _y;
            break;
        case '-':
            result = _x - _y;
            break;
        case '*':
            result = _x * _y;
            break;
        case '/':
            if (_y == 0){
                std::cerr << "Error: div zero!" << std::endl;
                return;
            }
            else{
                result = _x / _y;
            }
            break;
        case '%':
            if (_y == 0){
                std::cerr << "Error: mod zero!" << std::endl;
                return;
            }
            else{
                result = _x % _y;
            }
            break;
        default:
            std::cerr << "operation error!" << std::endl;
            return;
        }
        std::cout << "thread[" << pthread_self() << "]:" << _x << _op << _y << "=" << result << std::endl;
    }
private:
    int _x;
    int _y;
    char _op;
};

此时线程池内的线程不断从任务队列拿出任务进行处理,而它们并不需要关心这些任务是哪来的,它们只需要拿到任务后执行对应的Run方法即可。

#include "Task.hpp"
#include " ThreadPool_v1.hpp"

int main()
{
    srand((unsigned int)time(nullptr));
    ThreadPool<Task>* tp = new ThreadPool<Task>; //线程池
    tp->ThreadPoolInit(); //初始化线程池当中的线程
    const char* op = "+-*/%";
    //不断往任务队列塞计算任务
    while (true){
        sleep(1);
        int x = rand() % 100;
        int y = rand() % 100;
        int index = rand() % 5;
        Task task(x, y, op[index]);
        tp->Push(task);
    }
    return 0;
}

运行代码后一瞬间就有六个线程,其中一个是主线程,另外五个是线程池内处理任务的线程。

并且我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,这五个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性。

注意:此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。

3.线程池最终进阶版本——V2版本

3.1.单例模式的概念

  • 1. 什么是单例模式

单例模式:是指一个类只会创建一次对象的设计模式,属于设计模式创建者模式中的一种。这个类提供了一种唯一访问该对象的方式,也就是说,这个类的实现只会在内存中出现一次。这样子的好处是防止频繁的创建对象导致内存资源浪费。

  • 2. 单例模式的两种形式

饿汉式:在类被加载时就会创建该类的实例对象

懒汉式:在类被加载时不回创建该类的实例对象,在首次要使用该实例时才会创建

  • 3. 单例模式的特点

1.单例类只会有一个实例

2.单例类的实例由该类自己提供对外访问的方法

3.单例类的构造函数必须是私有的

3.2.单例模式的简单实现

单例模式 有两种实现方向:饿汉懒汉,它们避免类被再次创建出对象的手段是一样的:构造函数私有化、删除拷贝构造

只要外部无法访问 构造函数,那么也就无法构建对象了,比如下面这个类

Signal

**单例类

Signal

**

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;
};

这样子外面就不能创建类对象了,当然这只实现了一半,

    还有另一半是 创建一个单例对象,既然外部受权限约束无法创建对象,那么类内是肯定可以创建对象的,只需要创建一个指向该类对象的 静态指针 或者一个 静态对象,再初始化就好了;因为外部无法访问该指针,所以还需要**提供一个静态函数 getInstance() **以获取单例对象的信息,至于具体怎么实现,需要分不同方向(饿汉 or 懒汉)
#pragma once
#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    { }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    // 获取单例对象的句柄
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;
};

注意:** 构造函数不能只声明,需要实现,即使什么都不写**

  • 为什么要删除拷贝构造?

如果不删除拷贝构造,那么外部可以借助拷贝构造函数,拷贝构造出一个与 单例对象 一致的 “对象”,此时就出现两个对象,这是不符合 单例模式 特点的

  • 为什么要创建一个静态函数?

单例对象也需要被初始化,并且要能被外部使用
** 调用链逻辑:通过静态函数获取句柄(静态单例对象地址)-> 通过地址调用该对象的其他函数**

3.2.1.饿汉模式

张三总是很饿,尽管饭菜还没准备好,他就已经早早的把碗洗好了,等到开饭时,直接开干

饿汉模式 也是如此,在程序加载到内存时,就已经早早的把 单例对象 创建好了(此时程序服务还没有完全启动),也就是在外部直接通过

new

实例化一个对象,具体实现如下

#pragma once

#include <iostream>

// 饿汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;
};

Signal *Signal::_sigptr = new Signal();

注:在程序加载时,该对象会被创建

这里的 单例对象 本质就有点像 全局变量,在程序加载时就已经创建好了

外部可以直接通过 **

getInstance()

** 获取 单例对象 的操作句柄,来调用类中的其他函数

**

main.cc

**

#include <iostream>
#include "Signal.hpp"

int main()
{
    Signal::getInstance()->print();

    return 0;
}

可以看到,我们没有创建类对象都能调用这个静态函数。

    这就实现了一个简单的 饿汉版单例类,除了**创建 static Signal* 静态单例对象指针 外,也可以直接定义一个 静态单例对象,生命周期随进程**,不过要注意的是:**getInstance() 需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象**
#pragma once

#include <iostream>

// 饿汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        return &_sig;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 静态单例对象
    static Signal _sig;
};

// 初始化
Signal Signal::_sig;

饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度

    完全启动服务是需要时间的,创建 单例对象 也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱

    **并且由于 饿汉模式 每次都会先创建 单例对象,再启动服务,如果后续使用 单例对象 还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费**

    综上所述,**饿汉模式 不是很推荐使用,除非图实现简单,并且服务规模较小;****既然 饿汉模式 有缺点,就需要改进,于是就出现了 懒汉模式**

3.2.2.懒汉模式

    李四也是个很饿的人,他也有一个自己的碗,吃完饭后碗会脏,但他不像张三那样极端,李四比较懒,只有等他吃饭的时候,他才会去洗碗,李四这种做法让他感到无比轻松。

    在 **懒汉模式** 中,**单例对象** 并不会在程序加载时创建,而是在第一次调用时创建,第一次调用创建后,后续无需再创建,直接使用即可
#pragma once

#include <iostream>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        // 第一次调用才创建
        if (_sigptr == nullptr)
        {
            _sigptr = new Signal();
        }

        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 静态指针
    static Signal *_sigptr;
};

// 初始化静态指针
Signal *Signal::_sigptr = nullptr;

注意: 此时的静态指针需要初始化为 nullptr,方便第一次判断

饿汉模式 中出现的问题这里全都避免了

  • 创建耗时 -> 只在第一次使用时创建
  • 占用资源 -> 如果不使用,就不会被创建

懒汉模式 的核心在于 延时加载,可以优化服务器的速度及资源占用

延时加载这种机制就有点像 「写时拷贝」,就du你不会使用,从而节省资源开销,类似的还有 动态库、进程地址空间 等

当然,懒汉模式 下也是可以正常使用 单例对象 的

3.2.3.饿汉模式VS懒汉模式

这样看来,懒汉模式 确实优秀,实现起来也不麻烦,为什么会说 饿汉模式 更简单呢?

    这是因为当前只是单线程场景,程序暂时没啥问题,如果当前是多线程场景,问题就大了,如果一批线程同时调用 getInstance(),同时认定 _sigptr 为空,就会创建多个 单例对象,这是不合理的

也就是说当前实现的 懒汉模式 存在严重的线程安全问题

如何证明?

** 简单改一下懒汉模式的代码,每创建一个单例对象,就打印一条语句,将代码放入多线程环境中测试**

**懒汉模式的获取单例对象句柄

getInstance()

— 位于

Signal

类**

#pragma once

#include <iostream>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        // 第一次调用才创建
        if (_sigptr == nullptr)
        {
            std::cout << "创建了一个单例对象" << std::endl;
            _sigptr = new Signal();
        }

        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 静态指针
    static Signal *_sigptr;
};

// 初始化静态指针
Signal *Signal::_sigptr = nullptr;

**源文件

main.cc

**

其中使用了

lambda

表达式来作为线程的回调函数,重点在于查看现象

#include <iostream>
#include <pthread.h>
#include "Signal.hpp"

int main()
{
    // 创建一批线程
    pthread_t arr[10];
    for(int i = 0; i < 10; i++)
    {
        pthread_create(arr + i, nullptr, [](void*)->void*
            {
                // 获取句柄
                auto ptr = Signal::getInstance();
                ptr->print();
                return nullptr;
            }, nullptr);
    }

    for(int i = 0; i < 10; i++)
        pthread_join(arr[i], nullptr);

    return 0;
}

当前代码在多线程环境中,同时创建了多个 单例对象,因此是存在线程安全问题的

  • 饿汉模式没有线程安全问题吗?

没有,因为饿汉模式下,单例对象一开始就被创建了,即便是多线程场景中,也不会创建多个对象,它们也做不到

3.2.4.懒汉模式(线程安全版)

有问题就解决,解决多线程并发访问的利器是 互斥锁,那就创建 互斥锁 保护单例对象的创建

#pragma once

#include <iostream>
#include <mutex>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);
        if (_sigptr == nullptr)
        {
            std::cout << "创建了一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);

        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 静态指针
    static Signal *_sigptr;
    static pthread_mutex_t _mtx;
};

// 初始化静态指针
Signal *Signal::_sigptr = nullptr;

// 初始化互斥锁
pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;

注意: **

getInstance()

是静态函数,互斥锁也要定义为静态的,可以初始化为全局静态锁**

依旧是借助之前的多线程场景,测试一下改进后的 懒汉模式 代码有没有问题

没有问题


现在还面临最后一个问题:效率问题

当前代码确实能保证只会创建一个 单例对象,但即使后续不会创建 单例对象,也需要进行 加锁、判断、解锁 这个流程,要知道 加锁 也是有资源消耗的,所以这种写法不妥

解决方案是:DoubleCheck 双检查加锁

在 加锁 前再增加一层判断,如此一来,N 个线程,顶多只会进行 N 次 加锁与解锁,这是非常优雅的解决方案

**获取静态对象句柄

getInstance()

— 位于

Signal

类**

static Signal *getInstance()
{
    // 双检查
    if(_sigptr == nullptr)
    {
        // 加锁保护
        pthread_mutex_lock(&_mtx);
        if(_sigptr == nullptr)
        {
            std::cout << "创建了一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);
    }

    return _sigptr;
}
  • ** 为什么要两个if**

单纯的

if

判断并不会消耗很多资源,但 加锁 行为会消耗资源,延缓程序运行速度,双检查加锁 可以有效避免这个问题

**懒汉模式最终版本1 **

#pragma once

#include <iostream>
#include <mutex>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        // 双检查
        if (_sigptr == nullptr)
        {
            // 加锁保护
            pthread_mutex_lock(&_mtx);
            if (_sigptr == nullptr)
            {
                std::cout << "创建了一个单例对象" << std::endl;
                _sigptr = new Signal();
            }
            pthread_mutex_unlock(&_mtx);
        }

        return _sigptr;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }

private:
    // 静态指针
    static Signal *_sigptr;
    static pthread_mutex_t _mtx;
};

// 初始化静态指针
Signal *Signal::_sigptr = nullptr;

// 初始化互斥锁
pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;

上面这个代码未必也太复杂了。

    值得一提的是,**懒汉模式** 还有一种非常简单的写法:**调用 
getInstance()

时创建一个静态单例对象并返回**,因为静态单例对象只会初始化一次,所以是可行的,并且在

C++11

之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

懒汉模式最终版本2

#pragma once

#include <iostream>
#include <mutex>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {
    }

    // 删除拷贝构造
    Signal(const Signal &) = delete;

public:
    static Signal *getInstance()
    {
        // 静态单例对象,只会初始化一次,并且生命周期随进程
        static Signal _sig;

        return &_sig;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }
};

结果也是正常的,所以如果当前的生产环境所支持的

C++

版本为

C++11

及以后,在实现 懒汉模式 时可以选择这种简便的方式,是非常不错的;如果为了兼容性,也可以选择传统写法

注意: **静态变量创建时的线程安全问题,在

C++11

之前是不被保障的**

3.3.线程池_V2(最终版)

有了 单例模式 的相关知识后,就可以开始编写最终版线程池了

「最终版」:将线程池改为 单例模式,只允许存在一个线程池对象

这里选择 懒汉模式,因为比较优秀,并且为了确保兼容性,选择 经典写法

也就是等到使用的时候再创建!

    首先是修改 
ThreadPool

为单例模式,然后提供一个获取 单例对象 的函数,如果是第一次创建 单例对象,就需要先创建对象。

获取单例对象的函数

// 获取线程池单例对象
    static ThreadPool<T> *GetInstance()
    {
        if (nullptr == tp_) // 如果线程池对象不存在,则创建一个新的线程池对象
        {
            pthread_mutex_lock(&lock_); // 加锁保证线程安全
            if (nullptr == tp_) // 再次检查是否已经创建了线程池对象,防止多线程环境下的竞争条件
            {
                std::cout << "log: singleton create done first!" << std::endl;
                tp_ = new ThreadPool<T>(); // 创建线程池对象
            }
            pthread_mutex_unlock(&lock_); // 解锁
        }

        return tp_; // 返回线程池对象指针
    }

 // 线程池单例对象指针和互斥锁静态成员变量
    static ThreadPool<T> *tp_;
    单例模式 改完了,但现在面临一个尴尬的问题:**main.cc 无法直接将回调函数 callBack() 进行传递,因为它根本无法创建对象**

    可以试试曲线救国:**将函数对象传递给 getInstance() 函数,如果用户不传,那就使用缺省参数,也就是直接打印结果**

总之,修修改改后的线程池长这样

头文件 ThreadPool_V2.hpp

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>

// 线程信息结构体
struct ThreadInfo
{
    pthread_t tid; // 线程ID
    std::string name; // 线程名称
};

// 默认线程数量
static const int defalutnum = 5;

// 线程池模板类
template <class T>
class ThreadPool
{
private:
    // 互斥锁加锁函数
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }
    // 互斥锁解锁函数
    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }
    // 唤醒等待的线程
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }
    // 线程休眠等待条件变量
    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }
    // 判断任务队列是否为空
    bool IsQueueEmpty()
    {
        return tasks_.empty();
    }
    // 根据线程ID获取线程名称
    std::string GetThreadName(pthread_t tid)
    {
        for (const auto &ti : threads_)
        {
            if (ti.tid == tid)
                return ti.name;
        }
        return "None";
    }

public:
    // 线程处理任务的函数
    static void *HandlerTask(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            tp->Lock();

            while (tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->Unlock();

            t.Run();
            std::cout << name << " run, "
                      << "result: " << t.GetResult() << std::endl;
        }
    }
    // 启动线程池中的所有线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name = "thread-" + std::to_string(i + 1);
            pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
        }
    }
    // 从任务队列中取出一个任务
    T Pop()
    {
        T t = tasks_.front();
        tasks_.pop();
        return t;
    }

    // 向任务队列中添加一个任务
    void Push(const T &t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();
        Unlock();
    }

    // 获取线程池单例对象
    static ThreadPool<T> *GetInstance()
    {
        if (nullptr == tp_) // 如果线程池对象不存在,则创建一个新的线程池对象
        {
            pthread_mutex_lock(&lock_); // 加锁保证线程安全
            if (nullptr == tp_) // 再次检查是否已经创建了线程池对象,防止多线程环境下的竞争条件
            {
                std::cout << "log: singleton create done first!" << std::endl;
                tp_ = new ThreadPool<T>(); // 创建线程池对象
            }
            pthread_mutex_unlock(&lock_); // 解锁
        }

        return tp_; // 返回线程池对象指针
    }

private:
    // 构造函数,初始化线程池,可以指定线程数量,默认为defalutnum
    ThreadPool(int num = defalutnum) : threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr); // 初始化互斥锁
        pthread_cond_init(&cond_, nullptr); // 初始化条件变量
    }
    // 析构函数,销毁线程池资源
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_); // 销毁互斥锁
        pthread_cond_destroy(&cond_); // 销毁条件变量
    }

    // 禁止拷贝构造和赋值操作符,确保线程池对象的单一性
    ThreadPool(const ThreadPool<T> &) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
    // 线程信息列表
    std::vector<ThreadInfo> threads_;
    // 任务队列
    std::queue<T> tasks_;

    // 互斥锁和条件变量用于同步和通信
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;

    // 线程池单例对象指针和互斥锁静态成员变量
    static ThreadPool<T> *tp_;
    static pthread_mutex_t lock_;
};

// 初始化线程池单例对象指针和互斥锁静态成员变量
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

注意:**这个代码要求Task类必须包含Run函数和GetResult函数 **

它要怎么使用呢?

其实很简单,就下面这3步是最关键的

ThreadPool<Task>::GetInstance()->Start(); 

Task t();//构造一个任务对象

ThreadPool<Task>::GetInstance()->Push(t);

Task.hpp

#pragma once
 
#include <iostream>
 
//任务类
class Task
{
public:
    Task(int x = 0, int y = 0, char op = 0)
        : _x(x), _y(y), _op(op)
    {}
    ~Task()
    {}
 
    //处理任务的方法
    void Run()
    {
        switch (_op)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result  = _x - _y;
            break;
        case '*':
            _result  = _x * _y;
            break;
        case '/':
            if (_y == 0){
                std::cerr << "Error: div zero!" << std::endl;
                return;
            }
            else{
                _result  = _x / _y;
            }
            break;
        case '%':
            if (_y == 0){
                std::cerr << "Error: mod zero!" << std::endl;
                return;
            }
            else{
                _result  = _x % _y;
            }
            break;
        default:
            std::cerr << "operation error!" << std::endl;
            return;
        }
    }

    int GetResult()
    {
        return _result;
    }

private:
    int _x;
    int _y;
    char _op;
    int _result;
};

此时

main.cc

想要使用线程池对象时,就得通过

getInstance()

获取句柄,然后才能进行操作

**源文件

main.cc

**

#include " ThreadPool_v1.hpp"
#include "Task.hpp"

pthread_spinlock_t slock;

int main()
{

    // 如果获取单例对象的时候,也是多线程获取的呢?
    std::cout << "process runn..." << std::endl;
    sleep(3);

    
    ThreadPool<Task>::GetInstance()->Start();
    
    srand(time(nullptr) ^ getpid());
    const char* ops = "+-*/%";

    while(true)
    {
        //1. 构建任务
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 5;
        char op = ops[rand()%5];

        Task t(x, y, op);

        ThreadPool<Task>::GetInstance()->Push(t);
       //2. 交给线程池处理
        std::cout << "main thread make task: " << t.GetResult() << std::endl;

        sleep(1);
    }
}

此时是可以获取结果a的,也可以看到一批线程正在候等任务到达

  • 如何证明当前的 单例模式 生效了?

在调用 G

etInstance()

之前查看正在运行中的线程数量,调用完后再次查看,如果线程数量从

1

个变成多个,就证明 单例模式 是生效的(延迟加载)

主线程先睡眠3秒

3秒后开始

这就是线程池完全版本。

标签: linux 运维 服务器

本文转载自: https://blog.csdn.net/2301_80224556/article/details/141188737
版权归原作者 掘根 所有, 如有侵权,请联系我们删除。

“【Linux】多线程7——线程池”的评论:

还没有评论