前言
ThreadPoolExecutor是java的JUC提供的线程池化技术,是一种享元模式(主要用于减少创建对象的数量,以减少内存占用和提高性能),它的作用是管理线程。 优点:帮助我们去创建、管理和销毁线程,减少内存占用和提高性能。 缺点:好像没有啥大的缺点,使用不好是我们自己的问题,不是线程池的。 学习线程池可以解答我们的很多疑惑,如:我们传入的任务,线程池如何调用、线程池的线程如何复用、线程池与队列的联系、线程池的核心线程是什么和线程池的非核心线程是什么。
一、ThreadPoolExecutor基本使用
1.1 参数介绍
/** * 当前类:ThreadPoolExecutor */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
上面的是线程池参数最全的一个构造方法,接下来讲解每个参数的作用 int corePoolSize:核心线程数 int maximumPoolSize:最大的线程数=核心+临时 long keepAliveTime:线程超时等待时间 TimeUnit unit:超时等待单位 BlockingQueue<Runnable> workQueue:阻塞队列,常用的有三个: ArrayBlockingQueue:数组队列,必须指定初始容量 LinkedBlockingDeque:链表队列,不用指定 SynchronousQueue:没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者 ThreadFactory threadFactory:线程工厂,使用默认就行,不想用,可以实现ThreadFactory接口自定义 RejectedExecutionHandler handler:决绝策略,也可以自定义,实现RejectedExecutionHandler接口,JUC提供的有四个: AbortPolicy:执行不了抛RejectedExecutionException异常 CallerRunsPolicy:交给main线程执行,就是直接调用任务的run方法 DiscardOldestPolicy:把队列最先进来的任务扔掉 DiscardPolicy:啥也不干,就是把最后进来的任务扔掉
1.2 创建线程
创建线程的方式很多,JUC提供了工具类可以创建不同类型的线程池,但是一般公司不建议使用,还有就是调用线程池构造方法创建。
/**
* 工具类创建
* 比较复杂的业务场景,不建议用工具类创建线程池
*/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExcutorTest {
public static void main(String[] args) {
// 最大创建int.MaxValue这么多个线程,来一个任务new一个线程,使用SynchronousQueue队列
ExecutorService es1 = Executors.newCachedThreadPool();
// 创建5个线程的线程池,可以指定线程池线程的数量
ExecutorService es2 = Executors.newFixedThreadPool(5);
// 创建1个线程的线程池,其他任务都放队列LinkedBlockingQueue
ExecutorService es3 = Executors.newSingleThreadExecutor();
// 有定时功能的线程池
ExecutorService ess1 = Executors.newScheduledThreadPool(100);
}
}
/**
* 参数最全的构造方法
*/
import java.util.concurrent.*;
public class ThreadPoolExcutorTest{
public static void main(String[] args) {
/**
* 核心线程:2
* 最大线程:3
* 超时时间:1
* 超时时间单位:秒
* 队列:ArrayBlockingQueue:容量2
* 默认线程工厂
* 拒绝策略:AbortPolicy
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.SECONDS
, new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// allowCoreThreadTimeOut设置为true,线程池执行完会销毁所有线程
// threadPoolExecutor.allowCoreThreadTimeOut(true);
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
}
二、线程池工作原理
2.1 前提条件
** **学习线程池,必须了解阻塞队列的几个方法。下发图片来自个人学习笔记。。。
2.2 文字描述
**下面的场景,以所有的任务都是死循环为前提,就是一直执行,不会结束,线程池参数如下:** /** * 核心线程:2 * 最大线程:3 * 超时时间:1 * 超时等待时间单位:秒 * 队列:ArrayBlockingQueue:容量2 * 默认线程工厂 * 拒绝策略:AbortPolicy */
1.每次调用**threadPoolExecutor.execute**()方法,传入一个任务**Runnable**,这个**Runnable**是一个**函数式接口**,我们可以给**execute**方法传入一个匿名内部类、实现了**Runnable**接口的实现类或者**lambda**表达式。 2.当第**1**个任务进来的时候,判断**工作线程数**是否小于**核心线程**数,小于的话就创建一个**核心线程**,执行任务。 3.每一个**核心线程**或者**临时线程**都会被包装成为一个**Worker**对象,传入的任务也会被包装在**Worker**对象里面,最后调用**线程**的**start**方法执行任务,具体的源码解释。 4.当第**2**个任务进来的时候,判断**工作线程数**是否小于**核心线程数**,小于的话就创建一个**核心线程**,执行任务。 5.当第**3**个任务进来的时候,判断**工作线程数**是否小于**核心线程数**,发现不小于了,就加入**阻塞队列**了。 6.当第**4**个任务进来的时候,判断**工作线程数**是否小于**核心线程数**,发现不小于了,就加入**阻塞队列**了。 7.当第**5个**任务进来的时候,判断**工作线程数**是否小于**核心线程数**,发现不小于了,就加入**阻塞队列**,但是发现**阻塞队列**满了,就创建一个**临时线程**,执行任务。 8.当第**6**个任务进来的时候,判断**工作线程数**是否小于**核心线程数**,发现不小于了,就加入**阻塞队列**,但是发现**阻塞队列**满了,就创建一个**临时线程**,但是发现**工作线程数**大于等于**最大线程数**了,所以按照**构造方法**传入的**拒绝策略**,**拒绝**了,抛出**RejectedExecutionException**异常。
2.3 流程图展示
当其他线程死循环的时候,第六个进来的就抛异常了。
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ConcurrentHashMapTest$$Lambda$1/1096979270@214c265e rejected from java.util.concurrent.ThreadPoolExecutor@448139f0[Running, **pool size = 3**, **active threads = 3**, **queued tasks = 2**, completed tasks = 0]
三、源码解析
3.1 execute源码中的位运算
线程池常用的位运算常量介绍,32位int类型,二进制的高3位表示线程状态,低29位表示线程数。后续的线程池状态和线程数量的统计就是对这些值进行位运算,这个就不讲解。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits @Native public static final int SIZE = 32; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
3.2 线程池状态
这个是从其他优秀文章中复制的,因为这个基本知识,也不是讲解的内容。
状态解释RUNNING能接受新提交的任务,并且也能处理阻塞队列中的任务SHUTDOW关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务,在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态STOP不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程,在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态TIDYING如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态TERMINATED在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
3.3 execute源码解析
execute方法解析
/**
* 当前类:ThreadPoolExecutor
* private volatile int corePoolSize; 核心线程数
* private final BlockingQueue<Runnable> workQueue; 阻塞队列
* workerCountOf方法用来计算工作线程数量
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 得到一个int值,具体的看3.1标题的解释
int c = ctl.get();
// 判断工作线程是否小于核心线程
if (workerCountOf(c) < corePoolSize) {
// 小于就创建核心线程处理任务,这个方法后面解释,true代表创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果工作线程数大于等于核心线程数
// 判断是不是运行状态,运行状态就加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 加入队列成功,再次校验状态,如果不是运行时,把刚刚加入阻塞队列的任务移除
if (! isRunning(recheck) && remove(command))
// 按照不同拒绝策略处理
reject(command);
// 如果线程工作线程为0了,就生成一个临时线程处理队列中的任务
else if (workerCountOf(recheck) == 0)
// 生成一个临时线程,false代表创建临时线程
addWorker(null, false);
}
// 不是运行状态或者加入阻塞队列失败,就会创建临时线程处理了,失败就拒绝
// 为什么不是运行状态也会调用addWorker方法哪,因为addWorker方法里面是会校验状态的
else if (!addWorker(command, false))
// 按照不同拒绝策略处理
reject(command);
}
addWorker方法解析
// 核心或者临时线程被包装为Worker对象,线程和任务是Worker对象的一个属性
// 不重要的代码没有赋值出来
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails.
* 真正的线程
*/
final Thread thread;
/** Initial task to run. Possibly null. 传入的任务*/
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// 把传入的任务赋值给类变量
this.firstTask = firstTask;
//###这里很重要,new的线程传入的this对象,因为Worker也实现了Runnable
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
/**
* 当前类:ThreadPoolExecutor
* private volatile int corePoolSize; 核心线程数
* private volatile int maximumPoolSize; 最大线程数
* private final HashSet<Worker> workers = new HashSet<Worker>(); worker线程集合
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 检查状态,返回false,调用者就是return或者调用拒绝处理方法
// 状态大于等于SHUTDOWN ,而且不等于SHUTDOWN,传入的任务不为空,队列为空,就返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋校验
for (;;) {
// 工作线程数量
int wc = workerCountOf(c);
// 工作线程大于2^29-1,大于核心线程或者临时线程,返回flase,
//调用者就是return或者调用拒绝处理方法
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 这个就是对线程数量+1的,是原子操作,失败就跳到上面的retry
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果此时和上面int rs = runStateOf(c);处,线程池状态不一样,也会跳到上面的retry
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new一个线程,其实就是new一个Worker对象,详细看上面Worker对象解析
w = new Worker(firstTask);
// 拿到Worker对象的线程属性
final Thread t = w.thread;
if (t != null) {
// 排它锁
final ReentrantLock mainLock = this.mainLock;
// 抢锁,高并发要保证线程安全
mainLock.lock();
try {
// 线程状态
int rs = runStateOf(ctl.get());
// 运行状态或者
// 是SHUTDOWN但是任务为空(因为这种情况创建线程去处理队列中的任务)
// 这个就是execute方法,else if (workerCountOf(recheck) == 0)
// addWorker(null, false);这种情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把Worker对象放入workers集合
workers.add(w);
int s = workers.size();
// 记录workers大小
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程,接下来在介绍
workerStarted = true;
}
}
} finally {
// 没有放入成功,或者放入成功了,发生其他问题,就要减去增加的线程数量
if (! workerStarted)
addWorkerFailed(w); // 接下来在介绍
}
return workerStarted;
}
addWorkerFailed解析
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 如果放入成功就取出
if (w != null)
workers.remove(w);
// 原子操作,ctl低29位-1
decrementWorkerCount();
// 尝试终止线程,这个不讲解
tryTerminate();
} finally {
// 释放锁
mainLock.unlock();
}
}
如果放入成功了,而且线程启动了,线程启动时会调用传入**Runnable**的run方法,此时传入的**Runnable**是**Worker**对象。 那么线程启动后就会调用**Worker**对象的**run**方法。
/**
* 当前类:ThreadPoolExecutor.Worker
* 进入runWorker方法
* 不重要的代码没有赋值出来
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails.
* 真正的线程
*/
final Thread thread;
/** Initial task to run. Possibly null. 传入的任务*/
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// 把传入的任务赋值给类变量
this.firstTask = firstTask;
//###这里很重要,new的线程传入的this对象,因为Worker也实现了Runnable
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
/**
* 当前类:ThreadPoolExecutor
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 得到任务
Runnable task = w.firstTask;
// 帮助GC
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 传入的任务不为空就执行任务,为空就去队列中取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
进入****getTask****方法查看如何取任务
// 当前类:ThreadPoolExecutor
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 线程状态
int rs = runStateOf(c);
// 状态大于等于SHUTDOWN,而且队列状态大于等于STOP或者阻塞队列为空,就返回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程数-1
decrementWorkerCount();
return null;
}
// 工作线程的数量
int wc = workerCountOf(c);
// allowCoreThreadTimeOut为是否允许核心线程过期,
// 如果为true,所有线程等待设置的超时时间,队列没有任务,就销毁
// 默认是false,当工作线程大于核心线程时,大于的线程会被销毁,
// 销毁时不会判断它是不是核心线程,详细看下面解析
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 一些异常处理,可以不用关注
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// timed为true按照超时时间等待取任务,取不到就线程结束了,相当于销毁了
// 为false阻塞取任务,队列具体的方法看2.1标题,取到就继续执行runWorker
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
看完上面的解析,就会知道为什么线程池的线程会一直存活,不断的消费任务了。
总结
线程的基本流程就讲完了,其中的一些细节,加锁、释放锁、停止线程和中断啊,这些都没有讲,这篇文章只讲线程池的核心原理。核心原理是比较简单的,复杂的是线程状态的的改变对于队列中的任务进行如何的处理,Worker对于AbstractQueuedSynchronizer的使用和它对线程中断进行的一系列处理等等。
版权归原作者 鱼跃龙门^我跃架构 所有, 如有侵权,请联系我们删除。