0


【在Linux世界中追寻伟大的One Piece】多线程(三)

1 -> Linux线程同步

1.1 -> 条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

1.2 -> 同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题不难理解。

1.3 -> 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t* restrict cond, const pthread_condattr_t* restrict attr);

参数:
cond:要初始化的条件变量
attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t* cond);

等待条件满足

int pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex);

参数:
cond:要在这个条件变量上等待
mutex:互斥量

唤醒等待

int pthread_cond_broadcast(pthread_cond_t* cond);
int pthread_cond_signal(pthread_cond_t* cond);

简单案例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t cond;
pthread_mutex_t mutex;

void* r1(void* arg)
{
    while (1) 
    {
        pthread_cond_wait(&cond, &mutex);
        printf("活动\n");
    }
}

void* r2(void* arg)
{
    while (1) 
    {
        pthread_cond_signal(&cond);
        sleep(1);
    }
}

int main(void)
{
    pthread_t t1, t2;

    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    pthread_create(&t1, NULL, r1, NULL);
    pthread_create(&t2, NULL, r2, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
}

**[root@localhost linux]# ./a.out **

**活动 **

**活动 **

活动

1.4 -> 为什么pthread_cond_wait需要互斥量

  • 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
  • 条件不会无缘无故的突然变得满足,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

  • 按照上面的说法,设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上就行了。
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) 
{
    pthread_mutex_unlock(&mutex);
    //解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
    pthread_cond_wait(&cond);
    pthread_mutex_lock(&mutex);

}
pthread_mutex_unlock(&mutex);
  • 由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait。所以解锁和等待必须是一个原子操作。
  • int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);进入该函数后,会去看条件量是否等于0,等于0,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。

1.5 -> 条件变量使用规范

  • 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
    pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
  • 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

2 -> 生产者消费者模型

2.1 -> 为什么要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者索要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

2.2 -> 生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

2.3 -> 基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

2.4 -> C++ queue模拟阻塞队列的生产消费模型

代码:

#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>

#define NUM 8

class BlockQueue 
{
private:
    std::queue<int> q;
    int cap;
    pthread_mutex_t lock;
    pthread_cond_t full;
    pthread_cond_t empty;

private:
    void LockQueue()
    {
        pthread_mutex_lock(&lock);
    }

    void UnLockQueue()
    {
        pthread_mutex_unlock(&lock);
    }

    void ProductWait()
    {
        pthread_cond_wait(&full, &lock);
    }

    void ConsumeWait()
    {
        pthread_cond_wait(&empty, &lock);
    }

    void NotifyProduct()
    {
        pthread_cond_signal(&full);
    }

    void NotifyConsume()
    {
        pthread_cond_signal(&empty);
    }

    bool IsEmpty()
    {
        return (q.size() == 0 ? true : false);
    }

    bool IsFull()
    {
        return (q.size() == cap ? true : false);
    }

public:
    BlockQueue(int _cap = NUM) :cap(_cap)
    {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&full, NULL);
        pthread_cond_init(&empty, NULL);
    }

    void PushData(const int& data)
    {
        LockQueue();
        while (IsFull()) 
        {
            NotifyConsume();
            std::cout << "queue full, notify
                consume data, product stop." << std::endl;
                ProductWait();
        }

        q.push(data);
        // NotifyConsume();
        UnLockQueue();
    }

    void PopData(int& data)
    {
        LockQueue();
        while (IsEmpty()) 
        {
            NotifyProduct();
            std::cout << "queue empty, notify
                product data, consume stop." << std::endl;
                ConsumeWait();
        }

        data = q.front();
        q.pop();
        // NotifyProduct();
        UnLockQueue();
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&full);
        pthread_cond_destroy(&empty);
    }
};

void* consumer(void* arg)
{
    BlockQueue* bqp = (BlockQueue*)arg;
    int data;
    for (; ; ) 
    {
        bqp->PopData(data);
        std::cout << "Consume data done : " << data <<
            std::endl;
    }
}

//more faster
void* producter(void* arg)
{
    BlockQueue* bqp = (BlockQueue*)arg;
    srand((unsigned long)time(NULL));
    for (; ; ) 
    {
        int data = rand() % 1024;
        bqp->PushData(data);
        std::cout << "Prodoct data done: " << data <<
            std::endl;
        // sleep(1);
    }
}

int main()
{
    BlockQueue bq;

    pthread_t c, p;

    pthread_create(&c, NULL, consumer, (void*)&bq);
    pthread_create(&p, NULL, producter, (void*)&bq);

    pthread_join(c, NULL);
    pthread_join(p, NULL);

    return 0;
}

2.5 -> POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);

参数:
pshared : 0 表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t* sem);

等待信号量

功能:等待信号量,会将信号量的值减 1
int sem_wait(sem_t * sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1。
int sem_post(sem_t * sem);//V()

生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量)。

2.6 -> 基于环形队列的生产消费模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性。

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

  • 但是现有的信号量这个计数器,是简单的进行多进程间的同步过程。
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

#define NUM 16

class RingQueue 
{
private:
    std::vector<int> q;
    int cap;
    sem_t data_sem;
    sem_t space_sem;
    int consume_step;
    int product_step;

public:
    RingQueue(int _cap = NUM) :q(_cap), cap(_cap)
    {
        sem_init(&data_sem, 0, 0);
        sem_init(&space_sem, 0, cap);
        consume_step = 0;
        product_step = 0;
    }

    void PutData(const int& data)
    {
        sem_wait(&space_sem); // P
        q[consume_step] = data;
        consume_step++;
        consume_step %= cap;
        sem_post(&data_sem); //V
    }

    void GetData(int& data)
    {
        sem_wait(&data_sem);
        data = q[product_step];
        product_step++;
        product_step %= cap;
        sem_post(&space_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&data_sem);
        sem_destroy(&space_sem);
    }
};

void* consumer(void* arg)
{
    RingQueue* rqp = (RingQueue*)arg;
    int data;
    for (; ; ) 
    {
        rqp->GetData(data);
        std::cout << "Consume data done : " << data << std::endl;
        sleep(1);
    }
}

//more faster
void* producter(void* arg)
{
    RingQueue* rqp = (RingQueue*)arg;
    srand((unsigned long)time(NULL));
    for (; ; ) 
    {
        int data = rand() % 1024;
        rqp->PutData(data);
        std::cout << "Prodoct data done: " << data << std::endl;
        // sleep(1);
    }
}

int main()
{
    RingQueue rq;

    pthread_t c, p;

    pthread_create(&c, NULL, consumer, (void*)&rq);
    pthread_create(&p, NULL, producter, (void*)&rq);

    pthread_join(c, NULL);
    pthread_join(p, NULL);

}

3 -> 线程池

3.1 -> 线程池概念

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。

3.2 -> 线程池应用场景

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
  2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
  3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。

3.3 -> 线程池示例

threadpool.hpp

#ifndef __M_TP_H__
#define __M_TP_H__

#include <iostream>
#include <queue>
#include <pthread.h>

#define MAX_THREAD 5

typedef bool (*handler_t)(int);

class ThreadTask
{
private:
    int _data;
    handler_t _handler;
public:
    ThreadTask() :_data(-1), _handler(NULL) {}

    ThreadTask(int data, handler_t handler) 
    {
        _data = data;
        _handler = handler;
    }

    void SetTask(int data, handler_t handler) 
    {
        _data = data;
        _handler = handler;
    }

    void Run() 
    {
        _handler(_data);
    }
};

class ThreadPool
{
private:
    int _thread_max;
    int _thread_cur;
    bool _tp_quit;
    std::queue<ThreadTask*> _task_queue;
    pthread_mutex_t _lock;
    pthread_cond_t _cond;

private:
    void LockQueue() 
    {
        pthread_mutex_lock(&_lock);
    }

    void UnLockQueue() 
    {
        pthread_mutex_unlock(&_lock);
    }

    void WakeUpOne() 
    {
        pthread_cond_signal(&_cond);
    }

    void WakeUpAll() 
    {
        pthread_cond_broadcast(&_cond);
    }

    void ThreadQuit() 
    {
        _thread_cur--;
        UnLockQueue();
        pthread_exit(NULL);
    }

    void ThreadWait() 
    {
        if (_tp_quit) 
        {
            ThreadQuit();
        }
        pthread_cond_wait(&_cond, &_lock);
    }

    bool IsEmpty() 
    {
        return _task_queue.empty();
    }

    static void* thr_start(void* arg) 
    {
        ThreadPool* tp = (ThreadPool*)arg;
        while (1) 
        {
            tp->LockQueue();
            while (tp->IsEmpty()) 
            {
                tp->ThreadWait();
            }

            ThreadTask* tt;
            tp->PopTask(&tt);
            tp->UnLockQueue();
            tt->Run();

            delete tt;
        }

        return NULL;
    }

public:
    ThreadPool(int max = MAX_THREAD) :_thread_max(max),
        _thread_cur(max),
        _tp_quit(false) 
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    ~ThreadPool() 
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

    bool PoolInit() 
    {
        pthread_t tid;
        for (int i = 0; i < _thread_max; i++) 
        {
            int ret = pthread_create(&tid, NULL, thr_start,
                this);
            if (ret != 0) 
            {
                std::cout << "create pool thread error\n";

                return false;
            }
        }

        return true;
    }

    bool PushTask(ThreadTask* tt) 
    {
        LockQueue();
        if (_tp_quit) 
        {
            UnLockQueue();
            return false;
        }

        _task_queue.push(tt);
        WakeUpOne();
        UnLockQueue();

        return true;
    }

    bool PopTask(ThreadTask** tt) 
    {
        *tt = _task_queue.front();
        _task_queue.pop();

        return true;
    }

    bool PoolQuit() 
    {
        LockQueue();
        _tp_quit = true;
        UnLockQueue();
        while (_thread_cur > 0) 
        {
            WakeUpAll();
            usleep(1000);
        }

        return true;
    }

};
#endif

main.cpp

bool handler(int data)
{
    srand(time(NULL));
    int n = rand() % 5;
    printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
    sleep(n);

    return true;
}

int main()
{
    int i;
    ThreadPool pool;
    pool.PoolInit();
    for (i = 0; i < 10; i++) 
    {
        ThreadTask* tt = new ThreadTask(i, handler);
        pool.PushTask(tt);
    }

    pool.PoolQuit();

    return 0;
}

g++ -std=c++0x test.cpp -o test -pthread -lrt

4 -> 线程安全的单例模式

4.1 -> 什么是单例模式

单例模式(Singleton Pattern)是一种设计模式,旨在确保一个类只有一个实例,并提供一个全局访问点来获取这个实例。这种模式常用于管理共享资源,如配置信息、线程池、缓存等。单例模式的核心思想是控制对象的实例化过程,使得在整个应用程序中只有一个实例存在,并且所有对该实例的访问都通过同一个访问点进行。

4.2 -> 单例模式的特点

  1. 唯一性:单例模式确保一个类只有一个实例。这意味着在整个应用程序的生命周期中,无论在何处调用该类的实例,都将返回同一个对象。
  2. 全局访问点:单例模式提供了一个全局访问点,通常是一个静态方法,用于获取该类的唯一实例。这使得在程序的任何地方都能够方便地访问该实例。
  3. 延迟初始化:单例模式的实例通常在第一次被请求时才会创建,这有助于节省资源。这种延迟初始化的特性也被称为懒加载(Lazy Initialization)。
  4. 线程安全:在多线程环境下,单例模式需要确保其唯一性和全局访问的正确性。这通常通过同步机制来实现,如使用锁或其他并发控制手段。
  5. 控制实例化:单例模式的构造函数通常是私有的,这防止了外部代码通过常规的构造函数创建新的实例。
  6. 可扩展性:单例模式可以通过继承或其他方式进行扩展,以满足不同的应用需求。
  7. 资源管理:单例模式常用于管理共享资源,如数据库连接、线程池、配置信息等,确保这些资源在整个应用程序中只有一个实例,从而提高资源的使用效率和管理便利性。
  8. 性能优化:由于单例模式只创建一个实例,因此可以减少内存开销和提高性能,特别是在处理大量数据或频繁访问的对象时。
  9. 简化代码结构:单例模式可以简化代码结构,因为它提供了一个单一的、全局的访问点,使得代码的维护和理解更加容易。
  10. 适用场景:单例模式适用于那些在整个应用程序中只需要一个实例的场景,如日志记录器、配置管理器、数据库连接池等。

4.3 -> 饿汉方式实现单例模式

template <typename T>
class Singleton 
{
    static T data;
public:
    static T* GetInstance() 
    {
        return &data;
    }
};

只要通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例。

4.4 -> 懒汉方式实现单例模式

template <typename T>
class Singleton 
{
    static T* inst;
public:
    static T* GetInstance() 
    {
        if (inst == NULL) 
        {
            inst = new T();
        }
        return inst;
    }
};

存在一个严重的问题,线程不安全。

第一个调用GetInstance时,如果两个线程同时调用,可能会创建出两份T对象的实例。

但是后续再次调用,就没有问题了。

4.5 -> 懒汉方式实现单例模式(线程安全版本)

// 懒汉模式, 线程安全
template <typename T>
class Singleton 
{
    volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
    static std::mutex lock;

public:
    static T* GetInstance() 
    {
        if (inst == NULL) 
        { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
            lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
            if (inst == NULL) 
            {
                inst = new T();
            }
            lock.unlock();
        }

        return inst;
    }
};

注意:

  1. 加锁解锁的位置
  2. 双重if判断,避免不必要的锁竞争
  3. volatile关键字防止过度优化

5 -> STL,智能指针和线程安全

5.1 -> STL中的容器是否是线程安全的

不是。

原因是,STL的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。

而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)。

因此STL默认不是线程安全。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。

5.2 -> 智能指针是否是线程安全的

对于unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题。

对于shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证shared_ptr能够高效,原子的操作引用计数。


感谢各位大佬支持!!!

互三啦!!!

标签: java 开发语言 linux

本文转载自: https://blog.csdn.net/weixin_74809706/article/details/144124044
版权归原作者 枫叶丹4 所有, 如有侵权,请联系我们删除。

“【在Linux世界中追寻伟大的One Piece】多线程(三)”的评论:

还没有评论