0


【并发编程】JUC并发编程(彻底搞懂JUC)

文章目录

一、背景

如果你对多线程没什么了解,那么从入门模块开始。 如果你已经入门了多线程(知道基础的线程创建、死锁、synchronized、lock等),那么从juc模块开始。

新手学技术、老手学体系,高手学格局。

二、什么是JUC?

JUC实际上就是我们对于jdk中java.util .concurrent 工具包的简称

,其结构如下:
在这里插入图片描述

这个包下都是Java处理线程相关的类,自jdk1.5后出现。目的就是为了更好的支持高并发任务。让开发者进行多线程编程时减少竞争条件和死锁的问题!
在这里插入图片描述
JUC主要是指JDK8中java.util.concurrent里提供的一系列线程并发工具,但是线程并发的问题远不止几个工具这么简单。要学习工具使用,更要能深入理解工具的原理以及处理线程并发问题的思路。

三、JUC框架结构

UC是包的简称,JUC可能也是Java核心里最难的一块儿,JUC指的是Java的并发工具包,里边提供了各种各样的控制同步和线程通信的工具类。学习JUC之前,最重要的是了解JUC的结构是什么样的。就如同Java的集合框架的结构一样,JUC也有自己框架结构,只是往往被大家忽略,笔者就简单的梳理了下JUC的框架结构,JUC的框架结构不同于集合,它并非是实现继承框架结构。
在这里插入图片描述

四、JUC框架概述

JUC框架的底层在Java代码里是Unsafe,而Unsafe是底层Jvm的实现。有了Unsafe的支持实现了一些了支持原子型操作的Atomic类,然后上层才有了我们熟知的AQS,和LockSupport等类。有了这些之后才是各种读写锁各种线程通信以及同步工具的实现类。

五、JUC中常用类汇总

  • JUC的atomic包下运用了CAS的AtomicBoolean、AtomicInteger、AtomicReference等原子变量类
  • JUC的locks包下的AbstractQueuedSynchronizer(AQS)以及使用AQS的ReentantLock(显式锁)、ReentrantReadWriteLock
  • 附:运用了AQS的类还有: Semaphore、CountDownLatch、ReentantLock(显式锁)、ReentrantReadWriteLock
  • JUC下的一些同步工具类: CountDownLatch(闭锁)、Semaphore(信号量)、CyclicBarrier(栅栏)、FutureTask
  • JUC下的一些并发容器类: ConcurrentHashMap、CopyOnWriteArrayList
  • 读写分离: CopyOnWriteArrayList 写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。 写操作需要加锁,防止并发写入时导致写入数据丢失。 写操作结束之后需要把原始数组指向新的复制数组。
  • JUC下的一些Executor框架的相关类: 线程池的工厂类->Executors 线程池的实现类->ThreadPoolExecutor/ForkJoinPool
  • JUC下的一些阻塞队列实现类: ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue

在这里插入图片描述

  1. tools(工具类):又叫信号量三组工具类,包含有
  • CountDownLatch(闭锁) 是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
  • CyclicBarrier(栅栏) 之所以叫barrier,是因为是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 ,并且在释放等待线程后可以重用。
  • Semaphore(信号量) 是一个计数信号量,它的本质是一个“共享锁“。信号量维护了一个信号量许可集。线程可以通过调用 acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
  1. executor(执行者):是Java里面线程池的顶级接口,但它只是一个执行线程的工具,真正的线程池接口是ExecutorService,里面包含的类有:
  • ScheduledExecutorService 解决那些需要任务重复执行的问题
  • ScheduledThreadPoolExecutor 周期性任务调度的类实现
  1. atomic(原子性包):是JDK提供的一组原子操作类,包含有AtomicBoolean、AtomicInteger、AtomicIntegerArray等原子变量类,他们的实现原理大多是持有它们各自的对应的类型变量value,而且被volatile关键字修饰了。这样来保证每次一个线程要使用它都会拿到最新的值。
  2. locks(锁包):是JDK提供的锁机制,相比synchronized关键字来进行同步锁,功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁包含的实现类有:
  • ReentrantLock 它是独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁。
  • ReentrantReadWriteLock 它包括子类ReadLock和WriteLock。ReadLock是共享锁,而WriteLock是独占锁。
  • LockSupport 它具备阻塞线程和解除阻塞线程的功能,并且不会引发死锁。
  1. collections(集合类):主要是提供线程安全的集合, 比如:
  • ArrayList对应的高并发类是CopyOnWriteArrayList,
  • HashSet对应的高并发类是 CopyOnWriteArraySet,
  • HashMap对应的高并发类是ConcurrentHashMap等等

六、相关名词

进程和线程

进程

概述
进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是

系统进行资源分配和调度的基本单位,是操作系统结构的基础

。 在当代面向线程设计的计算机结构中,

进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体

定义

狭义定义:

进程是正在运行的程序的实例(an instance of a computer program that is being executed)

广义定义:

进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元

线程

线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之 中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务。

  1. 线程是独立调度和分派的基本单位。
  2. 同一进程中的多条线程将共享该进程中的全部系统资源。
  3. 一个进程可以有很多线程,每条线程并行执行不同的任务。可并发执行。

当然,我们Java都知道的线程的同步是Java多线程编程的难点,因为要给哪些地方是共享资源(竞争资源),什么时候要考虑同步等,都是编程中的难点。

创建线程的几种常见的方式

  1. 通过实现Runnable接口来创建Thread线程
  2. 通过继承Thread类来创建一个线程
  3. 通过实现Callable接口来创建Thread线程

备注:详细看我写的上一篇文章:Java实现多线程的4种方式

并发和并行

在了解并发和并行之前,让我们先来看一看串行是什么样的吧。

  1. 串行模式:
  • 串行模式:即表示所有任务都是按先后顺序进行。串行是一次只能取的一个任务,并执行这个任务。
  • 举个生活中的小例子:就是在火车站买票,今天只开放这一个窗口卖票,那么我们只有等到前面的人都买了,才能轮到我们去买。即按先后顺序买到票。
  1. 并行模式:
  • 概述:一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的。对比地,并发是指:在同一个时间段内,两个或多个程序执行,有时间上的重叠(宏观上是同时,微观上仍是顺序执行)。
  • 并行模式:并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。 我们还是用上面那个例子:还是在买票,以前是只有一个窗口卖票,但是近几年发展起来了,现在有五个窗口卖票啦,大大缩短了人们买票的时间。
  • 并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。不过并行的效率,一方面受多进程/线程编码的好坏的影响,另一方面也受硬件角度上的CPU的影响。
  1. 并发:
  • 并发:并发指的是多个程序可以同时运行的一种现象,并发的重点在于它是一种现象,并发描述的是多进程同时运行的现象。但真正意义上,一个单核心CPU任一时刻都只能运行一个线程。所以此处的"同时运行"表示的不是真的同一时刻有多个线程运行的现象(这是并行的概念),而是提供了一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占 CPU 的,而是根据CPU的调度,执行一会儿停一会儿。
  1. 小小的总结一下:
  • 并发:即同一时刻多个线程在访问同一个资源,多个线程对一个点 例子:秒杀活动、12306抢回家的票啦、抢演唱会的票…
  • 并行:多个任务一起执行,之后再汇总 例子:电饭煲煮饭、用锅炒菜,两个事情一起进行,(最后我们一起干饭啦干饭啦😁)

用户线程和守护线程

  1. 用户线程:指不需要内核支持而在用户程序中实现的线程,其不依赖于操作系统核心,应用进程利用线程库提供创建、同步、调度和管理线程的函数来控制用户线程。
  2. 守护线程:是指在程序运行的时候在后台提供一种通用服务的线程,用来服务于用户线程;不需要上层逻辑介入,当然我们也可以手动创建一个守护线程。(用白话来说:就是守护着用户线程,当用户线程死亡,守护线程也会随之死亡)

比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
用一个简单代码来模拟一下:
未设置为守护线程时:主线程执行完成了,但是我们自己创建的线程仍然未结束。
在这里插入图片描述

设置为守护线程后:明显可以看到,当主线程执行完成后,我们设置为守护线程的那个线程也被强制结束了。
在这里插入图片描述

setDaemon就是设置为是否为守护线程。

七、synchronized 作用范围:

synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:

  1. 修饰某一处代码块,被修饰的代码块称为同步语句块。作用范围就是{}之间。作用的对象是调用这个代码块的对象。
synchronized(this){System.out.println("同步代码块 ");}
  1. 修饰在方法上,被修饰的方法就称为同步方法。作用范围则是整个方法。作用的对象则是调用这个方法的对象。
publicsynchronizedvoidsale(){}

注:synchronized 关键字不能被继承,如果父类中某方法使用了synchronized 关键字,字类又正巧覆盖了,此时,字类默认情况下是不同步的,必须显示的在子类的方法上加上才可。当然,如果在字类中调用父类中的同步方法,这样虽然字类并没有同步方法,但子类调用父类的同步方法,子类方法也相当同步了。

  1. 修饰某个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象。
publicstaticsynchronizedvoidtest(){}
  1. 修饰某个类,其作用的范围是 synchronized 后面括号括起来的部分,作用的对象是这个类的所有对象。
classTicket{publicvoidsale(){synchronized(Ticket.class){}}}

八、Lock锁(重点)

什么是 Lock

Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。

锁类型

可重入锁:在执行对象中所有同步方法不用再次获得锁
可中断锁:在等待获取锁过程中可中断
公平锁: 按等待获取锁的线程的等待时间进行获取,等待时间长的具有优先获取锁权利
读写锁:对资源读取和写入的时候拆分为2部分处理,读的时候可以多线程一起读,写的时候必须同步地写

Lock接口

publicinterfaceLock{voidlock();//获得锁。/**
    除非当前线程被中断,否则获取锁。
    
    如果可用,则获取锁并立即返回。
    如果锁不可用,则当前线程将出于线程调度目的而被禁用并处于休眠状态,直到发生以下两种情况之一:
        锁被当前线程获取; 
        要么其他一些线程中断当前线程,支持中断获取锁。
    如果当前线程:
        在进入此方法时设置其中断状态; 
        要么获取锁时中断,支持中断获取锁,
    */voidlockInterruptibly()throwsInterruptedException;/**
    仅在调用时空闲时才获取锁。
    如果可用,则获取锁并立即返回值为true 。 如果锁不可用,则此方法将立即返回false值。
    */booleantryLock();//比上面多一个等待时间 booleantryLock(long time,TimeUnit unit)throwsInterruptedException;// 解锁voidunlock();//返回绑定到此Lock实例的新Condition实例。ConditionnewCondition();  。
}

下面讲几个常用方法的使用。

lock()、unlock()

lock()是最常用的方法之一,作用就是获取锁,如果锁已经被其他线程获得,则当前线程将被禁用以进行线程调度,并处于休眠状态,等待,直到获取锁。

如果使用到了lock的话,那么必须去主动释放锁,就算发生了异常,也需要我们主动释放锁,因为lock并不会像synchronized一样被自动释放。所以使用lock的话,必须是在try{}catch(){}中进行,并将释放锁的代码放在finally{}中,以确保锁一定会被释放,以防止死锁现象的发生。

unlock()的作用就是主动释放锁。
lock接口的类型有好几个实现类,这里是随便找了个。

Lock lock =newReentrantLock();try{
    lock.lock();System.out.println("上锁了");}catch(Exception e){
    e.printStackTrace();}finally{
    lock.unlock();System.out.println("解锁了");}

newCondition

关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类 也可以实现等待/通知模式。 用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以 进行选择性通知, Condition 比较常用的两个方法:

  • await():会使当前线程等待,同时会释放锁,当等到其他线程调用signal()方法时,此时这个沉睡线程会重新获得锁并继续执行代码(在哪里沉睡就在哪里唤醒)。
  • signal():用于唤醒一个等待的线程。

注意:在调用 Condition 的 await()/signal()方法前,也需要线程持有相关 的 Lock 锁,调用 await()后线程会释放这个锁,在调用singal()方法后会从当前 Condition对象的等待队列中,唤醒一个线程,后被唤醒的线程开始尝试去获得锁, 一旦成功获得锁就继续往下执行。

在这个地方我们举个例子来用代码写一下:

这里就不举例synchronized 实现了,道理都差不多。

例子:我们有两个线程,实现对一个初始值是0的number变量,一个线程当number = =0时 对number值+1,另外一个线程当number = = 1时对number-1。

classShare{privateInteger number =0;privateReentrantLock lock =newReentrantLock();privateCondition newCondition = lock.newCondition();// +1 的方法publicvoidincr(){try{
            lock.lock();// 加锁while(number !=0){
                newCondition.await();//沉睡}
            number++;System.out.println(Thread.currentThread().getName()+"::"+ number);
            newCondition.signal();//唤醒另一个沉睡的线程 }catch(Exception e){
            e.printStackTrace();}finally{
            lock.unlock();}}// -1 的方法publicvoiddecr(){try{
            lock.lock();while(number !=1){
                newCondition.await();}
            number--;System.out.println(Thread.currentThread().getName()+"::"+ number);
            newCondition.signal();}catch(Exception e){
            e.printStackTrace();}finally{
            lock.unlock();}}}publicclassLockDemo2{publicstaticvoidmain(String[] args){Share share =newShare();newThread(()->{for(int i=0;i<=10;i++){
                share.incr();}},"AA").start();newThread(()->{for(int i=0;i<=10;i++){
                share.decr();}},"BB").start();/**
         * AA::1
         * BB::0
         * AA::1
         * BB::0
         * .....
         */}}

ReentrantLock (可重入锁)

ReentrantLock,意思是“可重入锁”。ReentrantLock 是唯一实现了 Lock 接口的类,并且 ReentrantLock 提供了更 多的方法。

可重入锁:什么是 “可重入”,可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁

packagecom.crush.juc02;importjava.util.Random;importjava.util.concurrent.locks.ReentrantLock;publicclassReentrantLockDemo{publicstaticvoidmain(String[] args){ReentrantLock lock =newReentrantLock();newThread(newRunnable(){@Overridepublicvoidrun(){try{
                    lock.lock();System.out.println("第1次获取锁,这个锁是:"+ lock);for(int i =2;i<=11;i++){try{
                            lock.lock();System.out.println("第"+ i +"次获取锁,这个锁是:"+ lock);try{Thread.sleep(newRandom().nextInt(200));}catch(InterruptedException e){
                                e.printStackTrace();}}finally{
                           lock.unlock();// 如果把这里注释掉的话,那么程序就会陷入死锁当中。}}}finally{
                    lock.unlock();}}}).start();newThread(newRunnable(){@Overridepublicvoidrun(){try{
                    lock.lock();System.out.println("这里是为了测试死锁而多写一个的线程");}finally{
                    lock.unlock();}}}).start();}}/**
 * 第1次获取锁,这个锁是:java.util.concurrent.locks.ReentrantLock@6b5fde1f[Locked by thread Thread-0]
 * 第2次获取锁,这个锁是:java.util.concurrent.locks.ReentrantLock@6b5fde1f[Locked by thread Thread-0]
 * 第3次获取锁,这个锁是:java.util.concurrent.locks.ReentrantLock@6b5fde1f[Locked by thread Thread-0]
 * ...
 */

死锁的话,程序就无法停止,直到资源耗尽或主动终止。
在这里插入图片描述

代码中也稍微提了一下死锁的概念,在使用Lock中必须手动解锁,不然就会可能造成死锁的现象。

ReadWriteLock (读写锁)

ReadWriteLock 也是一个接口,在它里面只定义了两个方法:

publicinterfaceReadWriteLock{// 获取读锁LockreadLock();// 获取写锁LockwriteLock();}

分为一个读锁一个写锁,将读写进行了分离,使可以多个线程进行读操作,从而提高了效率。
ReentrantReadWriteLock 实现了 ReadWriteLock 接口。里面提供了更丰富的方法,当然最主要的还是获取写锁(writeLock)和读锁(readLock)。

案例

假如多个线程要进行读的操作,我们用Synchronized 来实现的话。

publicclassSynchronizedDemo2{publicstaticvoidmain(String[] args){finalSynchronizedDemo2 test =newSynchronizedDemo2();newThread(()->{
            test.get(Thread.currentThread());}).start();newThread(()->{
            test.get(Thread.currentThread());}).start();}publicsynchronizedvoidget(Thread thread){long start =System.currentTimeMillis();while(System.currentTimeMillis()- start <=1){System.out.println(thread.getName()+"正在进行读操作");}System.out.println(thread.getName()+"读操作完毕");}}/**
 * 输出
 * Thread-0正在进行读操作
 * Thread-0读操作完毕
 * Thread-1正在进行读操作
 * Thread-1正在进行读操作
 * Thread-1正在进行读操作
 * ....
 * Thread-1读操作完毕
 */

改成读写锁之后

publicclassSynchronizedDemo2{privateReentrantReadWriteLock rwl =newReentrantReadWriteLock();publicstaticvoidmain(String[] args){finalSynchronizedDemo2 test =newSynchronizedDemo2();newThread(()->{
            test.get2(Thread.currentThread());}).start();newThread(()->{
            test.get2(Thread.currentThread());}).start();}publicvoidget2(Thread thread){
        rwl.readLock().lock();try{long start =System.currentTimeMillis();while(System.currentTimeMillis()- start <=1){System.out.println(thread.getName()+"正在进行读操作");}System.out.println(thread.getName()+"读操作完毕");}finally{
            rwl.readLock().unlock();}}}/**
 * 输出
 * Thread-0正在进行读操作
 * Thread-0读操作完毕
 * Thread-1正在进行读操作
 * Thread-1读操作完毕
 */

结论:改用读写锁后 线程1和线程2 同时在读,可以感受到效率的明显提升。

注意:

  • 若此时已经有一个线程占用了读锁,此时其他线程申请读锁是可以的,但是若此时其他线程申请写锁,则只有等待读锁释放,才能成功获得。
  • 若此时已经有一个线程占用了写锁,那么此时其他线程申请写锁或读锁,都只有持有写锁的线程释放写锁,才能成功获得。

Lock 与的 Synchronized 区别

类别synchronizedLock存在层次Java的关键字,在jvm层面上是一个接口锁的获取假设A线程获得锁,B线程等待。如果A线程阻塞,B线程会一直等待分情况而定,Lock有多个锁获取的方式,具体下面会说道,大致就是可以尝试获得锁,线程可以不用一直等待锁的释放1、当 synchronized 方法或者 synchronized 代码块执行完之后, 系统会自动让线程释放对锁的占用 (不需要手动释放锁)2、若线程执行发生异常,jvm会让线程释放锁在finally中必须释放锁,不然容易造成线程死锁现象 (需要手动释放锁)锁状态无法判断可以判断锁类型锁类型可重入 可判断 可公平(两者皆可)性能前提:大量线程情况下 同步效率较低前提:大量线程情况下 同步效率比synchronized高的多
Lock可以提高多个线程进行读操作的效率。

九、Callable 接口

前言:

在上上篇文章中,创建线程那个小角落,提到了这个,但是当时只是匆匆忙忙讲了一下。到这里再全面性的讲解一下。
我们以前使用实现Runnable接口的方式来创建线程,但是Runnable的run() 存在一个缺陷问题,就是不能将执行完的结果返回。
Java就是为了能够实现这个功能,在jdk1.5中提出了Callable接口。

概述:

Callable 接口位于java.util.concurrent包下。

@FunctionalInterfacepublicinterfaceCallable<V>{Vcall()throwsException;//计算结果,如果无法计算则抛出异常。}

Callable 类似于Runnable 接口,但Runnable 接口中的run()方法不会返回结果,并且也无法抛出经过检查的异常,但是Callable中call()方法能够返回计算结果,并且也能够抛出经过检查的异常。

实现:

通过实现Callable接口创建线程详细步骤:Runnable直接看代码就知道了哈。

  1. 创建实现Callable接口的类SomeCallable
  2. 创建一个类对象:Callable oneCallable = new SomeCallable();
  3. 由Callable创建一个FutureTask对象:FutureTask futureTask= new FutureTask(oneCallable); 注释:FutureTask是一个包装器,它通过接受Callable来创建,它同时实现了Future和Runnable接口。
  4. 由FutureTask创建一个Thread对象:Thread oneThread = new Thread(futureTask);
  5. 启动线程:oneThread.start();
publicclassDemo1{publicstaticvoidmain(String[] args){newThread(newRunnableDemo1(),"AA").start();FutureTask<Integer> futureTask =newFutureTask<>(newCallableDemo<Integer>());newThread(futureTask,"BB").start();// 在线程执行完后,我们可以通过futureTask的get方法来获取到返回的值。System.out.println(futureTask.get());}}classRunnableDemo1implementsRunnable{@Overridepublicvoidrun(){System.out.println(Thread.currentThread().getName()+"::通过实现Runnable来执行任务");}}classCallableDemo<Integer>implementsCallable<java.lang.Integer>{@Overridepublicjava.lang.Integercall()throwsException{System.out.println(Thread.currentThread().getName()+"::通过实现Callable接口来执行任务,并返回结果!");return1024;}}/**
 * AA::通过实现Runnable来执行任务
 * BB::通过实现Callable接口来执行任务,并返回结果!
 * 1024
 */

这里之所以要转成 FutureTask 放进 Thread中去,是因为Callable 本身与Thread没有关系,通过FutureTask 才能和Thread产生联系。

十、Future 接口

概述:

Future 接口同样位于java.util.concurrent包下。
Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

publicinterfaceFuture<V>{booleancancel(boolean mayInterruptIfRunning);//尝试取消此任务的执行。booleanisCancelled();//如果此任务在正常完成之前被取消,则返回true booleanisDone();//如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true Vget()throwsInterruptedException,ExecutionException;//获得任务计算结果Vget(long timeout,TimeUnit unit)throwsInterruptedException,ExecutionException,TimeoutException;//可等待多少时间去获得任务计算结果}

实现:

Future模式通俗点来描述就是:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

publicclassDemo1{publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{FutureTask<Integer> futureTask =newFutureTask<>(newCallableDemo<Integer>());newThread(futureTask,"BB").start();System.out.println(futureTask.get());// 我们来测试一下任务是否已经完成System.out.println(futureTask.isDone());}}classCallableDemo<Integer>implementsCallable<java.lang.Integer>{@Overridepublicjava.lang.Integercall()throwsException{System.out.println(Thread.currentThread().getName()+"::通过实现Callable接口来执行任务,并返回结果!");return1024;}}

Future 用于存储从另一个线程获得的结果。如果只是简单创建线程,直接使用Runnable就可以,想要获得任务返回值,就用Future。

FutureTask

FutureTask介绍

位于 java.util.concurrent包下。
可取消的异步计算。 此类提供Future的基本实现,具有启动和取消计算、查询以查看计算是否完成以及检索计算结果的方法。 计算完成后才能检索结果; 如果计算尚未完成, get方法将阻塞。 一旦计算完成,就不能重新开始或取消计算(除非使用runAndReset调用计算)。结构图:
在这里插入图片描述

FutureTask实现了 Runnable 和 Future接口,并方便地将两种功能组合在一起。并且通过构造函数提供Callable来创建FutureTask,就可以提供给Thread来创建线程啦。
FutureTask有以下三种状态:

  • 未启动状态:还未执行run()方法。
  • 已启动状态:已经在执行run()方法。
  • 完成状态:已经执行完run()方法,或者被取消了,亦或者方法中发生异常而导致中断结束。

FutureTask应用场景及注意事项

应用场景:

  • 在主线程执行那种比较耗时的操作时,但同时又不能去阻塞主线程时,就可以将这样的任务交给FutureTask对象在后台完成,然后等之后主线程需要的时候,就可以直接get()来获得返回数据或者通过isDone()来获得任务的状态。
  • 一般FutureTask多应用于耗时的计算,这样主线程就可以把一个耗时的任务交给FutureTask,然后等到完成自己的任务后,再去获取计算结果

注意:

  • 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。
  • 一旦计 算完成,就不能再重新开始或取消计算。
  • get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
  • 因为只会计算一次,因此通常get方法放到最后。

使用放在下一小节啦👇

使用 Callable 和 Future🚩

这里的使用其实在上文已经提到过了,这里就将其更完善一些吧。

publicclassCallableDemo2{publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException,TimeoutException{CallableAndFutureTest callableAndFutureTest =newCallableAndFutureTest();FutureTask<String> task =newFutureTask<>(callableAndFutureTest);newThread(task).start();//        System.out.println("尝试取消任务,传true表示取消任务,false则不取消任务::"+task.cancel(true));System.out.println("判断任务是否已经完成::"+task.isDone());//结果已经计算出来,则立马取出来,如若摸没有计算出来,则一直等待,直到结果出来,或任务取消或发生异常。System.out.println("阻塞式获取结果::"+task.get());System.out.println("在获取结果时,给定一个等待时间,如果超过等待时间还未获取到结果,则会主动抛出超时异常::"+task.get(2,TimeUnit.SECONDS));}}classCallableAndFutureTestimplementsCallable<String>{@OverridepublicStringcall()throwsException{String str="";for(int i=0;i<10;i++){
            str+=String.valueOf(i);Thread.sleep(100);}return str;}}

十一、JUC三大常用工具类

CountDownLatch(减计数器)

概述:

CountDownLatch位于 java.util.concurrent包下。
CountDownLatch是一个同步辅助类,允许一个或多个线程等待,一直到其他线程执行的操作完成后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值是线程的数量。每当有一个线程执行完毕后,然后通过 countDown 方法来让计数器的值-1,当计数器的值为0时,表示所有线程都执行完毕,然后继续执行 await 方法 之后的语句,即在锁上等待的线程就可以恢复工作了。
CountDownLatch中主要有两个方法:

  1. countDown:
  • 递减锁存器的计数,如果计数达到零,则释放所有等待的线程。
  • 如果当前计数大于零,则递减。 如果新计数为零,则为线程调度目的重新启用所有等待线程。
  • 如果当前计数为零,则什么也不会发生。
publicvoidcountDown(){
    sync.releaseShared(1);}
  1. await:
  • 使当前线程等待直到闩锁倒计时为零,除非线程被中断。
  • 如果当前计数为零,则此方法立即返回。即await 方法阻塞的线程会被唤醒,继续执行
  • 如果当前计数大于零,则当前线程出于线程调度目的而被禁用并处于休眠状态
publicvoidawait()throwsInterruptedException{
    sync.acquireSharedInterruptibly(1);}

案例:

举个生活中的小例子:
我们一寝室人去上课,得等到1、2、3、4、5、6、7、8个人都出来,才可以锁上寝室门吧。即当计数器值为0时,就可以执行await的方法啦。

编码步骤:

  1. CountDownLatch countDownLatch = new CountDownLatch(8);
  2. countDownLatch.countDown(); 一个线程出来一个人,计数器就 -1
  3. countDownLatch.await(); 阻塞的等待计数器归零
  4. 执行后续步骤

我们用代码来模拟一下这个例子哈:

publicclassCountDownLatchDemo1{publicstaticvoidmain(String[] args){// 初始值8 有八个人需要出寝室门CountDownLatch countDownLatch =newCountDownLatch(8);for(int i =1; i <=8; i++){newThread(()->{System.out.println(Thread.currentThread().getName()+"出去啦");// 出去一个人计数器就减1
                countDownLatch.countDown();},String.valueOf(i)).start();}try{
            countDownLatch.await();// 阻塞等待计数器归零}catch(InterruptedException e){
            e.printStackTrace();}// 阻塞的操作 : 计数器  num++System.out.println(Thread.currentThread().getName()+"====寝室人都已经出来了,关门向教室冲!!!====");}}

小结:

CountDownLatch使用给定的计数进行初始化。 由于调用了countDown方法,每次-1, await方法会一直阻塞到当前计数达到零,然后释放所有等待线程,并且任何后续的await调用都会立即返回。 这是一种一次性现象——计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier 。
CountDownLatch一个有用属性是它不需要调用countDown线程在继续之前等待计数达到零,它只是阻止任何线程通过await,直到所有线程都可以通过。

CyclicBarrier(加法计数器)

概述:

CyclicBarrier 看英文单词就可以看出大概就是循环阻塞的意思。所以还常称为循环栅栏。
CyclicBarrier 主要方法有:

publicclassCyclicBarrier{privateintdowait(boolean timed,long nanos);// 供await方法调用 判断是否达到条件 可以往下执行吗//创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,每执行一次CyclicBarrier就累加1,达到了parties,就会触发barrierAction的执行publicCyclicBarrier(int parties,Runnable barrierAction);//创建一个新的CyclicBarrier ,参数就是目标障碍数,它将在给定数量的参与方(线程)等待时触发,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句publicCyclicBarrier(int parties)//返回触发此障碍所需的参与方数量。publicintgetParties()//等待,直到所有各方都在此屏障上调用了await 。// 如果当前线程不是最后一个到达的线程,那么它会出于线程调度目的而被禁用并处于休眠状态.直到所有线程都调用了或者被中断亦或者发生异常中断退出publicintawait()// 基本同上 多了个等待时间 等待时间内所有线程没有完成,将会抛出一个超时异常publicintawait(long timeout,TimeUnit unit)//将障碍重置为其初始状态。 publicvoidreset()}

public CyclicBarrier(int parties):的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后 的语句。可以将 CyclicBarrier 理解为加 1 操作。

public CyclicBarrier(int parties, Runnable barrierAction) :的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一次障碍数会加一,如果达到了目标障碍数,就会执行我们传入的Runnable;

案例:

我想大家多少玩过王者荣耀吧,里面不是有个钻石夺宝吗,抽201次必得荣耀水晶,这次让我们用代码来模拟一下吧。
编程步骤:

  1. 创建CyclicBarrier对象 CyclicBarrier cyclicBarrier = new CyclicBarrier(count, new MyRunnable());
  2. 编写业务代码
  3. cyclicBarrier.await(); //在线程里面等待阻塞,累加1,达到最大值count时,触发我们传入进去MyRunnable执行。
publicclassCyclicBarrierDemo1{publicstaticvoidmain(String[] args){// 第一个参数:目标障碍数  第二个参数:一个Runnable任务,当达到目标障碍数时,就会执行我们传入的Runnable// 当我们抽了201次的时候,就会执行这个任务。CyclicBarrier cyclicBarrier =newCyclicBarrier(201,()->{System.out.println("恭喜你,已经抽奖201次,幸运值已满,下次抽奖必中荣耀水晶!!!");});for(int i=1;i<=201;i++){finalint count=i;newThread(()->{System.out.println(Thread.currentThread().getName()+"抽奖一次");try{
                    cyclicBarrier.await();}catch(InterruptedException e){
                    e.printStackTrace();}catch(BrokenBarrierException e){
                    e.printStackTrace();}},String.valueOf(i)).start();}}// 这行代码是重置计数
    cyclicBarrier.reset();// 这里是我又加了 一次循环, 可以看到最后结果中输出了两次 "恭喜你"for(int i=1;i<=201;i++){finalint count=i;newThread(()->{System.out.println(Thread.currentThread().getName()+"抽奖一次");try{
                cyclicBarrier.await();}catch(InterruptedException e){
                e.printStackTrace();}catch(BrokenBarrierException e){
                e.printStackTrace();}},String.valueOf(i)).start();}}

小结:

CyclicBarrier和CountDownLatch其实非常相似,CyclicBarrier表示加法,CountDownLatch表示减法。
区别还是有的:

  1. CyclicBarrier只能够唤醒一个任务,CountDownLatch可以唤起多个任务。
  2. CyclicBarrier可以重置,重新使用,但是CountDownLatch的值等于0时,就不可重复用了。

Semaphore( 信号灯)

概述:

Semaphore:信号量通常用于限制可以访问某些(物理或逻辑)资源的线程数。
使用场景:
限制资源,如抢位置、限流等。

案例:

【例子】:
不知道大家有没有过在网吧抢电脑打游戏的那种经历,小时候,平常便宜点的网吧都比较小,而且也比较少,特别多的人去,去晚了的人就只有站在那里看,等别人下机才能上网。
这次的例子就是:网吧有十台高配置打游戏的电脑,有20个小伙伴想要上网。
我们用代码来模拟一下:
编程步骤:

  1. 创建信号灯 Semaphore semaphore = new Semaphore(10); // 5个位置
  2. 等待获取信号灯 semaphore.acquire();//等待获取许可证
  3. 业务代码
  4. 释放信号 semaphore.release();//释放资源,女朋友来找了,下机下机,陪女朋友去了,那么就要释放这台电脑啦
publicclassSemaphoreDemo1{publicstaticvoidmain(String[] args){// 10台电脑Semaphore semaphore =newSemaphore(10);// 20 个小伙伴想要上网for(int i =1; i <=20; i++){newThread(()->{try{//等待获取许可证
                    semaphore.acquire();System.out.println(Thread.currentThread().getName()+"抢到了电脑");//抢到的小伙伴,迅速就开打啦 这里就模拟个时间哈,TimeUnit.SECONDS.sleep(10);}catch(InterruptedException e){
                    e.printStackTrace();}finally{//打完几把游戏的小伙伴 女朋友来找了 溜啦溜啦 希望大家都有人陪伴System.out.println("女朋友来找,"+Thread.currentThread().getName()+"离开了");
                    semaphore.release();//释放资源,离开了就要把电脑让给别人啦。}},String.valueOf(i)).start();}}}

小结:

  1. 在获得一个项目之前,每个线程必须从信号量中获得一个许可,以保证一个项目可供使用。 当线程完成该项目时,它会返回到池中,并且将许可返回给信号量,允许另一个线程获取该项目。 请注意,调用acquire时不会持有同步锁,因为这会阻止项目返回到池中。 信号量封装了限制访问池所需的同步,与维护池本身一致性所需的任何同步分开。
  2. 初始化为 1 的信号量,并且使用时最多只有一个许可可用,可以用作互斥锁。 这通常被称为二进制信号量,因为它只有两种状态:一种许可可用,或零许可可用。 以这种方式使用时,二进制信号量具有属性(与许多java.util.concurrent.locks.Lock实现不同),即“锁”可以由所有者以外的线程释放(因为信号量没有所有权的概念)。 这在某些特定上下文中很有用,例如死锁恢复。
  3. 此类的构造函数可以选择接受公平参数。 当设置为 false 时,此类不保证线程获取许可的顺序。 当公平性设置为真时,信号量保证调用任何acquire方法的线程被选择以按照它们对这些方法的调用的处理顺序(先进先出;FIFO)获得许可。
  4. 通常,用于控制资源访问的信号量应初始化为公平的,以确保没有线程因访问资源而饿死。 当使用信号量进行其他类型的同步控制时,非公平排序的吞吐量优势通常超过公平性考虑。
  5. 内存一致性影响:在调用“释放”方法(如release()之前线程中的操作发生在另一个线程中成功的“获取”方法(如acquire()之后的操作之前。

简单讲述 | Phaser & Exchanger

Phaser

Phaser一种可重用的同步屏障,功能上类似于CyclicBarrier和CountDownLatch,但使用上更为灵活。非常适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)

//默认的构造方法,初始化注册的线程数量为0,可以动态注册Phaser();//指定了线程数量的构造方法Phaser(int parties);//添加一个注册者  向此移相器添加一个新的未到达方。 如果正在进行对onAdvance调用,则此方法可能会在返回之前等待其完成。register();//添加指定数量的注册者  将给定数量的新未到达方添加到此移相器(移相器就是Phaser)。bulkRegister(int parties);// 到达屏障点直接执行 无需等待其他人到达。arrive();//到达屏障点后,也必须等待其他所有注册者到达这个屏障点才能继续下一步arriveAndAwaitAdvance();//到达屏障点,把自己注销了,不用等待其他的注册者到达arriveAndDeregister();//多个线程达到注册点之后,会回调这个方法,可以做一些逻辑的补充onAdvance(int phase,int registeredParties);
packagecom.crush.juc05;importjava.util.concurrent.Phaser;publicclassPhaserDemo{privatestaticPhaser phaser =newMyPhaser();//自定义一个移相器来自定义输出staticclassMyPhaserextendsPhaser{/**
         * @deprecated 在即将到来的阶段提前时执行操作并控制终止的可覆盖方法。 此方法在推进此移相器的一方到达时调用(当所有其他等待方处于休眠状态时)。
         *             如果此方法返回true ,则此移相器将在提前时设置为最终终止状态,并且对isTerminated后续调用将返回 true。
         * @param phase 进入此方法的当前阶段号,在此移相器前进之前
         * @param registeredParties 当前注册方的数量
         * @return
         */@OverrideprotectedbooleanonAdvance(int phase,int registeredParties){if(phase ==0){System.out.println("所有人都到达了网吧,准备开始开黑!!!");returnfalse;}elseif(phase ==1){System.out.println("大家都同意,一起去次烧烤咯!!!");returnfalse;}elseif(phase ==2){System.out.println("大家一起回寝室!!!");returntrue;}returntrue;}}//构建一个线程任务staticclassDoSomeThingimplementsRunnable{@Overridepublicvoidrun(){/**
             * 向此移相器添加一个新的未到达方
             */
            phaser.register();System.out.println(Thread.currentThread().getName()+"从家里出发,准备去学校后街上网开黑!!!");
            phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+"上着上着饿了,说去次烧烤吗?");
            phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName()+"烧烤次完了");
            phaser.arriveAndAwaitAdvance();}}publicstaticvoidmain(String[] args)throwsException{DoSomeThing thing =newDoSomeThing();newThread(thing,"小明").start();newThread(thing,"小王").start();newThread(thing,"小李").start();}}/**
 * 小李从家里出发,准备去学校后街上网开黑!!!
 * 小王从家里出发,准备去学校后街上网开黑!!!
 * 小明从家里出发,准备去学校后街上网开黑!!!
 * 所有人都到达了网吧,准备开始开黑!!!
 * 小李上着上着饿了,说去次烧烤吗?
 * 小明上着上着饿了,说去次烧烤吗?
 * 小王上着上着饿了,说去次烧烤吗?
 * 大家都同意,一起去次烧烤咯!!!
 * 小明烧烤次完了
 * 小李烧烤次完了
 * 小王烧烤次完了
 * 大家一起回寝室!!!
 */

注意:这里只是做了简单的一个使用,更深入的了解,我暂时也没有,想要研究可以去查一查。

Exchanger

Exchanger允许两个线程在某个汇合点交换对象,在某些管道设计时比较有用。
Exchanger提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据并返回。

当两个线程通过Exchanger交换了对象,这个交换对于两个线程来说都是安全的。Exchanger可以认为是 SynchronousQueue 的双向形式,在运用到遗传算法和管道设计的应用中比较有用。

这个的使用我在Dubbo中的总体架构图中看到了它的身影。

在这里插入图片描述

十二、总结

本文主要介绍了JUC是什么、JUC框架结构概述、常用到的类汇总、相关名词、重点介绍了Lock锁,Callable接口、Feature接口以及JUC三大常用工具类,希望对大家有帮助,欢迎评论区留言。

标签: jvm java 大数据

本文转载自: https://blog.csdn.net/u011397981/article/details/129836706
版权归原作者 逆流°只是风景-bjhxcc 所有, 如有侵权,请联系我们删除。

“【并发编程】JUC并发编程(彻底搞懂JUC)”的评论:

还没有评论