本节主要介绍J.U.C包中的几种并发安全集合:ConcurrentHashMap,ConcurrentLinkedQueue,和ConcurrentLinkedDeque。所谓并发安全集合,相对于普通集合来说,能够保证在多线程环境下向集合中添加数据时的线程安全性。主要讲ConcurrentHashMap在实现线程安全性方面对性能和安全性的合理平衡。
并发安全集合ConcurrentHashMap
ConcurrentHashMap是JDK1.5引入的一个并发安全且高效的HashMap,简单来说,我们可以认为他在HashMap的基础上增加了线程安全性的保障。
实现上,关于HashMap采用数组+链表的结构来存储数据,在多个线程并发执行扩容时,可能造成环形链进而导致死循环和数据丢失;在JDK1.8中,HashMap采用数组+链表+红黑树的数据结构来存储数据,优化了JDK1.7中数据扩容的方案,解决了死循环和丢失的问题,但是在并发场景下调用put(),还会存在数据覆盖的问题。
为了解决线程安全性问题带来的影响,我们可以采用一些具备线程安全性的集合,比如HashTable,它使用synchronized关键字来保障线程的安全性;还有Conllections.synchronizeMap,它可以把一个线程不安全的Map,通过synchronized(互斥锁)的方式转换成安全的。但是这些方法有一个问题,在线程竞争激烈的情况下,效率非常低。原因是因为使用synchronized实现锁机制,会导致所有线程在操作数据的时候不管是put还是get操作都需要去竞争同一把锁。
ConcurrentHashMap在性能和安全方面的设计和实现都比较巧妙,技能保证线程的安全性,在性能方面也远远优于HashTable等集合。
正确理解ConcurrentHashMap的线程安全
ConcurrentHashMap本身就是一个HashMap,因此在实际应用上,只需要考虑当前场景是否存在多线程并发访问同一个Map实例,如果存在,则采用ConcurrentMap。需要注意的是,ConcurrentHashMap的线程安全性,只能保证多线程并发执行时,容器中的数据不会被破坏,但是涉及到多个线程的复合操作,ConcurrentHashMap无法保证业务行为的正确性。
public class ConCurrentHahMapExample {
private static final ConcurrentMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
public static void main(String[] args) {
Long accessCount = USER_ACCESS_COUNT.get("Liang");
if (null == accessCount) {
USER_ACCESS_COUNT.put("Liang", 1L);
} else {
USER_ACCESS_COUNT.put("Liang", accessCount + 1);
}
}
}
上述代码属于一个复合操作,也就是“读-修改-写”,这三个操作不是原子的,所以当多个线程去访问同一个用户的时候可能会导致覆盖相互操作的结果,造成记录的数据少于实际的次数。
虽然ConcurrentHashMap是线程安全的,但是对于ConcurrentHash的复合操作,我们需要特别关注。当然上述问题有很多解决方案,比如我们针对于复合操作进行枷锁。ConcurrentHashMap提供了另外一个解决方案,就是使用ConcurrentMap接口定义。
publicinterfaceConcurrentMap<K,V>extendsMap<K,V>{VputIfAbsent(K key,V value);booleanremove(Object key,Object value);booleanreplace(K key,V oldValue,V newValue);Vreplace(K key,V value);}
- putIfAbsent():向ConcurrentHashMap集合插入数据,如果插入的key不存在集合中,则保存当前数据并返回null。如果key已经存在,则返回key对于的value值。
- remove():根据key和value来删除ConcurrentHashMap集合中的数据,该删除操作必须保证key和value完全匹配,如果匹配成功则返回true,否则返回false。
- replace(K,V,V):根据key和oldValue来替换ConcurrentHashMap中已经存在的值,新的值是newValue,该提壶按操作必须保证key和oldValue完全匹配,替换成功则返回地面true,否则返回false。
- replace(K,V):少了对oldValue的判断,如果替换成功,则返回替换之前的value,否则返回null。
public class ConCurrentHahMapExample {
private static final ConcurrentMap<String, Long> USER_ACCESS_COUNT = new ConcurrentHashMap<>(64);
public static void main(String[] args) {
while (true) {
Long accessCount = USER_ACCESS_COUNT.get("Liang");
if (null == accessCount) {
if (null == USER_ACCESS_COUNT.putIfAbsent("Liang", 1L)) {
break;
}
} else {
if (USER_ACCESS_COUNT.replace("Liang", accessCount, accessCount + 1)) {
break;
}
}
}
}
}
JDK1.8提供了支持lambda表达式的原子操作:
public interface ConcurrentMap<K, V> extends Map<K, V> {
@Override
default V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
Objects.requireNonNull(mappingFunction);
V v, newValue;
return ((v = get(key)) == null &&
(newValue = mappingFunction.apply(key)) != null &&
(v = putIfAbsent(key, newValue)) == null) ? newValue : v;
}
@Override
default V computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue;
while((oldValue = get(key)) != null) {
V newValue = remappingFunction.apply(key, oldValue);
if (newValue != null) {
if (replace(key, oldValue, newValue))
return newValue;
} else if (remove(key, oldValue))
return null;
}
return oldValue;
}
@Override
default V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
V oldValue = get(key);
for(;;) {
V newValue = remappingFunction.apply(key, oldValue);
if (newValue == null) {
// delete mapping
if (oldValue != null || containsKey(key)) {
// something to remove
if (remove(key, oldValue)) {
// removed the old value as expected
return null;
}
// some other value replaced old value. try again.
oldValue = get(key);
} else {
// nothing to do. Leave things as they were.
return null;
}
} else {
// add or replace old mapping
if (oldValue != null) {
// replace
if (replace(key, oldValue, newValue)) {
// replaced as expected.
return newValue;
}
// some other value replaced old value. try again.
oldValue = get(key);
} else {
// add (replace if oldValue was null)
if ((oldValue = putIfAbsent(key, newValue)) == null) {
// replaced
return newValue;
}
// some other value replaced old value. try again.
}
}
}
}
@Override
default V merge(K key, V value,
BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
Objects.requireNonNull(remappingFunction);
Objects.requireNonNull(value);
V oldValue = get(key);
for (;;) {
if (oldValue != null) {
V newValue = remappingFunction.apply(oldValue, value);
if (newValue != null) {
if (replace(key, oldValue, newValue))
return newValue;
} else if (remove(key, oldValue)) {
return null;
}
oldValue = get(key);
} else {
if ((oldValue = putIfAbsent(key, value)) == null) {
return value;
}
}
}
}
}
computeIfAbsent()方法详解
computeIfAbsent()方法是通过判断传入key是否存在来对ConcurrentMap集合进行数据初始化操作,如果存在,则不做任何处理;如果不存在则调用remappingFunction计算出balue值,然后把key=value存入ConcurrentHashMap中。由于remappingFunction时一个函数式接口,所以他的返回值也会影响存储结果。
- 如果mappingFunction返回的value不为null,则存储key=value。
- 如果mappingFunction返回的value为null,由于ConcurrentHashMap不允许value为null,所以不会存储,返回null。
USER_ACCESS_COUNT.computeIfAbsent("Liang",v->10L);
computeIfPresent()方法详解
和computeIfAbsent()方法的作用正好相反,computeIfPresent()方法对已经存在的key对应的value值进行修改。如果key不存在,则返回null;如果key存在,则调用remappingFunction进行运算,根据返回的value的情况做出不同的处理。
- 如果remappingFunction返回的value不为null,则修改当前key的value为remappingFunction的值。
- 如果remappingFunction的返回值为null,则删除当前的key,相当于调用了remove(key)方法。
- 如果remappingFunction抛出异常,则原本key对应的value值不会发生变化。
USER_ACCESS_COUNT.computeIfPresent("Liang",(k,v)->v+1L);
compute()方法详解
compute()方法相当于computeIfAbsent()和computeIfPresent()方法的结合体,它不管key是否存在,都会调用remappingFunction进行计算。如果key存在,则调用remappingFunction对value进行修改;如果key不存在,则调用remappingFunction进行初始化。
USER_ACCESS_COUNT.compute("Liang",(k,v)->(null==v)?1L:v+1);
merge()方法详解
将ConcurrentHashMap中相同的key和value合并。
- ConcurrentHashMap不存在指定的key时,则把传入的value设置key的值。
- ConcurrentHashMap存在指定的key时,就将传入的新旧value值性自定义逻辑处理,返回最终的结果并设置为key的值。分不同的场景 - 如果写为(oldValue,newValue)->newValue,把当前key的value修改成newValue。- 如果写为(oldValue,newValue)->oldValue,表示保留oldValue,不做修改。- 如果写为(oldValue,newValue)->oldValue+newValue,表示对新老两个值的合并。- 如果写为(oldValue,newValue)->null,删除当前的key。
ConcurrentMap<Integer,Integer> cm = new ConcurrentHashMap<>();
Stream.of(1,2,8,2,5,6,5,8,3,8).forEach(v->{
cm.merge(v,2,Integer::sum);
});
System.out.println(cm);
ConcurrentHashMap的数据结构
在JDK1.8中,ConcurrentHashMap采用数组+链表+红黑树的方式来实现数据的存储,数据结构相比于JDK1.7,做了如下改造:
- 取消了segment分段设计,直接使用Node数组来保存数据,并且采用Node数组元素作为锁的范围,进一步减小了并发冲突的范围和概率。
- 引入红黑树的设计,降低了极端情况下插叙某个节点数据的时间复杂度,从O(n)降低到了O(logn),提高了查找性能。
ConcurrentHashMap在性能和安全性方面也做好了平衡,使用了一些巧妙的设计,主要体现在以下几个方面:
- 分段锁的设计
- 多线程协助实现并发扩容
- 高低位迁移设计
- 链表转红黑树,红黑树转链表
- 降低锁的粒度
ConcurrentHashMap数据存储的相关定义
transientvolatileNode<K,V>[] table;privatestaticfinalsun.misc.UnsafeU;// 初始化容量大小 ---> 初始化前// sizeCtl = -1 表示正在初始化中// 如果另一个线程进入发现sizeCtl = -1 ,他要让出CPU资源片 --->初始化中// 代表扩容阈值 ---> 初始化后privatetransientvolatileint sizeCtl;// 通过反射去获取sizeCtl的值privatestaticfinallongSIZECTL;privatestaticfinallongTRANSFERINDEX;privatestaticfinallongBASECOUNT;privatestaticfinallongCELLSBUSY;privatestaticfinallongCELLVALUE;privatestaticfinallongABASE;privatestaticfinalintASHIFT;static{try{U=sun.misc.Unsafe.getUnsafe();Class<?> k =ConcurrentHashMap.class;SIZECTL=U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));TRANSFERINDEX=U.objectFieldOffset
(k.getDeclaredField("transferIndex"));BASECOUNT=U.objectFieldOffset
(k.getDeclaredField("baseCount"));CELLSBUSY=U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));Class<?> ck =CounterCell.class;CELLVALUE=U.objectFieldOffset
(ck.getDeclaredField("value"));// 该对象在内存中所占有的初始化偏移量Class<?> ak =Node[].class;ABASE=U.arrayBaseOffset(ak);int scale =U.arrayIndexScale(ak);if((scale &(scale -1))!=0)thrownewError("data type scale not a power of two");// 在内存中占有的容量ASHIFT=31-Integer.numberOfLeadingZeros(scale);}catch(Exception e){thrownewError(e);}}// 当前的i在tab中所占有的偏移量的大小staticfinal<K,V>Node<K,V>tabAt(Node<K,V>[] tab,int i){return(Node<K,V>)U.getObjectVolatile(tab,((long)i <<ASHIFT)+ABASE);}
staticclassNode<K,V>implementsMap.Entry<K,V>{// 表示key对应的hash值finalint hash;// 实际存储的keyfinalK key;// 实际存储的valuevolatileV val;// 表示链表结构,next表示的只想下一个node节点的指针volatileNode<K,V> next;Node(int hash,K key,V val,Node<K,V> next){this.hash = hash;this.key = key;this.val = val;this.next = next;}// 切记:hashCode方法和equals方法是配套使用的,如果重写其中的一个,那么另外一个也需要重写publicfinalinthashCode(){return key.hashCode()^ val.hashCode();}publicfinalbooleanequals(Object o){Object k, v, u;Map.Entry<?,?> e;return((o instanceofMap.Entry)&&(k =(e =(Map.Entry<?,?>)o).getKey())!=null&&(v = e.getValue())!=null&&(k == key || k.equals(key))&&(v ==(u = val)|| v.equals(u)));}}
// 最大值:二进制:01000000 00000000 00000000 00000000privatestaticfinalintMAXIMUM_CAPACITY=1<<30;// 链表长度的阈值staticfinalintTREEIFY_THRESHOLD=8;// 扩容的数组最小的值staticfinalintMIN_TREEIFY_CAPACITY=64;// 获取当前可用线程数staticfinalintNCPU=Runtime.getRuntime().availableProcessors();
public ConcurrentHashMap() {
}
publicConcurrentHashMap(int initialCapacity){if(initialCapacity <0)thrownewIllegalArgumentException();int cap =((initialCapacity >=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:// 设置成距离1.5倍的initialCapacity距离最左边的2的倍数的数字 再扩大2倍// 原因:避免在多线程过程中,频繁扩容造成的性能开销---膨胀阈值设计tableSizeFor(initialCapacity +(initialCapacity >>>1)+1));// 把计算好的容量大小赋值给sizeCtlthis.sizeCtl = cap;}
publicVput(K key,V value){returnputVal(key, value,false);}
finalVputVal(K key,V value,boolean onlyIfAbsent){// 在多线程环境下不允许存在一些歧义现象。if(key ==null|| value ==null)thrownewNullPointerException();// int hash =spread(key.hashCode());int binCount =0;for(Node<K,V>[] tab = table;;){Node<K,V> f;int n, i, fh;if(tab ==null||(n = tab.length)==0)
tab =initTable();// (n-1)&hash ----> 如果n为2的整数倍时,这个式子等同于hash%nelseif((f =tabAt(tab, i =(n -1)& hash))==null){// 更改数组偏移量-->保存这个元素if(casTabAt(tab, i,null,newNode<K,V>(hash, key, value,null)))break;// no lock when adding to empty bin}elseif((fh = f.hash)==MOVED)
tab =helpTransfer(tab, f);// 要么替换,要么解决hash冲突else{V oldVal =null;synchronized(f){if(tabAt(tab, i)== f){if(fh >=0){
binCount =1;for(Node<K,V> e = f;;++binCount){K ek;// 如果hash,key相同,说明做新旧值替换if(e.hash == hash &&((ek = e.key)== key ||(ek !=null&& key.equals(ek)))){
oldVal = e.val;if(!onlyIfAbsent)
e.val = value;break;}// hash相同,维护链表Node<K,V> pred = e;if((e = e.next)==null){
pred.next =newNode<K,V>(hash, key,
value,null);break;}}}elseif(f instanceofTreeBin){Node<K,V> p;
binCount =2;if((p =((TreeBin<K,V>)f).putTreeVal(hash, key,
value))!=null){
oldVal = p.val;if(!onlyIfAbsent)
p.val = value;}}}}// 如果链表长度大于等于8,进行转换处理逻辑if(binCount !=0){if(binCount >=TREEIFY_THRESHOLD)treeifyBin(tab, i);if(oldVal !=null)return oldVal;break;}}}addCount(1L, binCount);returnnull;}
// 减少碰撞,进一步降低hash冲突的几率。// 使用异或运算 保留高低位特性 而且分布均匀// 与操作是为让最高位为0,消除符号位,等到的都是正数。因为负的hashCode在ConcurrentHashMap中有特殊的含义,因此我们需要得到一个正的hashCode。staticfinalintspread(int h){return(h ^(h >>>16))&HASH_BITS;}// 01111111 11111111 11111111 11111111staticfinalintHASH_BITS=0x7fffffff;
privatefinalNode<K,V>[]initTable(){Node<K,V>[] tab;int sc;// 防止在多线程环境下同时存在着多个线程进行初始化 while((tab = table)==null|| tab.length ==0){if((sc = sizeCtl)<0)// 将线程改变成就绪状态,释放CUP资源Thread.yield();elseif(U.compareAndSwapInt(this,SIZECTL, sc,-1)){try{if((tab = table)==null|| tab.length ==0){int n =(sc >0)? sc :DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt =(Node<K,V>[])newNode<?,?>[n];
table = tab = nt;
sc = n -(n >>>2);}}finally{// 设置扩容阈值的大小
sizeCtl = sc;}break;}}return tab;}
当链表长度大于或等于8时,ConcurrentHashMap认为链表已经有点长了,需要考虑优化,有两种方式:
- 对数组进行扩容,当数组长度小于等于64,并且链表的长度大于等于8,优先选择对数组进行扩容。
- 把链表转换为红黑树,当数组长度大于64,并且链表的长度大于等于8,会把链表转化为红黑树。
privatefinalvoidtreeifyBin(Node<K,V>[] tab,int index){Node<K,V> b;int n, sc;if(tab !=null){// 如果当前数组的大小小于64,则进行扩容。if((n = tab.length)<MIN_TREEIFY_CAPACITY)tryPresize(n <<1);elseif((b =tabAt(tab, index))!=null&& b.hash >=0){synchronized(b){if(tabAt(tab, index)== b){TreeNode<K,V> hd =null, tl =null;for(Node<K,V> e = b; e !=null; e = e.next){TreeNode<K,V> p =newTreeNode<K,V>(e.hash, e.key, e.val,null,null);if((p.prev = tl)==null)
hd = p;else
tl.next = p;
tl = p;}setTabAt(tab, index,newTreeBin<K,V>(hd));}}}}}
privatefinalvoidtryPresize(int size){int c =(size >=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(size +(size >>>1)+1);int sc;while((sc = sizeCtl)>=0){Node<K,V>[] tab = table;int n;if(tab ==null||(n = tab.length)==0){
n =(sc > c)? sc : c;if(U.compareAndSwapInt(this,SIZECTL, sc,-1)){try{if(table == tab){@SuppressWarnings("unchecked")Node<K,V>[] nt =(Node<K,V>[])newNode<?,?>[n];
table = nt;
sc = n -(n >>>2);}}finally{
sizeCtl = sc;}}}elseif(c <= sc || n >=MAXIMUM_CAPACITY)break;elseif(tab == table){000000000000000000000000000100001700000000000000000000000000010001000000000000000010000000000000000000000000000000100000000001000110000000000100010000000000001000// 获取rs的值int rs =resizeStamp(n);if(sc <0){Node<K,V>[] nt;if((sc >>>RESIZE_STAMP_SHIFT)!= rs || sc == rs +1||
sc == rs +MAX_RESIZERS||(nt = nextTable)==null||
transferIndex <=0)break;if(U.compareAndSwapInt(this,SIZECTL, sc, sc +1))transfer(tab, nt);}// 第一次扩容,走这个方法。elseif(U.compareAndSwapInt(this,SIZECTL, sc,(rs <<RESIZE_STAMP_SHIFT)+2))transfer(tab,null);}}}// (rs << RESIZE_STAMP_SHIFT) 扩容戳// SIZECTL:低16位,记录参与扩容的线程数
// ******** ******** 1******* ********staticfinalintresizeStamp(int n){// 返回无符号整数n最高位非0位前面的0的个数returnInteger.numberOfLeadingZeros(n)|// 10000000 00000000(1<<(RESIZE_STAMP_BITS-1));}
privatefinalvoidtransfer(Node<K,V>[] tab,Node<K,V>[] nextTab){int n = tab.length, stride;// 计算每个线程处理的区间长度,默认是16if((stride =(NCPU>1)?(n >>>3)/NCPU: n)<MIN_TRANSFER_STRIDE)
stride =MIN_TRANSFER_STRIDE;// 初始化nextTable,第二步初始化transferIndex,默认是老的数组长度if(nextTab ==null){// initiatingtry{@SuppressWarnings("unchecked")Node<K,V>[] nt =(Node<K,V>[])newNode<?,?>[n <<1];
nextTab = nt;}catch(Throwable ex){// try to cope with OOME
sizeCtl =Integer.MAX_VALUE;return;}
nextTable = nextTab;
transferIndex = n;}int nextn = nextTab.length;// 表示一个正在被迁移的NodeForwardingNode<K,V> fwd =newForwardingNode<K,V>(nextTab);// 用来判断是否还有待处理的数据迁移工作boolean advance =true;boolean finishing =false;// to ensure sweep before committing nextTabfor(int i =0, bound =0;;){Node<K,V> f;int fh;// 计算迁移数据的区间while(advance){int nextIndex, nextBound;if(--i >= bound || finishing)
advance =false;elseif((nextIndex = transferIndex)<=0){
i =-1;
advance =false;}elseif(U.compareAndSwapInt
(this,TRANSFERINDEX, nextIndex,
nextBound =(nextIndex > stride ?
nextIndex - stride :0))){
bound = nextBound;
i = nextIndex -1;
advance =false;}}if(i <0|| i >= n || i + n >= nextn){int sc;if(finishing){
nextTable =null;
table = nextTab;
sizeCtl =(n <<1)-(n >>>1);return;}if(U.compareAndSwapInt(this,SIZECTL, sc = sizeCtl, sc -1)){if((sc -2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return;
finishing = advance =true;
i = n;// recheck before commit}}// elseif((f =tabAt(tab, i))==null)
advance =casTabAt(tab, i,null, fwd);elseif((fh = f.hash)==MOVED)
advance =true;// already processedelse{synchronized(f){if(tabAt(tab, i)== f){Node<K,V> ln, hn;if(fh >=0){int runBit = fh & n;Node<K,V> lastRun = f;for(Node<K,V> p = f.next; p !=null; p = p.next){int b = p.hash & n;if(b != runBit){
runBit = b;
lastRun = p;}}if(runBit ==0){
ln = lastRun;
hn =null;}else{
hn = lastRun;
ln =null;}for(Node<K,V> p = f; p != lastRun; p = p.next){int ph = p.hash;K pk = p.key;V pv = p.val;if((ph & n)==0)
ln =newNode<K,V>(ph, pk, pv, ln);else
hn =newNode<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);
advance =true;}elseif(f instanceofTreeBin){TreeBin<K,V> t =(TreeBin<K,V>)f;TreeNode<K,V> lo =null, loTail =null;TreeNode<K,V> hi =null, hiTail =null;int lc =0, hc =0;for(Node<K,V> e = t.first; e !=null; e = e.next){int h = e.hash;TreeNode<K,V> p =newTreeNode<K,V>(h, e.key, e.val,null,null);if((h & n)==0){if((p.prev = loTail)==null)
lo = p;else
loTail.next = p;
loTail = p;++lc;}else{if((p.prev = hiTail)==null)
hi = p;else
hiTail.next = p;
hiTail = p;++hc;}}
ln =(lc <=UNTREEIFY_THRESHOLD)?untreeify(lo):(hc !=0)?newTreeBin<K,V>(lo): t;
hn =(hc <=UNTREEIFY_THRESHOLD)?untreeify(hi):(lc !=0)?newTreeBin<K,V>(hi): t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);
advance =true;}}}}}}
版权归原作者 DougLiang 所有, 如有侵权,请联系我们删除。