线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量
线程池
线程池(Thread Pool)是一种基于池化技术设计用于管理和复用线程的机制。
在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。
主要优点
- 降低资源消耗:通过复用已存在的线程,减少线程创建和销毁的开销。
- 提高响应速度:当任务到达时,可以立即分配线程进行处理,减少了等待时间。
- 提高线程的可管理性:线程池可以统一管理线程,包括线程的创建、调度、执行和销毁等。
关键参数
- 核心线程数:线程池维护线程的最少数量,即使这些线程处于空闲状态,线程池也不会回收它们(一般为主线程)。
- 最大线程数:线程池中允许的最大线程数。
- 阻塞队列:用于存放待执行的任务的队列。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。
- 非核心线程空闲存活时间:当线程池中的线程数量超过核心线程数时,如果线程空闲时间超过这个时间,多余的线程将被终止。
- 线程工厂:用于创建新线程的工厂。
- 拒绝策略:当线程池和队列都满了时,对于新来的任务将采取的拒绝策略。
实现原理
线程池的实现原理通常包括以下几个关键部分:
- 线程池管理器:负责创建并管理线程池,包括初始化线程池、创建工作线程、销毁线程池等。
- 工作线程:线程池中的线程,负责执行具体的任务。工作线程通常会不断从任务队列中取出任务并执行,直- 到线程池被销毁或所有任务都执行完毕。
- 任务接口:每个任务必须实现的接口,用于定义任务的执行逻辑。在Java中,这通常是通过实现Runnable或Callable接口来实现的。
- 任务队列:用于存放待处理的任务。当线程池中的工作线程数量达到最大值时,新到达的任务会被放入任务队列中等待处理。任务队列的实现通常依赖于Java的BlockingQueue接口。
实现流程
- 当有新任务提交给线程池时,线程池会首先判断当前正在运行的线程数量是否小于核心线程数。如果是,则直接创建新的线程来执行任务;否则,将任务加入任务队列等待处理。
- 如果任务队列已满,且当前正在运行的线程数量小于最大线程数(maximumPoolSize),则创建新的线程来处理任务;如果线程数量已经达到最大线程数,则根据配置的拒绝策略来处理新任务(如抛出异常、直接丢弃等)。
- 当一个线程完成任务后,它会从任务队列中取下一个任务来执行;如果没有任务可供执行,并且线程池中的线程数量超过了核心线程数,且这些线程空闲时间超过了设定的存活时间,则这些线程会被销毁,直到线程池中的线程数量减少到核心线程数为止。
实例
Thread.hpp
#ifndef__THREAD_HPP__#define__THREAD_HPP__#include<iostream>#include<string>#include<pthread.h>#include<functional>#include<unistd.h>usingnamespace std;namespace ThreadMdule
{using func_t = std::function<void(string)>;classThread{public:voidExcute(){_func(_threadname);}Thread(func_t func,const std::string &name="none-name"):_func(func),_threadname(name),_stop(true){}staticvoid*threadroutine(void* args){
Thread* self=static_cast<Thread*>(args);
self->Excute();returnnullptr;}boolstart(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){
_stop =false;returntrue;}else{returnfalse;}}voidDetach(){if(!_stop){pthread_detach(_tid);}}voidJoin(){if(!_stop){pthread_join(_tid,nullptr);}}
string name(){return _threadname;}voidStop(){
_stop =true;}~Thread(){}private:
pthread_t _tid;
std::string _threadname;
func_t _func;bool _stop;};}#endif
ThreadPool.hpp
#pragmaonce#include<iostream>#include<vector>#include<queue>#include<pthread.h>#include"Thread.hpp"#include"Log.hpp"#include"LockGuard.hpp"usingnamespace ThreadMdule;usingnamespace std;conststaticint gdefaultthreadnum=3;//默认线程池的线程数template<classT>classThreadPool{public:ThreadPool(int threadnum=gdefaultthreadnum):_threadnum(threadnum),_waitnum(0),_isrunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);LOG(INFO,"ThreadPool COnstruct.");}//各个线程独立的任务函数voidHandlerTask(string name){LOG(INFO,"%s is running...",name.c_str());while(true){LockQueue();//开启保护//等到有任务时才退出循环执行下列语句while(_task_queue.empty()&&_isrunning){
_waitnum++;ThreadSleep();
_waitnum--;}//当任务队列空并且线程池停止时线程退出if(_task_queue.empty()&&!_isrunning){UnlockQueue();
cout<<name<<" quit "<<endl;sleep(1);break;}//1.任务队列不为空&&线程池开启//2.任务队列不为空&&线程池关闭,直到任务队列为空//所以,只要有任务,就要处理任务
T t=_task_queue.front();//取出对应任务
_task_queue.pop();UnlockQueue();LOG(DEBUG,"%s get a task",name.c_str());//处理任务t();LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());}}//线程池中线程的构建voidInitThreadPool(){for(int i=0;i<_threadnum;i++){
string name="thread-"+to_string(i+1);
_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);LOG(INFO,"init thread %s done",name.c_str());}
_isrunning=true;}//线程池的启动voidStart(){for(auto& thread:_threads){
thread.start();}}//线程池停止voidStop(){LockQueue();
_isrunning=false;ThreadWakeupAll();UnlockQueue();}voidWait(){for(auto& thread:_threads){
thread.Join();LOG(INFO,"%s is quit...",thread.name().c_str());}}//将任务入队列boolEnqueue(const T& t){bool ret=false;LockQueue();if(_isrunning){
_task_queue.push(t);//如果有空闲的线程,那么唤醒线程让其执行任务if(_waitnum>0){ThreadWakeup();}LOG(DEBUG,"enqueue task success");
ret=true;}UnlockQueue();return ret;}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:voidLockQueue(){pthread_mutex_lock(&_mutex);}voidUnlockQueue(){pthread_mutex_unlock(&_mutex);}voidThreadSleep(){pthread_cond_wait(&_cond,&_mutex);}voidThreadWakeup(){pthread_cond_signal(&_cond);}voidThreadWakeupAll(){pthread_cond_broadcast(&_cond);}int _threadnum;//线程数
vector<Thread> _threads;//存储线程的vector
queue<T> _task_queue;//输入的任务队列
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量int _waitnum;//空闲的线程数bool _isrunning;//表示线程池是否启动};
Task.hpp
#include<iostream>#include<string>#include<functional>usingnamespace std;classTask{public:Task(){}Task(int a,int b):_a(a),_b(b),_result(0){}voidExcute(){
_result=_a+_b;}
string ResultToString(){returnto_string(_a)+"+"+to_string(_b)+"="+to_string(_result);}
string DebugToString(){returnto_string(_a)+"+"+to_string(_b)+"= ?";}voidoperator()(){Excute();}private:int _a;int _b;int _result;};
main.cc
intmain(){srand(time(nullptr)^getpid()^pthread_self());//EnableScreen();EnableFile();
unique_ptr<ThreadPool<Task>>tp(newThreadPool<Task>(5));
tp->InitThreadPool();
tp->Start();int tasknum=10;while(tasknum){sleep(1);int a=rand()%10+1;usleep(1024);int b=rand()%20+1;
Task t(a,b);LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());
tp->Enqueue(t);
tasknum--;}
tp->Stop();
tp->Wait();}
Loh.hpp
#pragmaonce#include<iostream>#include<fstream>#include<ctime>#include<cstdarg>#include<string>#include<sys/types.h>#include<unistd.h>#include<cstdio>#include"LockGuard.hpp"usingnamespace std;bool gIsSave=false;//默认输出到屏幕const string logname="log.txt";//1.日志是有等级的enumLevel{
DEBUG=0,
INFO,
WARNING,
ERROR,
FATAL
};voidSaveFile(const string& filename,const string& messages){
ofstream out(filename,ios::app);if(!out.is_open()){return;}
out<<messages;
out.close();}//等级转化为字符串
string LevelToString(int level){switch(level){case DEBUG:return"Debug";case INFO:return"Info";case WARNING:return"Warning";case ERROR:return"Error";case FATAL:return"Fatal";default:return"Unkonwn";}}//获取当前时间
string GetTimeString(){
time_t curr_time=time(nullptr);//时间戳structtm* format_time=localtime(&curr_time);//转化为时间结构if(format_time==nullptr)return"None";char time_buffer[1024];snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
format_time->tm_year +1900,
format_time->tm_mon +1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);return time_buffer;}
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//获取日志信息voidLogMessage(string filename,int line,bool issave,int level,char* format,...){
string levelstr=LevelToString(level);
string timestr=GetTimeString();
pid_t selfid=getpid();char buffer[1024];
va_list arg;va_start(arg,format);vsnprintf(buffer,sizeof(buffer),format,arg);va_end(arg);
string message="["+ timestr +"]"+"["+ levelstr +"]"+"["+ std::to_string(selfid)+"]"+"["+ filename +"]"+"["+ std::to_string(line)+"] "+ buffer +"\n";
LockGuard lockguard(&lock);if(!issave){
cout<<message;}else{SaveFile(logname,message);}}#defineLOG(level,format,...)\do\{\LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);\}while(0)#defineEnableFile()\do\{\gIsSave=true; \ }while(0)#defineEnableScreen()\do\{\gIsSave=false; \ }while(0)
线程池解释(代码)
日志解释(代码)
日志是系统或程序记录所有发生事件的文件:
版权归原作者 诡异森林。 所有, 如有侵权,请联系我们删除。