0


[并发进阶]——读写锁 原理

笔记来源于 黑马程序员全面深入学习Java并发编程,从《Java并发编程的艺术》中作为补充

文章目录

📌概念

🚩读写锁与排它锁不同在于它锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode

💻读写状态的设计

读写锁同样是通过自定义同步器来实现同步功能,而不同于ReentrantLock中的同步状态仅能表示锁被一个线程重复获取的次数,读写锁状态(一个整型变量)可以表示多个读线程和一个写线程的状态。

那如何才能实现一个整型变量可以维护多个状态?

读写锁将 变量切分成了两个部分,高16位表示读,低16位表示写

image-20220112202308470

读写锁是如何迅速确定读和写各自的状态呢?

💡通过位运算。

假设当前同步状态 值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移 16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是 S+0x00010000。

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读 状态(S>>>16)大于0,即读锁已被获取

⚠️注意事项:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  • 重入时降级支持:即持有写锁的情况下去获取读锁

🎬图解流程以及源码分析

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

我们假设有两个线程t1、t2,其中t1是写线程,t2是读线程

1、t1先上写锁,然后t2尝试获取读锁

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位

image-20220112205731698

// 外部类 WriteLock 方法, 方便阅读, 放在此处publicvoidlock(){
    sync.acquire(1);}// AQS 继承过来的方法, 方便阅读, 放在此处publicfinalvoidacquire(int arg){if(// 尝试获得写锁失败!tryAcquire(arg)&&// 将当前线程关联到一个 Node 对象上, 模式为独占模式// 进入 AQS 队列阻塞acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){selfInterrupt();}}

2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒

实际上,在读写锁中,只会返回-1、1两种值

image-20220112205739754

publicfinalvoidacquireShared(int arg){// tryAcquireShared 返回负数, 表示获取读锁失败if(tryAcquireShared(arg)<0){doAcquireShared(arg);}}

3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

image-20220112210224611

// AQS 继承过来的方法, 方便阅读, 放在此处privatevoiddoAcquireShared(int arg){// 将当前线程关联到一个 Node 对象上, 模式为共享模式final Node node =addWaiter(Node.SHARED);boolean failed =true;try{boolean interrupted =false;for(;;){final Node p = node.predecessor();if(p == head){// 再一次尝试获取读锁int r =tryAcquireShared(arg);// 成功if(r >=0){// r 表示可用资源数, 在这里总是 1 允许传播//(唤醒 AQS 中下一个 Share 节点)setHeadAndPropagate(node, r);
                    p.next = null;// help GCif(interrupted)selfInterrupt();
                    failed =false;return;}}if(// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)shouldParkAfterFailedAcquire(p, node)&&// park 当前线程parkAndCheckInterrupt()){
                interrupted =true;}}}finally{if(failed)cancelAcquire(node);}}

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁(节点自旋)

5)如果没有成功,在 doAcquireShared 内 循环一次,把前驱节点的 waitStatus 改为 -1,再 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

image-20220112210612334

2、又 有t3加读锁和 t4加写锁

假设t3是读锁、t4是写锁

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

image-20220112210911985

3、t1释放锁

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

image-20220112211020968

// AQS 继承过来的方法publicfinalbooleanrelease(int arg){// 尝试释放写锁成功if(tryRelease(arg)){// unpark AQS 中等待的线程
            Node h = head;if(h != null && h.waitStatus !=0)unparkSuccessor(h);returntrue;}returnfalse;}
// Sync 继承过来的方法protectedfinalbooleantryRelease(int releases){if(!isHeldExclusively())thrownewIllegalMonitorStateException();int nextc =getState()- releases;// 因为可重入的原因, 写锁计数为 0, 才算释放成功boolean free =exclusiveCount(nextc)==0;if(free){setExclusiveOwnerThread(null);}setState(nextc);return free;}

4、t2恢复运行

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一,如下state从0_0变成了1_0

image-20220112211323960

这时 t2 已经恢复运行 ,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

注意此处的t2不是设置为exclusiveOwnerThread,因为读线程是并行的

image-20220112211438439

5、紧接着唤醒下一个读线程

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

因为读线程是并行的,所以要继续唤醒下一个读线程

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处privatevoidsetHeadAndPropagate(Node node,int propagate){
    Node h = head;// Record old head for check below// 设置自己为 headsetHead(node);// propagate 表示有共享资源(例如共享读锁或信号量)// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATEif(propagate >0|| h == null || h.waitStatus <0||(h = head)== null || h.waitStatus <0){
        Node s = node.next;// 如果是最后一个节点或者是等待共享读锁的节点if(s == null || s.isShared()){doReleaseShared();}}}

image-20220112211610042

这回再来一次 for (;😉 执行 tryAcquireShared 成功则让读锁计数加一

image-20220112211828461

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

image-20220112212017005

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

6、读线程释放锁

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

// AQS 继承过来的方法publicfinalbooleanreleaseShared(int arg){if(tryReleaseShared(arg)){doReleaseShared();returntrue;}returnfalse;}
// Sync 继承过来的方法, 方便阅读, 放在此处protectedfinalbooleantryReleaseShared(int unused){// ... 省略不重要的代码for(;;){int c =getState();int nextc = c - SHARED_UNIT;if(compareAndSetState(c, nextc)){// 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程// 计数为 0 才是真正释放return nextc ==0;}}}

image-20220112212129855

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

注意这里要先将waitStatus 先改为 0, 防止 多线程状态下 unparkSuccessor 被多次执行

// AQS 继承过来的方法privatevoiddoReleaseShared(){// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark// 如果 head.waitStatus == 0 ==> Node.PROPAGATE for(;;){
        Node h = head;if(h != null && h != tail){int ws = h.waitStatus;// 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0// 防止 unparkSuccessor 被多次执行if(ws == Node.SIGNAL){if(!compareAndSetWaitStatus(h, Node.SIGNAL,0))continue;// loop to recheck casesunparkSuccessor(h);}// 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析elseif(ws ==0&&!compareAndSetWaitStatus(h,0, Node.PROPAGATE))continue;// loop on failed CAS}if(h == head)// loop if head changedbreak;}}

image-20220112212146161

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;😉 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

image-20220112212200469

🔒锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读 锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

😕锁降级中读锁的获取是否必要呢?

答案是必要的。主要是为了保证数据的可见性,如果 当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修 改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级 的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进 行数据更新。

⚠️RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的 也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了 数据,则其更新对其他获取到读锁的线程是不可见的。
锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。


本文转载自: https://blog.csdn.net/weixin_65349299/article/details/122463278
版权归原作者 一定会去到彩虹海的麦当 所有, 如有侵权,请联系我们删除。

“[并发进阶]&mdash;&mdash;读写锁 原理”的评论:

还没有评论