0


HashMap很美好,但线程不安全怎么办?ConcurrentHashMap告诉你答案!

写在开头

在《耗时2天,写完HashMap》这篇文章中,我们提到关于HashMap线程不安全的问题,主要存在如下3点风险:

风险1: put的时候导致元素丢失;如两个线程同时put,且key值相同的情况下,后一个线程put操作覆盖了前一个线程的操作,导致前一个线程的元素丢失。
风险2: put 和 get 并发时会导致 get 到 null;若一个线程的put操作触发了数组的扩容,这时另外一个线程去get,因为扩容的操作很耗时,这时有可能会卡死或者get到null。
风险3: 多线程下扩容会死循环;多线程下触发扩容时,因为前一个线程已经破坏了原有链表结构,后一个线程再去读取节点,进行链接的时候,很可能发生顺序错乱,从而形成一个环形链表,进而导致死循环。

Hashtable解决线程安全靠谱吗?

那我们怎么办呢?很多小伙伴可能第一时间想到了HashTable,因为它和HashMap拥有者相似的功能,底层也是基于哈希表实现,数组+链表构建,数组容量到达阈值后,同样会自动扩容,Hashtable 默认的初始大小为 11,之后每次扩充,容量变为原来的 2n+1。并且,**

Hashtable内部的方法几乎都是synchronized关键字修饰,保证了线程的安全

**。

哇!这样一看,Hashtable简直是解决HashMap线程不安全的天选之子啊!但事实上,因为性能的问题,Hashtable已经在被废弃的边缘了,非常不建议在代码中使用它,原因如下接着往下看。
我们先写一个小小的测试类,来感受一下Hashtable的使用。

【代码示例1】

publicclassTest{publicstaticvoidmain(String[] args){HashMap<Integer,String> map =newHashMap<>();
        map.put(1,"I");
        map.put(2,"love");
        map.put(3,"Java");Hashtable<Integer,String> hashtable =newHashtable<>();
        hashtable.put(1,"JavaBuild");for(Map.Entry<Integer,String> entry : hashtable.entrySet()){System.out.println(entry.getKey()+":"+entry.getValue());}}}

输出:

1:JavaBuild

然后,我们跟入到put中的原来,去看看它的底层实现

【源码解析1】

publicsynchronizedVput(K key,V value){// Make sure the value is not nullif(value ==null){thrownewNullPointerException();}// Makes sure the key is not already in the hashtable.Entry<?,?> tab[]= table;int hash = key.hashCode();int index =(hash &0x7FFFFFFF)% tab.length;@SuppressWarnings("unchecked")Entry<K,V> entry =(Entry<K,V>)tab[index];for(; entry !=null; entry = entry.next){if((entry.hash == hash)&& entry.key.equals(key)){V old = entry.value;
                entry.value = value;return old;}}addEntry(hash, key, value, index);returnnull;}

通过这段源码我们能够发现
1、Hashtable哈希值的计算,并没有像HashMap那样重新计算,而是直接取key的hashCode()方法,这样一来它的扰动次数明显降低,hash的重合度更高;
2,index的位置计算中,Hashtable采用了%取余运算,而HashMap采用的是&运算,我们知道位运算直接对内存数据进行操作,不需要转成十进制,处理速度非常快,相比之下Hashtable的效率低下。
3,底层大部分的方法都是synchronized修饰,我们知道用synchronized 来保证线程安全的效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低。

以上3点足以让我们头也不回的舍弃Hashtable,那么问题来了,除了这个集合类外,我们还有什么选项呢?这时,**

ConcurrentHashMap

** 高高的举起了它的小手!

ConcurrentHashMap

文章写到这些,终于引出了我们今天的主角,ConcurrentHashMap!作为一个效率又高,又能保证线程安全的集合类,它的使用频率非常之高,话不多说,我们先来画一个底层逻辑实现图感受一下它的魅力!

JDK1.8下的ConcurrentHashMap底层实现

在这里插入图片描述
哦,对了,虽然我们现在主流的Java版本都是1.8+了,但很多公司在面试的时候,提及ConcurrentHashMap时,有时候还是会问到1.7的底层实现,因此,学有余力的小伙伴,私下里把JDK1.7的底层源码也拿过来读读哈(build哥本地没有安装JDK1.7,就不贴源码解析了)。

JDK1.8中ConcurrentHashMap抛弃了原有的 Segment 分段锁,采用了 CAS + synchronized 来保证并发安全性,底层结构采用Node数组+链表/红黑树,当链表长度达到一定长度后,会转为红黑树,这和HashMap一样。

【PUT源码解析】

publicVput(K key,V value){returnputVal(key, value,false);}/** Implementation for put and putIfAbsent */finalVputVal(K key,V value,boolean onlyIfAbsent){// key 和 value 不能为空if(key ==null|| value ==null)thrownewNullPointerException();int hash =spread(key.hashCode());int binCount =0;for(Node<K,V>[] tab = table;;){// f = 目标位置元素Node<K,V> f;int n, i, fh;// fh 后面存放目标位置的元素 hash 值if(tab ==null||(n = tab.length)==0)// 数组桶为空,初始化数组桶(自旋+CAS)
            tab =initTable();elseif((f =tabAt(tab, i =(n -1)& hash))==null){// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出if(casTabAt(tab, i,null,newNode<K,V>(hash, key, value,null)))break;// no lock when adding to empty bin}elseif((fh = f.hash)==MOVED)
            tab =helpTransfer(tab, f);else{V oldVal =null;// 使用 synchronized 加锁加入节点synchronized(f){if(tabAt(tab, i)== f){// 说明是链表if(fh >=0){
                        binCount =1;// 循环加入新的或者覆盖节点for(Node<K,V> e = f;;++binCount){K ek;if(e.hash == hash &&((ek = e.key)== key ||(ek !=null&& key.equals(ek)))){
                                oldVal = e.val;if(!onlyIfAbsent)
                                    e.val = value;break;}Node<K,V> pred = e;if((e = e.next)==null){
                                pred.next =newNode<K,V>(hash, key,
                                                          value,null);break;}}}elseif(f instanceofTreeBin){// 红黑树Node<K,V> p;
                        binCount =2;if((p =((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value))!=null){
                            oldVal = p.val;if(!onlyIfAbsent)
                                p.val = value;}}}}if(binCount !=0){if(binCount >=TREEIFY_THRESHOLD)treeifyBin(tab, i);if(oldVal !=null)return oldVal;break;}}}addCount(1L, binCount);returnnull;}

源码有点长,大致做了如下几点:

  • 先根据 key 计算出 hashcode;
  • 判断数组桶是否为空,若为空则通过tab = initTable(),初始化数组桶(自旋+CAS);
  • 计算出key的数组桶位置后,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功;
  • 如果当前位置的 “hashcode == MOVED == -1”,则需要进行扩容;
  • 如果都不满足,则利用 synchronized 锁写入数据;
  • 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法,在 treeifyBin 中会首先判断当前数组长度 ≥64 时才会将链表转换为红黑树。

【源码扩展1】
上面put的时候,若Node数组桶为空时,需要进行初始化,那么我们跟入initTable()中去看一看它的源码实现。

/**
 * Initializes table, using the size recorded in sizeCtl.
 */privatefinalNode<K,V>[]initTable(){Node<K,V>[] tab;int sc;while((tab = table)==null|| tab.length ==0){// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。if((sc = sizeCtl)<0)// 让出 CPU 使用权Thread.yield();// lost initialization race; just spinelseif(U.compareAndSwapInt(this,SIZECTL, sc,-1)){try{if((tab = table)==null|| tab.length ==0){int n =(sc >0)? sc :DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt =(Node<K,V>[])newNode<?,?>[n];
                    table = tab = nt;
                    sc = n -(n >>>2);}}finally{
                sizeCtl = sc;}break;}}return tab;}

从源码中我们可以看到,它的初始化是通过CAS和自旋完成的,注意其中的sizeCtl私有成员变量,当它的值小于0(准确来说等于-1)时,说明另外的线程执行CAS 成功,正在进行初始化。通过Thread.yield()做线程让步动作,让出CPU的使用权,自旋等待,随着获得资源,进入CAS。

知识点补充

CAS(compare and swap) 译为:比较与交换

// 如果在这个位置(address) 的值等于 这个值(expectedValue),那么交换(newValue)。booleanCAS(address,expectedValue,newValue){if(address 的 value == expectedValue){
        address 的 value = newValue;returntrue;}}

自旋: 所谓的自旋,旨在线程抢锁失败后进入阻塞状态,放弃 CPU,需要过很久才能再次被调度。但经过测算,大部分情况下,虽然当前抢锁失败,但过不了很久,锁就会被释放。因此,当某个线程抢占 CPU 失败后,保持就绪状态,一旦锁释放,就会继续抢占。
以上这2点内容,在后面的并发多线程中会着重学习,在这里浅浅点名,让大家明白他们的意思和作用即可。

【源码扩展2】
当链表的长度大于8时,会转为红黑树,而红黑树的实现,是通过底层的TreeBin,我们跟进去看一下。

staticfinalclassTreeBin<K,V>extendsNode<K,V>{TreeNode<K,V> root;volatileTreeNode<K,V> first;volatileThread waiter;volatileint lockState;// values for lockStatestaticfinalintWRITER=1;// set while holding write lockstaticfinalintWAITER=2;// set when waiting for write lockstaticfinalintREADER=4;// increment value for setting read lock...}

TreeBin通过root属性维护红黑树的根结点,因为红黑树在旋转的时候,根结点可能会被它原来的子节点替换掉,在这个时间点,如果有其他线程要写这棵红黑树就会发生线程不安全问题,所以在 ConcurrentHashMap 中TreeBin通过waiter属性维护当前使用这棵红黑树的线程,来防止其他线程的进入。

【Get源码解析】
与put相比,get的源码就简单太多了,大概进行了如下几步操作:
1,根据计算出来的 hash 值寻址,如果在桶上直接返回值;
2,如果是红黑树,按照树的方式获取值;
3,如果是链表,按链表的方式遍历获取值;

publicVget(Object key){Node<K,V>[] tab;Node<K,V> e, p;int n, eh;K ek;// key 所在的 hash 位置int h =spread(key.hashCode());if((tab = table)!=null&&(n = tab.length)>0&&(e =tabAt(tab,(n -1)& h))!=null){// 如果指定位置元素存在,头结点hash值相同if((eh = e.hash)== h){if((ek = e.key)== key ||(ek !=null&& key.equals(ek)))// key hash 值相等,key值相同,直接返回元素 valuereturn e.val;}elseif(eh <0)// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找return(p = e.find(h, key))!=null? p.val :null;while((e = e.next)!=null){// 是链表,遍历查找if(e.hash == h &&((ek = e.key)== key ||(ek !=null&& key.equals(ek))))return e.val;}}returnnull;}

总结

文章写到这里,ConcurrentHashMap的介绍基本讲完了,我们现在来自我总结一下为啥它的效率又高,又能保证线程安全。
以JDK1.8版本阐述:

  1. Node + CAS + synchronized 保证并发安全,每次上锁的颗粒度细到链表或红黑树的根节点,不会影响其他Node的读写,此外CAS是轻量级的,synchronized 也经过了锁升级;
  2. JDK1.7的版本里采用的Segment 分段锁,颗粒度粗不说,Segment 的个数一旦初始化就不能改变。 Segment 数组的大小默认是 16,也就是说默认可以同时支持 16 个线程并发写。而1.8的版本中,Node是一个数组,初始默认为16,后续仍然可以以2的幂次方级别进行扩容,因此,它所支持的并发量要看它数组的真实容量;
  3. 效率高是因为它底层采用了和JDK1.8中HashMap相同的数组+链表/红黑树结构。

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得留言+点赞+收藏呀。原创不易,转载请联系Build哥!

在这里插入图片描述
如果您想与Build哥的关系更近一步,还可以关注俺滴公众号“JavaBuild888”,在这里除了看到《Java成长计划》系列博文,还有提升工作效率的小笔记、读书心得、大厂面经、人生感悟等等,欢迎您的加入!

在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_43506040/article/details/136397430
版权归原作者 JavaBuild888 所有, 如有侵权,请联系我们删除。

“HashMap很美好,但线程不安全怎么办?ConcurrentHashMap告诉你答案!”的评论:

还没有评论