1.首先我们先看看MQ的常见模型
1.1 基本消息模型
1.2Work Queues消息模型
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,而MQ会采用轮询的方式,分配消息, S所以也就保证了一条消息只会被一个消费者接收
1.3 广播模型
广播模型的就是他发的消息只会发给与他绑定的交换机和对列进行消息,不用指定路由键
1.4Routing消息模型
** 生产者将消息投递到交换机同时指定路由key,消费者根据路由key从对应的消息队列中获取消息进行消费,这个模型必须当双方的路由key相同时才能匹配到**
** 1.5**Topics消息模型
Topics消息模型又称通配符模式,意思就是我们在指定路由键的时候,可以用*.什么什么代替,*就代表任何路由键,然后点后面就是以什么什么去结尾达到一个路由键匹配
2 常见的交换机模式
2.1Fanout:广播,将消息交给所有绑定到交换机的队列 all*
2.2Direct:定向,把消息交给符合指定routing key **的队列 **一堆或一个*
2.3Topic:通配符,把消息交给符合routing pattern**(路由模式)的队列 **一堆或者一个*
3.接下来我们开始搭建Spirngboot整合MQ的使用
3.1我们先导入MQ和springboot的基本依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3.2配置MQ的yml信息
server:
port: 8080
spring:
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
3.3配置MQ的配置类
3.3.1 在配置类里面我们需要声明交换机 ,声明对列,绑定交换机和对列等
这里我们先玩Fanout广播模式
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author sj
* @data 2022/1/27 15:42
*/
@Configuration
public class RabbitMqConfig {
// 对列1
public static final String QUEUEFAMOUNT ="QUEUEFAMOUNT";
// 对列2
public static final String QUEUEFAMOUNT2 ="QUEUEFAMOUNT";
// 交换机名字
public static final String FAMOUNTEXCHANGE="QUEUEFAMOUNT";
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(RabbitMqConfig.FAMOUNTEXCHANGE,true,false);
}
/**
* 声明对列
*/
@Bean
public Queue getQueue(){
return new Queue(RabbitMqConfig.QUEUEFAMOUNT,true,false,false);
}
/**
* 声明对列2
*/
@Bean
public Queue getQueue2(){
return new Queue(RabbitMqConfig.QUEUEFAMOUNT2,true,false,false);
}
/**
* 绑定交换机和对列
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(getQueue()).to(fanoutExchange());
}
/**
* 绑定交换机和对列
*/
@Bean
public Binding binding2(){
return BindingBuilder.bind(getQueue2()).to(fanoutExchange());
}
通过上面我们可以看到我们并没有指定路由键,只是将对列和交换机进行绑定
3.3.2创建一个生产者发送消息
ackage cn.config.controller;
import cn.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author sj
* @data 2022/1/27 15:50
*/
@RestController
public class Product {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/famount")
public String faSon(){
for (int i=1; i<50 ; i++){
rabbitTemplate.convertAndSend(RabbitMqConfig.FAMOUNTEXCHANGE,"你好广播模式");
}
return "消费者消息发送成功";
}
}
我这里写的是应该controller模拟生产者发送消息,首先我们在里面注入 RabbitTemplate然后调用convertAndSend发送消息,因为我们现在是广播模式,所以我们只需要向交换机发送消息即可,后面则是消息内容 ,有兴趣的朋友点进去看看源码,了解里面携带的参数
3.3.3创建一个消费者
/**
* @author sj
* @data 2022/1/27 15:57
*/
@Component
public class Consumer {
@RabbitListener(queues = RabbitMqConfig.QUEUEFAMOUNT)
public void listenerQueue(String msg){
System.out.println("我是对列1"+msg);
}
@RabbitListener(queues = RabbitMqConfig.QUEUEFAMOUNT2)
public void listenerQueue2(String msg){
System.out.println("我是对列2"+msg);
}
消费者里面我们只需要去注意 @RabbitListener(queues =“监听的对列”)去加上这个注解监听对列,拿到消息去消费,然后我们现在去访问 http://localhost:8080//famount
消息已经被我们拿到消费了,上面我也讲到过**Fanout模式他采用的是轮询的方式分配消息的,有兴趣的朋友可以去了解下轮询,
4.现在我们玩玩Topic通配符模式
- ** 4.1 修改MQ配置类*
public static final String T0PIC_Exchange = "T0PIC_Exchange";
public static final String TEST_QUEUE= "TopicExchange";
/**
* 声明交换机
*/
@Bean
public TopicExchange getExchange(){
return new TopicExchange(T0PIC_Exchange,true,false);
}
/**
* 声明对列
*/
@Bean
public Queue getQueue(){
return new Queue(TEST_QUEUE,true,false,false);
}
/**
* 绑定交换机对对列
*/
@Bean
public Binding bindingQueueExchange(){
return BindingBuilder.bind(getQueue()).to(getExchange()).with("*.w");
}
4.2 修改生产者*
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/topic")
public String faSon(){
for (int i=1; i<10 ; i++) {
rabbitTemplate.convertAndSend(RabbitMqConfig.T0PIC_Exchange, "3232.w", "你好Topic");
}
return "消费者消息发送成功";
}
这里的*代表匹配不多不少恰好1个词,#:匹配一个或多个词
4,3消费者
@RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
public void listenerQueue(String msg){
System.out.println("我是TOPIC"+msg);
}
4.4 拿到消息,消费成功
现在我们在去修改一下生产者
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/topic")
public String faSon(){
for (int i=1; i<10 ; i++) {
rabbitTemplate.convertAndSend(RabbitMqConfig.T0PIC_Exchange, "3232", "你好Topic");
}
return "消费者消息发送成功";
}
可以看到我们把路由键改了一下,然后我们在去测试,
很明显这次我们并没有收到消息消费,因为这么这里选择的topic模式,我们在绑定对列和交换机的时候指定了路由键以w结尾,而我们发送消息的时候的路由键不是以w结尾,所以并不能接收消息消费
5.订阅模型-Direct
5.1修改MQ配置
public static final String DIRECT = "DIRECT";
public static final String TEST_QUEUE= "TopicExchange";
/**
* 声明交换机
*/
@Bean
public DirectExchange getExchange(){
return new DirectExchange(DIRECT,true,false);
}
/**
* 声明对列
*/
@Bean
public Queue getQueue(){
return new Queue(TEST_QUEUE,true,false,false);
}
/**
* 绑定交换机对对列
*/
@Bean
public Binding bindingQueueExchange(){
return BindingBuilder.bind(getQueue()).to(getExchange()).with("direct");
}
5.2修改生产者
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/topic")
public String faSon(){
for (int i=1; i<10 ; i++) {
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT, "direct", "direct");
}
return "消费者消息发送成功";
}
这里的路由键必须和绑定的时候的路由键一样才行,不然就不能向指定的交换机发送消息,消费者也就接收不到消息
5.3 生产者
@RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
public void listenerQueue(String msg){
System.out.println("我是dierct"+msg);
}
测试
成功,direct定向模式,就是他只会向指定路由键一模一样的发送消息,
6下一章,会继续去见MQ的回调,以及异常的处理
版权归原作者 墨家巨子-俏如来(笛子) 所有, 如有侵权,请联系我们删除。