0


大数据必学Java基础(六十六):BlockingQueue常见子类

BlockingQueue常见子类

一、ArrayBlockingQueue

源码中的注释的解释说明:

1、添加元素

  1. package com.lanson.test05;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * @author : Lansonli
  6. */
  7. public class Test01 {
  8. //这是main方法,程序的入口
  9. public static void main(String[] args) throws InterruptedException {
  10. //创建一个队列,队列可以指定容量指定长度3:
  11. ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
  12. //添加元素:
  13. //【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
  14. //aq.add(null);
  15. //aq.offer(null);
  16. //aq.put(null);
  17. //【2】正常添加元素:
  18. aq.add("aaa");
  19. aq.offer("bbb");
  20. aq.put("ccc");
  21. System.out.println(aq);//[aaa, bbb, ccc]
  22. //【3】在队列满的情况下,再添加元素:
  23. //aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
  24. //System.out.println(aq.offer("ddd"));//没有添加成功,返回false
  25. //设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
  26. //aq.offer("ddd",2, TimeUnit.SECONDS);
  27. //真正阻塞的方法: put ,如果队列满,就永远阻塞
  28. aq.put("ddd");
  29. System.out.println(aq);
  30. }
  31. }

2、获取元素

  1. package com.lanson.test05;
  2. import javax.sound.midi.Soundbank;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * @author : Lansonli
  7. */
  8. public class Test02 {
  9. //这是main方法,程序的入口
  10. public static void main(String[] args) throws InterruptedException {
  11. //创建一个队列,队列可以指定容量指定长度3:
  12. ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
  13. aq.add("aaa");
  14. aq.add("bbb");
  15. aq.add("ccc");
  16. //得到头元素但是不移除
  17. System.out.println(aq.peek());
  18. System.out.println(aq);
  19. //得到头元素并且移除
  20. System.out.println(aq.poll());
  21. System.out.println(aq);
  22. //得到头元素并且移除
  23. System.out.println(aq.take());
  24. System.out.println(aq);
  25. //清空元素:
  26. aq.clear();
  27. System.out.println(aq);
  28. System.out.println(aq.peek());//null
  29. System.out.println(aq.poll());//null
  30. //设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
  31. //System.out.println(aq.poll(2, TimeUnit.SECONDS));
  32. //真正阻塞:队列为空,永远阻塞
  33. System.out.println(aq.take());
  34. }
  35. }

3、源码

  1. public class ArrayBlockingQueue<E> {
  2. //底层就是一个数组:
  3. final Object[] items;
  4. //取元素用到的索引,初始结果为0
  5. int takeIndex;
  6. //放元素用到的索引,初始结果为0
  7. int putIndex;
  8. //数组中元素的个数:
  9. int count;
  10. //一把锁:这个锁肯定很多方法中用到了,所以定义为属性,初始化以后可以随时使用
  11. final ReentrantLock lock;
  12. //锁伴随的一个等待吃:notEmpty
  13. private final Condition notEmpty;
  14. //锁伴随的一个等待吃:notFull
  15. private final Condition notFull;
  16. //构造器:
  17. public ArrayBlockingQueue(int capacity) {//传入队列指定的容量
  18. this(capacity, false);
  19. }
  20. public ArrayBlockingQueue(int capacity, boolean fair) {//传入队列指定的容量
  21. //健壮性考虑
  22. if (capacity <= 0)
  23. throw new IllegalArgumentException();
  24. //初始化底层数组
  25. this.items = new Object[capacity];
  26. //初始化锁 和 等待队列
  27. lock = new ReentrantLock(fair);
  28. notEmpty = lock.newCondition();
  29. notFull = lock.newCondition();
  30. }
  31. //两个基本方法:一个是入队,一个是出队 ,是其他方法的基础:
  32. //入队:
  33. private void enqueue(E x) {
  34. // assert lock.getHoldCount() == 1;
  35. // assert items[putIndex] == null;
  36. final Object[] items = this.items;//底层数组赋给items
  37. //在对应的下标位置放入元素
  38. items[putIndex] = x;
  39. if (++putIndex == items.length) //++putIndex putIndex 索引 加1
  40. putIndex = 0;
  41. //每放入一个元素,count加1操作
  42. count++;
  43. notEmpty.signal();
  44. }
  45. //出队:
  46. private E dequeue() {
  47. // assert lock.getHoldCount() == 1;
  48. // assert items[takeIndex] != null;
  49. final Object[] items = this.items;//底层数组赋给items
  50. @SuppressWarnings("unchecked")
  51. E x = (E) items[takeIndex];//在对应的位置取出元素
  52. items[takeIndex] = null;//对应位置元素取出后就置为null
  53. if (++takeIndex == items.length)//++takeIndex 加1操作
  54. takeIndex = 0;
  55. count--;//每取出一个元素,count减1操作
  56. if (itrs != null)
  57. itrs.elementDequeued();
  58. notFull.signal();
  59. return x;//将取出的元素作为方法的返回值
  60. }
  61. }

takeIndex和putIndex置为0的原因:

4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法

5、感受一下put和take的阻塞

上面的while不可以换为if,因为如果notFull中的线程被激活的瞬间,有其他线程放入元素,那么队列就又满了。

那么沿着await后面继续执行就不可以,所以一定要反复确定队列是否满的,才能放入元素。

二、LinkedBlockingQueue

一个可选择的有边界的队列:

意思就是队列的长度可以指定,也可以不指定

1、添加元素

  1. package com.lanson.test05;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * @author : Lansonli
  7. */
  8. public class Test01 {
  9. //这是main方法,程序的入口
  10. public static void main(String[] args) throws InterruptedException {
  11. //创建一个队列,队列可以指定容量指定长度3:
  12. LinkedBlockingQueue aq = new LinkedBlockingQueue(3);
  13. //添加元素:
  14. //【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
  15. //aq.add(null);
  16. //aq.offer(null);
  17. aq.put(null);
  18. //【2】正常添加元素:
  19. aq.add("aaa");
  20. aq.offer("bbb");
  21. aq.put("ccc");
  22. System.out.println(aq);//[aaa, bbb, ccc]
  23. //【3】在队列满的情况下,再添加元素:
  24. //aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
  25. //System.out.println(aq.offer("ddd"));//没有添加成功,返回false
  26. //设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
  27. //aq.offer("ddd",2, TimeUnit.SECONDS);
  28. //真正阻塞的方法: put ,如果队列满,就永远阻塞
  29. aq.put("ddd");
  30. System.out.println(aq);
  31. }
  32. }

2、取出元素

  1. package com.lanson.test05;
  2. import javax.sound.midi.Soundbank;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.LinkedBlockingQueue;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * @author : Lansonli
  8. */
  9. public class Test02 {
  10. //这是main方法,程序的入口
  11. public static void main(String[] args) throws InterruptedException {
  12. //创建一个队列,队列可以指定容量指定长度3:
  13. LinkedBlockingQueue aq = new LinkedBlockingQueue();
  14. aq.add("aaa");
  15. aq.add("bbb");
  16. aq.add("ccc");
  17. //得到头元素但是不移除
  18. System.out.println(aq.peek());
  19. System.out.println(aq);
  20. //得到头元素并且移除
  21. System.out.println(aq.poll());
  22. System.out.println(aq);
  23. //得到头元素并且移除
  24. System.out.println(aq.take());
  25. System.out.println(aq);
  26. //清空元素:
  27. aq.clear();
  28. System.out.println(aq);
  29. System.out.println(aq.peek());//null
  30. System.out.println(aq.poll());//null
  31. //设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
  32. //System.out.println(aq.poll(2, TimeUnit.SECONDS));
  33. //真正阻塞:队列为空,永远阻塞
  34. System.out.println(aq.take());
  35. }
  36. }

3、特点

ArrayBlockingQueue : 不支持读写同时操作,底层基于数组的。

LinkedBlockingQueue:支持读写同时操作,并发情况下,效率高。底层基于链表。

4、源码

入队操作:

出队操作:

  1. public class LinkedBlockingQueue<E>{
  2. //内部类Node就是链表的节点的对象对应的类:
  3. static class Node<E> {
  4. E item;//封装你要装的那个元素
  5. Node<E> next;//下一个Node节点的地址
  6. Node(E x) { item = x; }//构造器
  7. }
  8. //链表的长度
  9. private final int capacity;
  10. //计数器:
  11. private final AtomicInteger count = new AtomicInteger();
  12. //链表的头结点
  13. transient Node<E> head;
  14. //链表的尾结点
  15. private transient Node<E> last;
  16. //取元素用的锁
  17. private final ReentrantLock takeLock = new ReentrantLock();
  18. //等待池
  19. private final Condition notEmpty = takeLock.newCondition();
  20. //放元素用的锁
  21. private final ReentrantLock putLock = new ReentrantLock();
  22. //等待池
  23. private final Condition notFull = putLock.newCondition();
  24. public LinkedBlockingQueue() {
  25. this(Integer.MAX_VALUE);//调用类本类的空构造器,传入正21亿
  26. }
  27. public LinkedBlockingQueue(int capacity) {
  28. //健壮性考虑
  29. if (capacity <= 0) throw new IllegalArgumentException();
  30. //给队列指定长度
  31. this.capacity = capacity;
  32. //last,head指向一个新的节点,新的节点中 元素为null
  33. last = head = new Node<E>(null);
  34. }
  35. //入队:
  36. private void enqueue(Node<E> node) {
  37. last = last.next = node;
  38. }
  39. //出队:
  40. private E dequeue() {
  41. Node<E> h = head;//h指向了head
  42. Node<E> first = h.next;//first 指向head的next
  43. h.next = h; // help GC h.next指向自己,更容易被GC发现 被GC
  44. head = first;//head的指向指为first
  45. E x = first.item;//取出链中第一个元素,给了x
  46. first.item = null;
  47. return x;//把x作为方法的返回值
  48. }
  49. }

5、put的阻塞

阻塞的前提是 队列是固定长度的

三、SynchronousQueue

这个特殊的队列设计的意义:

1、先添加元素

  1. public class Test01 {
  2. //这是main方法,程序的入口
  3. public static void main(String[] args) {
  4. SynchronousQueue sq = new SynchronousQueue();
  5. sq.add("aaa");
  6. }
  7. }

直接报错:说队列满了,因为队列没有容量,理解为满也是正常的

2、put方法阻塞

队列是空的,可以理解为队列满了,满的话放入元素 put 一定会阻塞

  1. public class Test01 {
  2. //这是main方法,程序的入口
  3. public static void main(String[] args) throws InterruptedException {
  4. SynchronousQueue sq = new SynchronousQueue();
  5. sq.put("aaa");
  6. }
  7. }

3、先取再放

  1. package com.lanson.test06;
  2. import java.util.concurrent.SynchronousQueue;
  3. /**
  4. * @author : Lansonli
  5. */
  6. public class Test02 {
  7. //这是main方法,程序的入口
  8. public static void main(String[] args) {
  9. SynchronousQueue sq = new SynchronousQueue();
  10. //创建一个线程,取数据:
  11. new Thread(new Runnable() {
  12. @Override
  13. public void run() {
  14. while(true){
  15. try {
  16. System.out.println(sq.take());
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. }).start();
  23. //搞一个线程,往里面放数据:
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. try {
  28. sq.put("aaa");
  29. sq.put("bbb");
  30. sq.put("ccc");
  31. sq.put("ddd");
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }).start();
  37. }
  38. }

结果:

4、poll方法

  1. package com.lanson.test06;
  2. import java.util.concurrent.SynchronousQueue;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * @author : Lansonli
  6. */
  7. public class Test02 {
  8. //这是main方法,程序的入口
  9. public static void main(String[] args) {
  10. SynchronousQueue sq = new SynchronousQueue();
  11. //创建一个线程,取数据:
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. while(true){
  16. try {
  17. //设置一个阻塞事件:超出事件就不阻塞了
  18. Object result = sq.poll(5, TimeUnit.SECONDS);
  19. System.out.println(result);
  20. if(result == null){
  21. break;
  22. }
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. }).start();
  29. //搞一个线程,往里面放数据:
  30. new Thread(new Runnable() {
  31. @Override
  32. public void run() {
  33. try {
  34. sq.put("aaa");
  35. sq.put("bbb");
  36. sq.put("ccc");
  37. sq.put("ddd");
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }).start();
  43. }
  44. }

注意:取出元素 不能用peek,因为peek不会将元素从队列中拿走,只是查看的效果;

四、PriorityBlockingQueue

带有优先级的阻塞队列。

优先级队列,意味着队列有先后顺序的,数据有不同的权重。

无界的队列,没有长度限制,但是在你不指定长度的时候,默认初始长度为11,也可以手动指定,

当然随着数据不断的加入,底层(底层是数组Object[])会自动扩容,直到内存全部消耗殆尽了,导致 OutOfMemoryError内存溢出 程序才会结束。

不可以放入null元素的,不允许放入不可比较的对象(导致抛出ClassCastException),对象必须实现内部比较器或者外部比较器。

1、添加null数据

  1. public class Test {
  2. //这是main方法,程序的入口
  3. public static void main(String[] args) {
  4. PriorityBlockingQueue pq = new PriorityBlockingQueue();
  5. pq.put(null);
  6. }
  7. }

2、添加四个数据

  1. package com.lanson.test07;
  2. /**
  3. * @author : Lansonli
  4. */
  5. public class Student implements Comparable<Student> {
  6. String name;
  7. int age;
  8. public Student() {
  9. }
  10. public Student(String name, int age) {
  11. this.name = name;
  12. this.age = age;
  13. }
  14. @Override
  15. public String toString() {
  16. return "Student{" +
  17. "name='" + name + '\'' +
  18. ", age=" + age +
  19. '}';
  20. }
  21. @Override
  22. public int compareTo(Student o) {
  23. return this.age - o.age;
  24. }
  25. }
  26. package com.lanson.test07;
  27. import java.util.concurrent.PriorityBlockingQueue;
  28. /**
  29. * @author : Lansonli
  30. */
  31. public class Test02 {
  32. //这是main方法,程序的入口
  33. public static void main(String[] args) {
  34. PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
  35. pq.put(new Student("nana",18));
  36. pq.put(new Student("lulu",11));
  37. pq.put(new Student("feifei",6));
  38. pq.put(new Student("mingming",21));
  39. System.out.println(pq);
  40. }
  41. }

结果:

发现结果并没有按照优先级顺序排列

3、取出数据

  1. package com.lanson.test07;
  2. import java.util.concurrent.PriorityBlockingQueue;
  3. /**
  4. * @author : Lansonli
  5. */
  6. public class Test02 {
  7. //这是main方法,程序的入口
  8. public static void main(String[] args) throws InterruptedException {
  9. PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
  10. pq.put(new Student("nana",18));
  11. pq.put(new Student("lulu",11));
  12. pq.put(new Student("feifei",6));
  13. pq.put(new Student("mingming",21));
  14. System.out.println("------------------------------------------");
  15. System.out.println(pq.take());
  16. System.out.println(pq.take());
  17. System.out.println(pq.take());
  18. System.out.println(pq.take());
  19. }
  20. }

从结果证明,这个优先级队列,并不是在put数据的时候计算谁在前谁在后

而是取数据的时候,才真正判断谁在前谁在后

优先级:取数据的优先级


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
标签: java jvm 开发语言

本文转载自: https://blog.csdn.net/xiaoweite1/article/details/127096843
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。

“大数据必学Java基础(六十六):BlockingQueue常见子类”的评论:

还没有评论