0


大数据必学Java基础(六十六):BlockingQueue常见子类

BlockingQueue常见子类

一、ArrayBlockingQueue

源码中的注释的解释说明:

1、添加元素

package com.lanson.test05;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author : Lansonli
 */
public class Test01 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        //创建一个队列,队列可以指定容量指定长度3:
        ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
        //添加元素:
        //【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
        //aq.add(null);
        //aq.offer(null);
        //aq.put(null);
        //【2】正常添加元素:
        aq.add("aaa");
        aq.offer("bbb");
        aq.put("ccc");
        System.out.println(aq);//[aaa, bbb, ccc]

        //【3】在队列满的情况下,再添加元素:
        //aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
        //System.out.println(aq.offer("ddd"));//没有添加成功,返回false
        //设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
        //aq.offer("ddd",2, TimeUnit.SECONDS);
        //真正阻塞的方法: put ,如果队列满,就永远阻塞
        aq.put("ddd");
        System.out.println(aq);
    }
}

2、获取元素

package com.lanson.test05;

import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        //创建一个队列,队列可以指定容量指定长度3:
        ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
        aq.add("aaa");
        aq.add("bbb");
        aq.add("ccc");
        //得到头元素但是不移除
        System.out.println(aq.peek());
        System.out.println(aq);
        //得到头元素并且移除
        System.out.println(aq.poll());
        System.out.println(aq);
        //得到头元素并且移除
        System.out.println(aq.take());
        System.out.println(aq);

        //清空元素:
        aq.clear();
        System.out.println(aq);

        System.out.println(aq.peek());//null
        System.out.println(aq.poll());//null
        //设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
        //System.out.println(aq.poll(2, TimeUnit.SECONDS));
        //真正阻塞:队列为空,永远阻塞
        System.out.println(aq.take());

    }
}

3、源码

public class ArrayBlockingQueue<E> {
        //底层就是一个数组:
        final Object[] items;
        //取元素用到的索引,初始结果为0
        int takeIndex;
        //放元素用到的索引,初始结果为0
        int putIndex;
        //数组中元素的个数:
        int count;

        //一把锁:这个锁肯定很多方法中用到了,所以定义为属性,初始化以后可以随时使用
    final ReentrantLock lock;

    //锁伴随的一个等待吃:notEmpty
    private final Condition notEmpty;

    //锁伴随的一个等待吃:notFull
    private final Condition notFull;

        //构造器:
        public ArrayBlockingQueue(int capacity) {//传入队列指定的容量
        this(capacity, false);
    }
        
        public ArrayBlockingQueue(int capacity, boolean fair) {//传入队列指定的容量
                //健壮性考虑
        if (capacity <= 0)
            throw new IllegalArgumentException();
                //初始化底层数组
        this.items = new Object[capacity];
                //初始化锁 和  等待队列
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

        //两个基本方法:一个是入队,一个是出队  ,是其他方法的基础:
        //入队:
        private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;//底层数组赋给items
                //在对应的下标位置放入元素
        items[putIndex] = x;
        if (++putIndex == items.length) //++putIndex putIndex 索引 加1
            putIndex = 0;
                //每放入一个元素,count加1操作
        count++;
        notEmpty.signal();
    }

        //出队:
        private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;//底层数组赋给items
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];//在对应的位置取出元素
        items[takeIndex] = null;//对应位置元素取出后就置为null
        if (++takeIndex == items.length)//++takeIndex 加1操作
            takeIndex = 0;
        count--;//每取出一个元素,count减1操作
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;//将取出的元素作为方法的返回值
    }

}

takeIndex和putIndex置为0的原因:

4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法

5、感受一下put和take的阻塞

上面的while不可以换为if,因为如果notFull中的线程被激活的瞬间,有其他线程放入元素,那么队列就又满了。

那么沿着await后面继续执行就不可以,所以一定要反复确定队列是否满的,才能放入元素。

二、LinkedBlockingQueue

一个可选择的有边界的队列:

意思就是队列的长度可以指定,也可以不指定

1、添加元素

package com.lanson.test05;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author : Lansonli
 */
public class Test01 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        //创建一个队列,队列可以指定容量指定长度3:
        LinkedBlockingQueue aq = new LinkedBlockingQueue(3);
        //添加元素:
        //【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
        //aq.add(null);
        //aq.offer(null);
        aq.put(null);
        //【2】正常添加元素:
        aq.add("aaa");
        aq.offer("bbb");
        aq.put("ccc");
        System.out.println(aq);//[aaa, bbb, ccc]

        //【3】在队列满的情况下,再添加元素:
        //aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
        //System.out.println(aq.offer("ddd"));//没有添加成功,返回false
        //设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
        //aq.offer("ddd",2, TimeUnit.SECONDS);
        //真正阻塞的方法: put ,如果队列满,就永远阻塞
        aq.put("ddd");
        System.out.println(aq);
    }
}

2、取出元素

package com.lanson.test05;

import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        //创建一个队列,队列可以指定容量指定长度3:
        LinkedBlockingQueue aq = new LinkedBlockingQueue();
        aq.add("aaa");
        aq.add("bbb");
        aq.add("ccc");
        //得到头元素但是不移除
        System.out.println(aq.peek());
        System.out.println(aq);
        //得到头元素并且移除
        System.out.println(aq.poll());
        System.out.println(aq);
        //得到头元素并且移除
        System.out.println(aq.take());
        System.out.println(aq);

        //清空元素:
        aq.clear();
        System.out.println(aq);

        System.out.println(aq.peek());//null
        System.out.println(aq.poll());//null
        //设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
        //System.out.println(aq.poll(2, TimeUnit.SECONDS));
        //真正阻塞:队列为空,永远阻塞
        System.out.println(aq.take());

    }
}

3、特点

ArrayBlockingQueue : 不支持读写同时操作,底层基于数组的。

LinkedBlockingQueue:支持读写同时操作,并发情况下,效率高。底层基于链表。

4、源码

入队操作:

出队操作:

public class LinkedBlockingQueue<E>{
        //内部类Node就是链表的节点的对象对应的类:
        static class Node<E> {
        E item;//封装你要装的那个元素

        Node<E> next;//下一个Node节点的地址

        Node(E x) { item = x; }//构造器
    }
        //链表的长度
        private final int capacity;
        //计数器:
        private final AtomicInteger count = new AtomicInteger();
        //链表的头结点
        transient Node<E> head;
        //链表的尾结点
        private transient Node<E> last;
        //取元素用的锁
        private final ReentrantLock takeLock = new ReentrantLock();
        //等待池
    private final Condition notEmpty = takeLock.newCondition();
    //放元素用的锁
    private final ReentrantLock putLock = new ReentrantLock();
    //等待池
    private final Condition notFull = putLock.newCondition();

        public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);//调用类本类的空构造器,传入正21亿
    }

        public LinkedBlockingQueue(int capacity) {
                //健壮性考虑
        if (capacity <= 0) throw new IllegalArgumentException();
                //给队列指定长度
        this.capacity = capacity;
                //last,head指向一个新的节点,新的节点中 元素为null
        last = head = new Node<E>(null);
    }

        //入队:
        private void enqueue(Node<E> node) {
        last = last.next = node;
    }

        //出队:
        private E dequeue() {
        Node<E> h = head;//h指向了head
        Node<E> first = h.next;//first 指向head的next
        h.next = h; // help GC   h.next指向自己,更容易被GC发现 被GC
        head = first;//head的指向指为first
        E x = first.item;//取出链中第一个元素,给了x
        first.item = null;
        return x;//把x作为方法的返回值
    }
}

5、put的阻塞

阻塞的前提是 队列是固定长度的

三、SynchronousQueue

这个特殊的队列设计的意义:

1、先添加元素

public class Test01 {
    //这是main方法,程序的入口
    public static void main(String[] args) {
        SynchronousQueue sq = new SynchronousQueue();
        sq.add("aaa");
    }
}

直接报错:说队列满了,因为队列没有容量,理解为满也是正常的

2、put方法阻塞

队列是空的,可以理解为队列满了,满的话放入元素 put 一定会阻塞

public class Test01 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue sq = new SynchronousQueue();
        sq.put("aaa");
    }
}

3、先取再放

package com.lanson.test06;

import java.util.concurrent.SynchronousQueue;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) {
        SynchronousQueue sq = new SynchronousQueue();

        //创建一个线程,取数据:
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        System.out.println(sq.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        //搞一个线程,往里面放数据:
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sq.put("aaa");
                    sq.put("bbb");
                    sq.put("ccc");
                    sq.put("ddd");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();
    }
}

结果:

4、poll方法

package com.lanson.test06;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) {
        SynchronousQueue sq = new SynchronousQueue();

        //创建一个线程,取数据:
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        //设置一个阻塞事件:超出事件就不阻塞了
                        Object result = sq.poll(5, TimeUnit.SECONDS);
                        System.out.println(result);
                        if(result == null){
                            break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        //搞一个线程,往里面放数据:
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sq.put("aaa");
                    sq.put("bbb");
                    sq.put("ccc");
                    sq.put("ddd");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();
    }
}

注意:取出元素 不能用peek,因为peek不会将元素从队列中拿走,只是查看的效果;

四、PriorityBlockingQueue

带有优先级的阻塞队列。

优先级队列,意味着队列有先后顺序的,数据有不同的权重。

无界的队列,没有长度限制,但是在你不指定长度的时候,默认初始长度为11,也可以手动指定,

当然随着数据不断的加入,底层(底层是数组Object[])会自动扩容,直到内存全部消耗殆尽了,导致 OutOfMemoryError内存溢出 程序才会结束。

不可以放入null元素的,不允许放入不可比较的对象(导致抛出ClassCastException),对象必须实现内部比较器或者外部比较器。

1、添加null数据

public class Test {
    //这是main方法,程序的入口
    public static void main(String[] args) {
        PriorityBlockingQueue pq = new PriorityBlockingQueue();
        pq.put(null);
    }
}

2、添加四个数据

package com.lanson.test07;

/**
 * @author : Lansonli
 */
public class Student implements Comparable<Student> {
    String name;
    int age;

    public Student() {
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    @Override
    public int compareTo(Student o) {
        return this.age - o.age;
    }
}

package com.lanson.test07;

import java.util.concurrent.PriorityBlockingQueue;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) {
        PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
        pq.put(new Student("nana",18));
        pq.put(new Student("lulu",11));
        pq.put(new Student("feifei",6));
        pq.put(new Student("mingming",21));
        System.out.println(pq);
    }
}

结果:

发现结果并没有按照优先级顺序排列

3、取出数据

package com.lanson.test07;

import java.util.concurrent.PriorityBlockingQueue;

/**
 * @author : Lansonli
 */
public class Test02 {
    //这是main方法,程序的入口
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
        pq.put(new Student("nana",18));
        pq.put(new Student("lulu",11));
        pq.put(new Student("feifei",6));
        pq.put(new Student("mingming",21));
        System.out.println("------------------------------------------");
        System.out.println(pq.take());
        System.out.println(pq.take());
        System.out.println(pq.take());
        System.out.println(pq.take());
    }
}

从结果证明,这个优先级队列,并不是在put数据的时候计算谁在前谁在后

而是取数据的时候,才真正判断谁在前谁在后

优先级:取数据的优先级


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
标签: java jvm 开发语言

本文转载自: https://blog.csdn.net/xiaoweite1/article/details/127096843
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。

“大数据必学Java基础(六十六):BlockingQueue常见子类”的评论:

还没有评论