一. AMQP协议
什么是AMQP协议
AMQP(Advanced Message Queuing Protocol,高级消息队列协议):它是进程之间传递异步消息的网络协议
AMQP工作过程
发布者通过发布消息,通过交换机,交换机根据路由规则将收到的消息分发交换机绑定的下消息队列,最后AMQP代理将消息推送给订阅了此队列的消费者
或消费者按照需求自行获取。
二. RabbitMQ简介
RabbitMQ是通过Erlang语言基于AMQP协议编写的消息中间件,它在分布式系统中可以解应用耦合、流量削峰、异步消息等问题。它有两个特性
队列排队和异步
- 应用解耦:多个个应用程序之间可通过RabbitMQ作为媒介,两个应用不再粘连,实现解耦;
- 异步消息:多个应用可通过RabbitMQ进行消息传递;
- 流量削峰:在高并发情况下,可以通过RabbitMQ的队列特性实现流量削峰;
- 应用场景: 1. 应用到队列特性的应用场景: 排序算法、秒杀活动。2. 应用到异步特性的应用场景: 消息分发、异步处理、数据同步、处理耗时任务。
三.springBoot整合RabbitMQ
生产者端发送消息
pom文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version></dependency>
yml文件
spring:application:name: producer
rabbitmq:host: xxx
username: admin
password: admin
配置类,需要返回一个Queue,org.springframework.amqp.core.Queue下的Queue对象
@ConfigurationpublicclassRabbitMqConfig{@BeanprotectedQueuequeue(){returnnewQueue("myQueue");}}
使用RabbitMQ发送消息,注入AmqpTemplate,调用convertAndSend()方法
classProducerApplicationTests{@AutowiredprivateAmqpTemplate amqpTemplate;@Testvoidsend(){for(int i =0; i <10; i++){
amqpTemplate.convertAndSend("myQueue","这是发送的消息");System.out.println("发送成功!");}}}
消费端接收消息
配置同生产端,不需要配置RabbitMqConfig,接收消息时只需要使用注解RabbitMqConfig,queues属性绑定相应的队列即可。
@ComponentpublicclassReceiveService{@RabbitListener(queues ="myQueue")publicvoidtest01(String msg){System.out.println("接收到消息1"+ msg);}@RabbitListener(queues ="myQueue")publicvoidtest02(String msg){System.out.println("接收到消息2"+ msg);}@RabbitListener(queues ="myQueue")publicvoidtest03(String msg){System.out.println("接收到消息3"+ msg);}}
四.交换器(四种)
Direct Exchange:直连交换器
它是RabbitMQ的默认交换器,给指定队列发消息,绑定该消息队列的消费者一次获取消息
实战:
/** 生产者发送消息,发送10个消息*/@SpringBootTestclassProducerApplicationTests{@AutowiredprivateAmqpTemplate amqpTemplate;@Testvoidsend(){for(int i =0; i <10; i++){
amqpTemplate.convertAndSend("myQueue","这是发送的消息");System.out.println("发送成功!");}}}
/** 接收消息*/@ComponentpublicclassReceiveService{@RabbitListener(queues ="myQueue")publicvoidtest01(String msg){System.out.println("接收到消息1"+ msg);}@RabbitListener(queues ="myQueue")publicvoidtest02(String msg){System.out.println("接收到消息2"+ msg);}@RabbitListener(queues ="myQueue")publicvoidtest03(String msg){System.out.println("接收到消息3"+ msg);}}
结果:可以看到1、2、3依次接收消息
接收到消息1这是发送的消息
接收到消息2这是发送的消息
接收到消息3这是发送的消息
接收到消息2这是发送的消息
接收到消息3这是发送的消息
接收到消息1这是发送的消息
接收到消息3这是发送的消息
接收到消息1这是发送的消息
接收到消息2这是发送的消息
接收到消息1这是发送的消息
Fanout Exchange:扇形交换器
绑定该交换器的所有队列都可以接收到消息,扇形交换机将消息广播到所有与之绑定的队列。无论消息的路由键是什么,扇形交换机都会将消息发送到所有绑定的队列中。这种类型的交换机常用于实现发布-订阅模式,将消息广播给多个消费者。
实战
/** 绑定*//** Fanout Exchange*/@BeanpublicQueueFanoutExchangeQueue1(){returnnewQueue("fanoutExchangeQueue1");}@BeanpublicQueueFanoutExchangeQueue2(){returnnewQueue("fanoutExchangeQueue2");}@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("amq.fanout");}@BeanpublicBindingFanoutExchangeBinding1(QueueFanoutExchangeQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(FanoutExchangeQueue1).to(fanoutExchange);}@BeanpublicBindingFanoutExchangeBinding2(QueueFanoutExchangeQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(FanoutExchangeQueue2).to(fanoutExchange);}
/** 生产者发送消息*/@TestvoidsendByFanoutExchange(){
amqpTemplate.convertAndSend("amq.fanout","key","这是发送到的消息");System.out.println("发送成功!");}
/** 消费者 Direct Exchange*/@RabbitListener(queues ="fanoutExchangeQueue1")publicvoidtest04(String msg){System.out.println("接收到消息4"+ msg);}@RabbitListener(queues ="fanoutExchangeQueue2")publicvoidtest05(String msg){System.out.println("接收到消息5"+ msg);}
结果:每一个绑定到Fanout Exchange上的队列都可以接收到消息
接收到消息4这是发送到的消息
接收到消息5这是发送到的消息
Topic Exchange:主题交换器
允许在路由键中设置匹配规则:'*‘代表一个字母两个’.'之间的内容;‘#’代表0或多个字符;
实战
/** 绑定*/@BeanpublicQueuetopicExchangeQueue1(){returnnewQueue("topicExchangeQueue1");}@BeanpublicQueuetopicExchangeQueue2(){returnnewQueue("topicExchangeQueue2");}@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("amq.topic");}@BeanpublicBindingTopicExchangeToQueue1(Queue topicExchangeQueue1,TopicExchange topicExchange){returnBindingBuilder.bind(topicExchangeQueue1).to(topicExchange).with("com.shaoby.*");}@BeanpublicBindingTopicExchangeToQueue2(Queue topicExchangeQueue2,TopicExchange topicExchange){returnBindingBuilder.bind(topicExchangeQueue2).to(topicExchange).with("com.shaoby.test.#");}
/**生产者发送消息*//** key为com.shaoby.test*/@TestvoidsendByTopicExchange(){
amqpTemplate.convertAndSend("amq.topic","com.shaoby.test","这是发送到的消息");System.out.println("发送成功!");}/** key为com.shaoby.test.a*/@TestvoidsendByTopicExchange(){
amqpTemplate.convertAndSend("amq.topic","com.shaoby.test.a.b","这是发送到的消息");System.out.println("发送成功!");}
/**消费者接收消息*//**Topic Exchange*/@RabbitListener(queues ="topicExchangeQueue1")publicvoidtest06(String msg){System.out.println("接收到消息6"+ msg);}@RabbitListener(queues ="topicExchangeQueue2")publicvoidtest07(String msg){System.out.println("接收到消息7"+ msg);}
结果:
路由key为com.shaoby.test都能接收到消息,com.shaoby.test.a.b只有topicExchangeQueue2能接收到消息
Header Exchange:首部交换器
绑定:
/** Header Exchange*/@BeanpublicQueueheaderExchangeQueue1(){returnnewQueue("headerExchangeQueue1");}@BeanpublicQueueheaderExchangeQueue2(){returnnewQueue("headerExchangeQueue2");}@BeanpublicHeadersExchangeheadersExchange(){returnnewHeadersExchange("amp.header");}@BeanpublicBindingheadExchangeToQueue1(Queue headerExchangeQueue1,HeadersExchange headersExchange){HashMap<String,Object> map =newHashMap<>();
map.put("type","OK");
map.put("status","200");returnBindingBuilder.bind(headerExchangeQueue1).to(headersExchange).whereAll(map).match();}@BeanpublicBindingheadExchangeToQueue2(Queue headerExchangeQueue2,HeadersExchange headersExchange){HashMap<String,Object> map =newHashMap<>();
map.put("type","error");
map.put("status","500");returnBindingBuilder.bind(headerExchangeQueue2).to(headersExchange).whereAll(map).match();}
/** 生产者发送消息*/@TestvoidsendByHeadExchange(){Map<String,Object> headers =newHashMap<>();
headers.put("type","OK");
headers.put("status","200");String message ="这是发送到的消息";MessageProperties messageProperties =newMessageProperties();
headers.forEach(messageProperties::setHeader);Message msg =newMessage(message.getBytes(), messageProperties);
amqpTemplate.convertAndSend("amp.header",null, msg);System.out.println("发送成功!");}
@RabbitListener(queues ="headerExchangeQueue1")publicvoidtest08(Message msg){System.out.println("接收到消息8:"+ msg.toString());}@RabbitListener(queues ="headerExchangeQueue2")publicvoidtest09(Message msg){System.out.println("接收到消息9:"+ msg.toString());}
结果:只有匹配上header才能收到消息
接收到消息8:(Body:'[B@a7b38a8(byte[24])' MessageProperties [headers={type=OK, status=200}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=amp.header, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-1WTdKW4n_rAEdJUosQD7bg, consumerQueue=headerExchangeQueue1])
版权归原作者 让你三行代码QAQ 所有, 如有侵权,请联系我们删除。