0


【SpringCloud】RabbitMQ——五种方式实现发送和接收消息

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1.Basic Queue 简单队列模型

1.1引入依赖

<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

1.2消息发送

在publisher服务的application.yml中添加配置

spring:rabbitmq:host: 192.168.150.101 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: haojiale # 用户名password:123321# 密码

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

packagecom.example.mq.spring;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
        rabbitTemplate.convertAndSend(queueName, message);}}

1.3消息接收

在consumer服务的application.yml中添加的配置内容同消息发送一样

在consumer服务中编写监听器,消费消息

packagecom.example.mq.listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{System.out.println("spring 消费者接收到消息:【"+ msg +"】");}}

2.WorkQueue

WorkQueue也被称为任务模型。就是让多个消费者绑定到一个队列,共同消费队列中的消息
在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度远远大于消息的消费速度,消息就会堆积越来越多,无法及时处理,使用work模型,多个消费者共同处理消息提升消费速度

2.1消息发送

循环发送消息,模拟大量消息堆积现象

在publisher服务中添加一个测试方法:

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */@TestpublicvoidtestWorkQueue()throwsInterruptedException{// 队列名称String queueName ="simple.queue";// 消息String message ="hello, message_";for(int i =0; i <50; i++){// 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

2.2消息接收

模拟多个消费者绑定同一个队列,在consumer服务中添加2个新的方法:

@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue1(String msg)throwsInterruptedException{System.out.println("消费者1接收到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(20);}@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{System.err.println("消费者2........接收到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(200);}

执行测试方法结果是消费者1很快处理完了自己的25条消息,消费者2却在缓慢处理自己的25条消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力是存在问题的。

解决

修改consumer服务的application.yml文件配置,通过设置prefetch来控制消费者预取的消息数量

spring:rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息

3.发布/订阅

发布订阅的模型图如图:
在这里插入图片描述

Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1 Fanout

在这里插入图片描述

3.1.1声明队列和交换机

在这里插入图片描述

在consumer服务中声明队列和交换机

packagecom.example.mq.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
     * 声明交换机
     * @return Fanout类型交换机
     */@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("example.fanout");}/**
     * 第1个队列
     */@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
     * 第2个队列
     */@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

3.1.2消息发送

在publisher服务中编写测试方法发送消息

@TestpublicvoidtestFanoutExchange(){// 队列名称String exchangeName ="example.fanout";// 消息String message ="hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName,"", message);}

3.1.3消息接收

在consumer服务中添加两个方法,作为消费者:

@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}

3.1.4总结

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败情况下消息丢失
  • FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

3.2 Direct

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不会再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RountingKey与消息的RoutingKey完全一致,才会收到消息

3.2.1消息发送

在这里插入图片描述

在publisher服务中添加测试方法:

@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="itcast.direct";// 消息String message ="红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"red", message);}

3.2.2消息接收——基于注解声明交换机和队列

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue1"),
    exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
    key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue2"),
    exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
    key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【"+ msg +"】");}

3.2.3总结

Direct交换机与Fanout交换机的区别?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3 Topic

Topic类型的Exchang可以让队列在绑定Routing key的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:

item.insert

通配符规则:

#

:匹配一个或多个词

*

:匹配不多不少恰好1个词
在这里插入图片描述

举例:

item.#

:能够匹配

item.spu.insert

或者

item.spu
item.*

:只能匹配

item.spu

图示:

3.3.1消息发送

在这里插入图片描述

在publisher服务的SpringAmqpTest类中添加测试方法:

@TestpublicvoidtestSendTopicExchange(){// 交换机名称String exchangeName ="itcast.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}

3.3.2消息接收

在consumer服务中添加方法:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue1"),
    exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
    key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue2"),
    exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
    key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");}

本文转载自: https://blog.csdn.net/m0_65462447/article/details/141192854
版权归原作者 ~越努力越幸运~ 所有, 如有侵权,请联系我们删除。

“【SpringCloud】RabbitMQ——五种方式实现发送和接收消息”的评论:

还没有评论