0


Java多线程案例之阻塞队列

⭐️前面的话⭐️

本篇文章将介绍Java多线程案例,阻塞队列,阻塞队列在普通队列的基础上多了两种情况,一是阻塞队列为空时,如果进行出队操作,会使当前线程阻塞,直到有新元素插入阻塞队列,该线程才被通知继续执行出队操作;二是阻塞队列为满时,如果进行入队操作,会使当前线程阻塞,直到有元素出队时,该线程才会被通知继续执行入队操作。在实际开发中,常常使用消息队列,而消息队列就是阻塞队列,只是在阻塞队列的基础上增加了很多功能。

📒博客主页:未见花闻的博客主页
🎉欢迎关注🔎点赞👍收藏⭐️留言📝
📌本文由未见花闻原创,CSDN首发!
📆首发时间:🌴2022年4月8日🌴
✉️坚持和努力一定能换来诗与远方!
💭参考书籍:📚《java核心技术》,📚《java编程思想》
💬参考在线编程网站:🌐牛客网🌐力扣
博主的码云gitee,平常博主写的程序代码都在里面。
博主的github,平常博主写的程序代码都在里面。
🍭作者水平很有限,如果发现错误,一定要及时告知作者哦!感谢感谢!


📌导航小助手📌


封面区


🍒1.阻塞队列概论

🍇1.1阻塞队列的概念与作用

阻塞队列本质上还是一种队列,遵循先进先出,后进后出的原则,在此基础上,如果出队时阻塞队列为空,则会使当前线程陷入阻塞,直到入队新元素时通知线程继续执行,如果入队时阻塞队列为满,则会使当前线程陷入阻塞,直到出队旧元素时才通知线程进行执行。

🍇1.2标准库中阻塞队列类

java官方也提供了阻塞队列的标准类,主要有下面几个:

  • ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列。
  • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue: 一个不存储元素的阻塞队列。
  • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
  • BlockingQueue接口: 单向阻塞队列实现了该接口。
  • BlockingDeque接口: 双向阻塞队列实现了该接口。

阻塞队列类的核心方法:
方法解释void put(E e) throws InterruptedException带有阻塞特性的入队操作方法E take() throws InterruptedException带有阻塞特性的出队操作方法boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的入队操作方法,并且可以设置最长等待时间E poll(long timeout, TimeUnit unit) throws InterruptedException带有阻塞特性的出队操作方法,并且可以设置最长等待时间public boolean contains(Object o)判断阻塞队列中是否包含某个元素
其他一些普通队列的方法也支持,但是你都使用阻塞队列了,为什么还要使用普通队列的方法呢。

🍇1.3生产者消费者模型

这个模型怎么说呢,嗯…不好说直接看图吧。

生产者消费者模型

生产者消费者是一种高内聚,低耦合的模型,这也是它的优势,特别是在服务器场景中,假设有两个服务器A(请求服务器),B(应用服务器),如果A,B直接传递消息,而不通过阻塞队列,那么当A请求突然暴涨的时候,B服务器的请求也会跟着暴涨,由于B服务器是应用服务器,处理的任务是重量级的,所以该情况B服务器大概率会挂。
请求暴涨
但是,如果使用生产者消费者模型,那么即使A请求暴涨,也不会影响到B,顶多A挂了,应用服务器不会受到影响,这是因为A请求暴涨后,用户的请求都被打包到阻塞队列中(如果阻塞队列有界,则会引起队列阻塞,不会影响到B),B还是以相同的速度处理这些请求,所以生产者消费者模型可以起到“削峰填谷”的作用。
削峰填谷
了解清楚阻塞队列和生产者消费者模型,来简单实现一下,阻塞队列我们就基于数组实现吧,那么就先的实现循环队列。

🍒2.通过循环队列简单实现阻塞队列

🍇2.1循环队列的简单实现

循环队列是基于数组实现的,最重要的就是如何将队列为空状态与满状态区分开来,前面介绍数据结构的时候已经简单实现过了,现在就再简单复习一下,对队列不懂的,先好好学习队列:队列,Queue,Deque接口与LinkedList类。

区分判断空与满状态的方法如下:

不妨设对头索引为

front

,队尾索引为

rear

,顺序表长度为

len

方式1:记录队列元素个数

size

,当

size

的值与顺序表的大小相等时,代表队列已满。

size

值为

0

表示队列为空。
方式2:使用一个

boolean

类型的成员变量

flag

标记,初始为

false

,当每次入队时将

flag

设置为

true

,出队将

flag

设置为

false

,当

rear == front && flag == true

表示队列已满,当

rear == front && flag == false

表示队列为空。
方式3:牺牲一个单位的空间,在每次入队前判断

(rear+1)% len 

是否与

front

相等,如果相等表示队列已满,如果

rear == front

则表示队列为空。

比如我按照方式1创建循环队列,大小为8,如图,size=0为空队列,size=8为满队列。
1-2
1-3
1-4

方式1最简单,我们通过方式1实现循环队列,阻塞队列最核心的就是出队和入队操作,我们重点实现这两个方法。

//循环队列classMyCircularQueue{//队列数据privateint[] elem =newint[100];//队头指针privateint head;//队尾指针privateint tail;//队列元素个数privateint size;//出队头元素publicIntegertake(){if(size ==0){//队列为空returnnull;}int ret = elem[head];
        head++;//作用等价于 head %= elem.lengthif(head >= elem.length){
            head =0;}
        size--;return ret;}//入队尾元素publicvoidput(int val){if(size == elem.length){//队列满return;}
        elem[tail++]= val;//作用等价于 tail %= elem.lengthif(tail >= elem.length){
            tail =0;}
        size++;}}

🍇2.2阻塞队列的简单实现

目前上面实现的循环队列不是线程安全的,由于

take

put

方法都有写操作,直接无脑加锁。

//线程安全的循环队列classMySafeCircularQueue{//队列数据privateint[] elem =newint[100];//队头指针privateint head;//队尾指针privateint tail;//队列元素个数privateint size;//专门的锁对象privatefinalObject locker =newObject();//出队头元素publicIntegertake(){synchronized(locker){if(size ==0){//队列为空returnnull;}int ret = elem[head];
            head++;//作用等价于 head %= elem.lengthif(head >= elem.length){
                head =0;}
            size--;return ret;}}//入队尾元素publicvoidput(int val){synchronized(locker){if(size == elem.length){//队列满return;}
            elem[tail++]= val;//作用等价于 tail %= elem.lengthif(tail >= elem.length){
                tail =0;}
            size++;}}}

好了,重点来了,如何实现阻塞效果,关键是使用

wait

notify

机制:

入队时,队列为满需要使用

wait

方法使线程阻塞,直到有旧元素出队才使用

notify

通知线程执行。
出队时,队列为空需要使用

wait

方法使线程阻塞,直到有新元素入队才使用

notify

通知线程执行。

阻塞有界队列代码:

//基于循环队列实现阻塞队列classMyBlockingQueue{//初始化循环队列privateint[] elem =newint[100];//队头指针privateint head;//队尾指针privateint tail;//元素个数privateint size;//专门的锁对象privatefinalObject locker =newObject();//队头出元素,如果队列为空则阻塞publicIntegertake()throwsInterruptedException{//循环队列为空,需要阻塞线程,直到循环队列入元素后才通知线程继续执行该操作synchronized(locker){if(size ==0){
                locker.wait();}int ret = elem[head++];if(head >= elem.length){
                head =0;}
            size--;//循环队列出元素后,队列就不为满了,可以通知线程继续进行入队操作
            locker.notify();return ret;}}//队尾入元素,如果队列满了,就阻塞publicvoidput(int val)throwsInterruptedException{//循环队列如果满了,则需要使线程阻塞,直到循环队列出元素后才通知线程继续执行该操作synchronized(locker){if(size == elem.length){
                locker.wait();}
            elem[tail++]= val;if(tail >= elem.length){
                tail =0;}
            size++;//循环队列入元素后,队列就不为空了,可以通知线程继续进行出队操作
            locker.notify();}}}

我们来简单实现一个生产者消费者模型来验证一下我们所实现的阻塞队列是否有问题。
生产者生产数字,消费者消费数字,为了使效果更加明显,我们把我们实现的阻塞队列的大小改为

3

,即:

private int[] elem = new int[3];

我们使用

sleep

方法来模拟生产者消费者的生产或消费的频率。

情况1:生产者生产与消费者消费的频率一致

publicclassPCMod{privatestaticfinalMyBlockingQueue queue =newMyBlockingQueue();publicstaticvoidmain(String[] args){//生产者 每秒生产1个Thread producer =newThread(()->{int num =0;while(true){try{System.out.println("生产了:"+ num);
                    queue.put(num++);Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        producer.start();//消费者 每秒消费1个Thread customer =newThread(()->{while(true){try{int product = queue.take();System.out.println("消费了:"+ product);Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        customer.start();}}

运行结果:
相同频率
因为生产者与消费者频率一致,所以生产者刚生产好,就立即消费者被消费了。

情况2:生产者生产频率比消费者消费的频率更快

publicclassPCMod{privatestaticfinalMyBlockingQueue queue =newMyBlockingQueue();publicstaticvoidmain(String[] args){//生产者  每秒生产1个Thread producer =newThread(()->{int num =0;while(true){try{System.out.println("生产了:"+ num);
                    queue.put(num++);Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        producer.start();//消费者 每2秒消费1个Thread customer =newThread(()->{while(true){try{int product = queue.take();System.out.println("消费了:"+ product);Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        customer.start();}}

运行结果:
生产者快
因为生产者生产快,消费者消费慢,所以阻塞队列满了之后生产者需要等待消费者消费后才能生产,此时生产者步调与消费者一致。

情况3:生产者生产频率比消费者消费的频率更慢

publicclassPCMod{privatestaticfinalMyBlockingQueue queue =newMyBlockingQueue();publicstaticvoidmain(String[] args){//生产者  每秒生产1个Thread producer =newThread(()->{int num =0;while(true){try{System.out.println("生产了:"+ num);
                    queue.put(num++);Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        producer.start();//消费者 每秒消费1个Thread customer =newThread(()->{while(true){try{int product = queue.take();System.out.println("消费了:"+ product);Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}}});
        customer.start();}}

运行结果:
生产者慢

因为生产者生产慢,消费者消费快,所以阻塞队列为空后,消费者需要等待生产者生产,消费者才能消费,此时消费者步调与生产者一致。

好了,阻塞队列你学会了吗?


下期预告:多线程案例之定时器
觉得文章写得不错的老铁们,点赞评论关注走一波!谢谢啦!
1-99


本文转载自: https://blog.csdn.net/m0_59139260/article/details/123973080
版权归原作者 未见花闻 所有, 如有侵权,请联系我们删除。

“Java多线程案例之阻塞队列”的评论:

还没有评论