作者:半身风雪
上篇:阻塞队列原理解析
线程池原理解析
一、为什么要用线程池
Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。
**第一**:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成 的消耗。
**第二**:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时, 不会有 T1,T3 的开销了。
**第三**:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会 消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和 监控。
二、ThreadPoolExecutor 的类关系
Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的 执行分离开来。
ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit() 的扩展,可以说是真正的线程池接口; AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法; ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期 执行"功能 ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令, 或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
三、线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
3.1、corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;
如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启 动所有核心线程。
3.2、maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize
3.3、keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用
3.4、TimeUnit
keepAliveTime 的时间单位
3.5、workQueue
workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的 corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能。
一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
- 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待, 因此线程池中的线程数不会超过 corePoolSize。
- 由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。
- 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。
- 更重要的,使用无界 queue 可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范 围。
3.6、threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有 的线程为守护线程。
Executors 静态工厂里默认的 threadFactory,线程的命名规则是“pool-数字 -thread-数字”。
3.7、RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提 交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务; 当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和 策略,如记录日志或持久化存储不能处理的任务。
四、线程池的工作机制
- 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意, 执行这一步骤需要获取全局锁)。
- 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
- 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处 理任务。
- 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
五、提交任务
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程 池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的 对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get (long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这 时候有可能任务没有执行完。
六、关闭线程池
可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们 的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别, shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行 或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后**中断所有没有正在执行任务的线程**。
只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法 会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任 务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完, 则可以调用 shutdownNow 方法。
七、合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来 分析。
任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。 CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。 由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*Ncpu。 混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐 量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。 可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可 以让优先级高的任务先执行。 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先 级队列,让执行时间短的任务先执行。 建议使用有界队列。 有界队列能增加系统的稳定性和预警能力,可以根据需 要设大一点儿,比如几千。 如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会 撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
八、AbstractQueuedSynchronizer
8.1、学习 AQS 的必要性
队列同步器 AbstractQueuedSynchronizer(以下简称同步器或 AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状 态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。并发包的大师(Doug Lea)期望它能够成为实现大部分同步需求的基础。
8.2、AQS 使用方式和其中的设计模式
AQS 的主要使用方式是继承,子类通过继承 AQS 并实现它的抽象方法来管 理同步状态,在 AQS 里由一个 int 型的 state 来代表这个状态,在抽象方法的实 现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的 3 个方法 (getState()、setState(int newState)和 compareAndSetState(int expect,int update)) 来进行操作,因为它们能够保证状态的改变是安全的。
private volatile int state;
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS 自身没有 实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义 同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地 获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、 ReentrantReadWriteLock 和 CountDownLatch 等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线 程并行访问),隐藏了实现细节;
- 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、 线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者 所需关注的领域。
- 实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步 组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者 重写的方法。
8.2.1、模板方法模式
同步器的设计基于模板方法模式。模板方法模式的意图是,定义一个操作中 的算法的骨架,而将一些步骤的实现延迟到子类中。模板方法使得子类可以不改 变一个算法的结构即可重定义该算法的某些特定步骤。我们最常见的就是 Spring 框架里的各种 Template。
举个栗子:
例子我们开了个蛋糕店,蛋糕店不能只卖一种蛋糕呀,于是我们决定先卖奶油蛋糕,芝士蛋糕和慕斯蛋糕。三种蛋糕在制作方式上一样,都包括造型,烘焙和涂 抹蛋糕上的东西。所以可以定义一个抽象蛋糕模型
publicabstractclass AbstractCake {protectedabstract void shape();protectedabstract void apply();protectedabstract void brake();/**
* 模块方法
*/publicfinal void run(){this.shape();this.apply();this.brake();}}
然后可以批量生产三种蛋糕
publicclass CheeseCake extends AbstractCake{@Overrideprotected void shape(){
System.out.println("CheeseCake.shape,芝士蛋糕造型");}@Overrideprotected void apply(){
System.out.println("CheeseCake.apply,芝士蛋糕涂满");}@Overrideprotected void brake(){
System.out.println("CheeseCake.brake,芝士蛋糕烘焙");}}
publicclass CreamCake extends AbstractCake{}
publicclass MouseCake extends AbstractCake{}
publicclass TestMain {public static void main(String[] args){
AbstractCake cake = new CheeseCake();
cake.run();}}
这样一来,不但可以批量生产三种蛋糕,而且如果日后有扩展,只需要继承抽象蛋糕方法就可以了,十分方便,我们天天生意做得越来越赚钱。突然有一天, 我们发现市面有一种最简单的小蛋糕销量很好,这种蛋糕就是简单烘烤成型就可以卖,并不需要涂抹什么食材,由于制作简单销售量大,这个品种也很赚钱,于是我们也想要生产这种蛋糕。但是我们发现了一个问题,抽象蛋糕是定义了抽象的涂抹方法的,也就是说扩展的这种蛋糕是必须要实现涂抹方法,这就很鸡儿蛋疼了。怎么办?我们可以将原来的模板修改为带钩子的模板。
/**
* 模块方法
*/publicfinal void run(){this.shape();if(this.shouldApply()){this.apply();}this.brake();}protected boolean shouldApply(){returntrue;}
做小蛋糕的时候通过 flag 来控制是否涂抹,其余已有的蛋糕制作不需要任何修改可以照常进行。
publicclass SmallCake extends AbstractCake{private boolean flag =false;public void setFlag(boolean flag){this.flag = flag;}@Overrideprotected void shape(){
System.out.println("SmallCake.shape,小蛋糕造型");}@Overrideprotected boolean shouldApply(){returnthis.flag;}@Overrideprotected void apply(){}@Overrideprotected void brake(){}}
8.3、AQS 中的方法
8.3.1、模板方法
实现自定义同步组件时,将会调用同步器提供的模板方法,
这些模板方法同步器提供的模板方法基本上分为 3 类:独占式获取与释放同 步状态、共享式获取与释放、同步状态和查询同步队列中的等待线程情况。
8.3.2、可重写的方法
8.3.3、访问或修改同步状态的方法
重写同步器指定的方法时,需要使用同步器提供的如下 3 个方法来访问或修 改同步状态。
getState()
:获取当前同步状态。setState(int newState)
:设置当前同步状态。compareAndSetState(int expect,int update)
:使用 CAS 设置当前状态,该方 法能够保证状态设置的原子性。
8.3.4、CLH 队列锁
CLH 队列锁即 Craig, Landin, and Hagersten (CLH) locks。
CLH 队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程 仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束 自旋。
当一个线程需要获取锁时:
1、创建一个的QNode,将其中的locked设置为true表示需要获取锁,myPred 表示对其前驱结点的引用
2、线程 A 对 tail 域调用 getAndSet 方法,使自己成为队列的尾部,同时获取 一个指向其前驱结点的引用 myPred 线程
B 需要获得锁,同样的流程再来一遍。
3、 线程就在前驱结点的 locked 字段上旋转,直到前驱结点释放锁(前驱节点 的锁值 locked == false) 。
4、 当一个线程需要释放锁时,将当前结点的 locked 域设置为 false,同时回收 前驱结点
如上图所示,前驱结点释放锁,线程 A 的 myPred 所指向的前驱结点的 locked 字段变为 false,线程 A 就可以获取到锁。
CLH 队列锁的优点是空间复杂度低(如果有 n 个线程,L 个锁,每个线程每 次只获取一个锁,那么需要的存储空间是 O(L+n),n 个线程有 n 个 myNode, L 个锁有 L 个 tail)。CLH 队列锁常用在 SMP 体系结构下。
Java 中的 AQS 是 CLH 队列锁的一种变体实现。
8.3.5、ReentrantLock 的实现
8.3.5.1、锁的可重入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞, 该特性的实现需要解决以下两个问题。
1、线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程, 如果是,则再次成功获取。
2、锁的最终释放。线程重复 n 次获取了锁,随后在第 n 次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于 0 时表示锁已 经成功释放。
nonfairTryAcquire 方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回 true,表示获取同步状态成功。同步状态表 示锁被一个线程重复获取的次数。
如果该锁被获取了 n 次,那么前(n-1)次 tryRelease(int releases)方法必须返回 false,而只有同步状态完全释放了,才能返回 true。可以看到,该方法将同步状 态是否为 0 作为最终释放的条件,当同步状态为 0 时,将占有线程设置为 null, 并返回 true,表示释放成功。
8.3.5.2、公平和非公平锁
ReentrantLock 的构造函数中,默认的无参构造函数将会把 Sync 对象创建为 NonfairSync 对象,这是一个“非公平锁”;而另一个构造函数 ReentrantLock(boolean fair)传入参数为 true 时将会把 Sync 对象创建为“公平锁” FairSync。
nonfairTryAcquire(int acquires)方法,对于非公平锁,只要 CAS 设置同步状态 成功,则表示当前线程获取了锁,而公平锁则不同。tryAcquire 方法,该方法与 nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了 hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的 判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁,因此 需要等待前驱线程获取并释放锁之后才能继续获取锁。
版权归原作者 半身风雪 所有, 如有侵权,请联系我们删除。