0


安全无忧:Java并发集合容器的应用与实践

Java 常见并发容器

JDK 提供的这些容器大部分在

java.util.concurrent

包中:

  • ConcurrentHashMap : 线程安全的 HashMap
  • CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector
  • ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列
  • BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道,解决生产者消费者问题
  • ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用

Collections.synchronizedMap()

方法来包装 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本 ConcurrentHashMap。

实现原理

在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

JDK1.7

底层数据结构:Segments 数组 + HashEntry 数组 + 链表,采用分段锁保证安全性。

容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。

JDK1.8

底层数据结构:Synchronized + CAS + Node + 红黑树,Node 的 val 和 next 都用 volatile 保证,保证可见性。查找、替换、赋值操作都使用 CAS。

CAS 是乐观锁,在一些场景中(并发不激烈的情况下)它比 Synchronized 和 ReentrentLock 的效率要高,当 CAS 保障不了线程安全的情况下(扩容、hash 冲突)转成Synchronized 来保证线程安全,大大提高了低并发下的性能。

读取操作的实现
publicVget(Object key){Node<K,V>[] tab;Node<K,V> e, p;int n, eh;K ek;int h =spread(key.hashCode());if((tab = table)!=null&&(n = tab.length)>0&&(e =tabAt(tab,(n -1)& h))!=null){if((eh = e.hash)== h){if((ek = e.key)== key ||(ek !=null&& key.equals(ek)))return e.val;}elseif(eh <0)return(p = e.find(h, key))!=null? p.val :null;while((e = e.next)!=null){if(e.hash == h &&((ek = e.key)== key ||(ek !=null&& key.equals(ek))))return e.val;}}returnnull;}

get 操作全程无锁。get 操作可以无锁是由于 Node 元素的 val 和指针 next 是用 volatile 修饰的,在多线程环境下线程 A 修改节点的 val 或者新增节点的时候是对线程 B 可见的。

写入操作的实现
finalVputVal(K key,V value,boolean onlyIfAbsent){if(key ==null|| value ==null)thrownewNullPointerException();// 根据key的进行hash操作,找到Node数组中的位置int hash =spread(key.hashCode());int binCount =0;// 这边加了一个循环,就是不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject// 因为如果其他线程正在修改tab,那么尝试就会失败,所以这边要加一个for循环,不断的尝试for(Node<K,V>[] tab = table;;){Node<K,V> f;int n, i, fh;if(tab ==null||(n = tab.length)==0)// 先判断Node数组有没有初始化,如果没有初始化先初始化initTable();
            tab =initTable();elseif((f =tabAt(tab, i =(n -1)& hash))==null){// 不存在hash冲突,即该位置是null,直接用CAS插入if(casTabAt(tab, i,null,newNode<K,V>(hash, key, value,null)))break;// no lock when adding to empty bin}elseif((fh = f.hash)==MOVED)// 如果tab[i]不为空并且hash值为MOVED(-1),说明该链表正在进行transfer操作,返回扩容完成后的table。
            tab =helpTransfer(tab, f);else{V oldVal =null;// 存在hash冲突,对链表的头节点或者红黑树的头节点加synchronized锁,进一步减少线程冲突synchronized(f){if(tabAt(tab, i)== f){if(fh >=0){
                        binCount =1;// 如果是链表,就遍历链表for(Node<K,V> e = f;;++binCount){K ek;if(e.hash == hash &&((ek = e.key)== key ||(ek !=null&& key.equals(ek)))){// 如果key相同就执行覆盖操作
                                oldVal = e.val;if(!onlyIfAbsent)
                                    e.val = value;break;}Node<K,V> pred = e;if((e = e.next)==null){// 如果key不同就将元素插入到链表的尾部
                                pred.next =newNode<K,V>(hash, key,
                                                          value,null);break;}}}elseif(f instanceofTreeBin){// 如果是红黑树,就按照红黑树的结构进行插入。Node<K,V> p;
                        binCount =2;if((p =((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value))!=null){
                            oldVal = p.val;if(!onlyIfAbsent)
                                p.val = value;}}}}if(binCount !=0){// 链表长度大于8, Node数组的长度超过64时,会将链表的转化为红黑树if(binCount >=TREEIFY_THRESHOLD)treeifyBin(tab, i);if(oldVal !=null)return oldVal;break;}}}// 计数增加1,有可能触发transfer操作(扩容)。addCount(1L, binCount);returnnull;}
  1. 根据 key 的进行 hash 操作,找到 Node 数组中的位置
  2. 然后加了一个死循环,意思是不断的尝试,因为在 table 的初始化和 casTabAt 用到了 compareAndSwapInt、compareAndSwapObject,因为如果其他线程正在修改table,那么尝试就会失败,所以要加一个 fo 循环,不断的尝试
  3. 之后判断 Node 数组有没有初始化,如果没有初始化先初始化 initTable
  4. 如果不存在 hash 冲突,即该位置是 null,直接用 CAS 插入,之后跳出循环
  5. 如果存在了冲突并且 hash 值为 MOVED(-1),说明该链表正在进行 transfer 操作,获取到扩容完成后的 table,进入下一次的循环
  6. 存在产生 hash 冲突,那么对链表的头节点或者红黑树的头节点加 synchronized 锁,进一步减少线程冲突。 1. 如果是链表,就遍历链表,如果 key 相同就执行覆盖操作,key 不同就追加,之后跳出循环2. 如果是红黑树,就按照红黑树的结构进行插入,之后跳出循环
  7. 循环结束后判断链表长度是否大于8,当同时满足 Node 数组的长度超过 64 时,会将链表的转化为红黑树
  8. 最后计数增加 1,同时有可能触发 transfer 操作(扩容)

CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。

实现原理

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

读取操作的实现

privatetransientvolatileObject[] array;publicEget(int index){returnget(getArray(), index);}@SuppressWarnings("unchecked")privateEget(Object[] a,int index){return(E) a[index];}finalObject[]getArray(){return array;}

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

写入操作的实现

publicbooleanadd(E e){finalReentrantLock lock =this.lock;// 加锁
    lock.lock();try{Object[] elements =getArray();int len = elements.length;// 拷贝新数组Object[] newElements =Arrays.copyOf(elements, len +1);
        newElements[len]= e;setArray(newElements);returntrue;}finally{// 释放锁
        lock.unlock();}}

CopyOnWriteArrayList 写入操作在添加元素的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue 这个队列使用链表作为其数据结构,它在高并发环境中性能表现得非常好。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 底层是通过 CAS 非阻塞算法来实现线程的。

阻塞队列

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,阻塞队列导致线程阻塞。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

阻塞队列的相关方法如下:

  • 如果将队列当作线程管理工具来使用,将要用到 put 和 take 方法。
  • 当试图向满的队列中添加或从空的队列中移出元素时,add、remove 和 element 操作抛出异常。
  • 在一个多线程程序中, 队列会在任何时候空或满,因此,一定要使用 offer、poll 和 peek 方法作为替代。这些方法如果不能完成任务,只是给出一个错误提示而不会抛出异常。poll 和 peek 方法通过返回空来指示失败。因此,向这些队列中插入 null 值是非法的。
java.util.concurrent

包提供了阻塞队列的几个变种。下面主要介绍一下 3 个常见的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue 在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性。若设置了公平参数,则那么等待了最长时间的线程会优先得到处理。通常,公平性会降低性能,只有在确实非常需要时才使用它。

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

/**
 * 指定容量构造
 */publicArrayBlockingQueue(int capacity){// 不设置公平参数this(capacity,false);}/**
 * 指定容量和公平参数的构造,fair为true则表示公平,等待越长越优先
 * 底层通过公平锁实现
 */publicArrayBlockingQueue(int capacity,boolean fair){if(capacity <=0)thrownewIllegalArgumentException();this.items =newObject[capacity];
    lock =newReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();}/**
 * 添加一个元素,如果队列满,则阻塞
 */publicvoidput(E e)throwsInterruptedException{checkNotNull(e);// 可重入锁finalReentrantLock lock =this.lock;
    lock.lockInterruptibly();try{while(count == items.length)// 如果队列满,则阻塞
            notFull.await();enqueue(e);}finally{
        lock.unlock();}}/**
 * 移除并返回头元素,如果队列空,则阻塞
 */publicEtake()throwsInterruptedException{// 可重入锁finalReentrantLock lock =this.lock;
    lock.lockInterruptibly();try{while(count ==0)// 如果队列空,则阻塞
            notEmpty.await();returndequeue();}finally{
        lock.unlock();}}

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足先进先出的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于

Integer.MAX_VALUE

其并发控制采用可重入锁 ReentrantLock,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞,尝试从一个空队列中取一个元素也会同样阻塞。

LinkedBlockingDeque 相当于是 LinkedBlockingQueue 的变种,底层基于双向链表实现。

/**
 * 某种意义上的无界队列,容量是Integer.MAX_VALUE
 */publicLinkedBlockingQueue(){this(Integer.MAX_VALUE);}/**
 * 指定容量构造,有界队列
 */publicLinkedBlockingQueue(int capacity){if(capacity <=0)thrownewIllegalArgumentException();this.capacity = capacity;
    last = head =newNode<E>(null);}/**
 * 添加一个元素,如果队列满,则阻塞
 */publicvoidput(E e)throwsInterruptedException{if(e ==null)thrownewNullPointerException();int c =-1;Node<E> node =newNode<E>(e);finalReentrantLock putLock =this.putLock;finalAtomicInteger count =this.count;
    putLock.lockInterruptibly();try{while(count.get()== capacity){
            notFull.await();}enqueue(node);
        c = count.getAndIncrement();if(c +1< capacity)
            notFull.signal();}finally{
        putLock.unlock();}if(c ==0)signalNotEmpty();}/**
 * 移除并返回头元素,如果队列空,则阻塞
 */publicEtake()throwsInterruptedException{E x;int c =-1;finalAtomicInteger count =this.count;finalReentrantLock takeLock =this.takeLock;
    takeLock.lockInterruptibly();try{while(count.get()==0){
            notEmpty.await();}
        x =dequeue();
        c = count.getAndDecrement();if(c >1)
            notEmpty.signal();}finally{
        takeLock.unlock();}if(c == capacity)signalNotFull();return x;}

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,它不是一个先进先出的队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现

compareTo()

方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 底层基于数组实现,默认容量为 11。扩容机制:每次新增检测当前容量,已满的时候进行扩容,假设当前容量为 n,如果 n < 64,那么新容量是 2n+2,否则是 1.5n。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

DelayQueue:基于 PriorityQueue 的一种特殊队列,里面的元素只有在其到期时才能从队列中取出。


本文转载自: https://blog.csdn.net/weixin_53287520/article/details/134976633
版权归原作者 薛伟同学 所有, 如有侵权,请联系我们删除。

“安全无忧:Java并发集合容器的应用与实践”的评论:

还没有评论