一.环境准备
1、在pom文件中引入对应的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
2、在application.yml配置文件中配置RabbitMQ:
spring:
#rabbitmq配置
rabbitmq:
host:192.168.150.152#rabbitMq的服务器地址
port:5672
username: root #用户名
password:123456 #密码
virtual-host:/hjl #虚拟主机
二、整合
- 点对点,简单模式 ①配置文件中声明队列
@SpringBootConfigurationpublicclassRabbitMqConfig{/**
* hello队列名称
*/publicstaticfinalString HELLO_MSG_QUEUE ="hello.msg.queue";/**
* 声明hello队列
*
* @return
*/@BeanpublicQueuegetHelloQueue(){//参数一:队列名;参数二:是否持久化队列returnnewQueue(HELLO_MSG_QUEUE,true);}}
②创建生产者
@SpringBootTest@RunWith(SpringRunner.class)publicclassRabbitMqTest{@ResourceprivateRabbitTemplate rabbitTemplate;@TestpublicvoidsendHello(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend(RabbitMqConfig.HELLO_MSG_QUEUE,"hello world"+ i);}}}
消息发送成功后,在web管理页面查看:
可以看到对应队列中产生了消息
③创建消费者
@ComponentpublicclassRabbitMqConsumer{@RabbitListener(queues =RabbitMqConfig.HELLO_MSG_QUEUE)publicvoidlistenHelloMsg(String message){System.out.println("接受时间:"+System.currentTimeMillis());System.out.println("转发消息是:"+ message);}}
启动项目,可以看到消息成功消费:
- 工作队列(多个消费者对应一个队列) ①声明队列
@SpringBootConfigurationpublicclassRabbitMqConfig{/**
* work队列名称
*/publicstaticfinalString WORK_MSG_QUEUE ="work.msg.queue";/**
* 声明work队列
*
* @return
*/@BeanpublicQueuegetWorkQueue(){//参数一:队列名;参数二:是否持久化队列returnnewQueue(WORK_MSG_QUEUE,true);}}
②创建生产者
@SpringBootTest@RunWith(SpringRunner.class)publicclassWorkMqTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testpublicvoidsend(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend(RabbitMqConfig.WORK_MSG_QUEUE,"这是一条工作队列消息"+ i);}}}
③创建消费者(多个)
@ComponentpublicclassRabbitMqConsumer{@RabbitListener(queues =RabbitMqConfig.WORK_MSG_QUEUE)publicvoidlistenWork1(String message){System.out.println("消费者一转发消息是:"+ message);}@RabbitListener(queues =RabbitMqConfig.WORK_MSG_QUEUE)publicvoidlistenWork2(String message){System.out.println("消费者二转发消息是:"+ message);}}
可以看到两个消费者都成功消费量word队列中的消息
- 发布订阅模式 ①声明队列,交换机并绑定
@SpringBootConfigurationpublicclassRabbitMqConfig{/**
* publish队列1
*/publicstaticfinalString PUBLISH_MSG_QUEUE1 ="publish.msg.queue1";/**
* publish队列2
*/publicstaticfinalString PUBLISH_MSG_QUEUE2 ="publish.msg.queue2";/**
* publish交换机
*/publicstaticfinalString PUBLISH_EXCHANGE ="publish.exchange";/**
* Publish队列
*
* @return
*/@BeanpublicQueuegetPublishQueue1(){returnnewQueue(PUBLISH_MSG_QUEUE1,true);}/**
* Publish队列
*
* @return
*/@BeanpublicQueuegetPublishQueue2(){returnnewQueue(PUBLISH_MSG_QUEUE2,true);}/**
* Publish交换机
*
* @return
*/@BeanpublicFanoutExchangepublishExchange(){FanoutExchange exchange =newFanoutExchange(PUBLISH_EXCHANGE,true,false);return exchange;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindPublishExchangeQueue1(){Binding binding =BindingBuilder.bind(getPublishQueue1()).to(publishExchange());return binding;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindPublishExchangeQueue2(){Binding binding =BindingBuilder.bind(getPublishQueue2()).to(publishExchange());return binding;}
②创建生产者
@SpringBootTest@RunWith(SpringRunner.class)publicclassPublishMqTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testpublicvoidsend(){for(int i =0; i <10; i++){//参数一:交换机名称;参数二:routingKey(广播模式不传);参数三:消息体
rabbitTemplate.convertAndSend(RabbitMqConfig.PUBLISH_EXCHANGE,null,"这是一条工作队列消息"+ i);}}}
③创建消费者(多个)
@ComponentpublicclassPublishConsumer{@RabbitListener(queues =RabbitMqConfig.PUBLISH_MSG_QUEUE1)publicvoidlistenDead1(String message){System.out.println("消费者一接收消息:"+ message);}@RabbitListener(queues =RabbitMqConfig.PUBLISH_MSG_QUEUE2)publicvoidlistenDead2(String message){System.out.println("消费者二接收消息:"+ message);}}
可以看到两个消费之都接收了生产者所有的消息;与工作队列不同的是,工作队列的消费者只消费部分消息,而此模式是消费所有。
- 路由模式
①声明队列
@SpringBootConfigurationpublicclassRabbitMqConfig{/**
* routing交换机
*/publicstaticfinalString ROUTING_EXCHANGE ="routing.exchange";/**
* routing队列1
*/publicstaticfinalString ROUTING_MSG_QUEUE1 ="routing.msg.queue1";/**
* routing队列2
*/publicstaticfinalString ROUTING_MSG_QUEUE2 ="routing.msg.queue2";/**
* routing队列
*
* @return
*/@BeanpublicQueuegetRoutingQueue1(){returnnewQueue(ROUTING_MSG_QUEUE1,true);}/**
* routing队列
*
* @return
*/@BeanpublicQueuegetRoutingQueue2(){returnnewQueue(ROUTING_MSG_QUEUE2,true);}/**
* Publish交换机
*
* @return
*/@BeanpublicDirectExchangeroutingExchange(){DirectExchange exchange =newDirectExchange(ROUTING_EXCHANGE,true,false);return exchange;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindRoutingExchangeQueue1(){Binding binding =BindingBuilder.bind(getRoutingQueue1()).to(routingExchange()).with("routingKey1");return binding;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindRoutingExchangeQueue2(){Binding binding =BindingBuilder.bind(getRoutingQueue2()).to(routingExchange()).with("routingKey2");return binding;}}
②创建生产者
@SpringBootTest@RunWith(SpringRunner.class)publicclassRoutingMqTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testpublicvoidsend(){//参数一:交换机名称;参数二:routingKey(交换机与队列绑定的key);参数三:消息体
rabbitTemplate.convertAndSend(RabbitMqConfig.ROUTING_EXCHANGE,"routingKey1","队列一:这是一条路由消息消息");
rabbitTemplate.convertAndSend(RabbitMqConfig.ROUTING_EXCHANGE,"routingKey2","队列二:这是一条路由消息消息");}}
③创建消费者
@ComponentpublicclassRoutingConsumer{@RabbitListener(queues =RabbitMqConfig.ROUTING_MSG_QUEUE1)publicvoidlistenDead1(String message){System.out.println("消费者一接收消息:"+ message);}@RabbitListener(queues =RabbitMqConfig.ROUTING_MSG_QUEUE2)publicvoidlistenDead2(String message){System.out.println("消费者二接收消息:"+ message);}}
运行结果:
x显而易见:与发布订阅模式不同的是,此模式需要将交换机与队列通过routingKey绑定,并且生产者可以通过指定routingKey,可以将消息发送到指定队列中
- 通配符模式 ①声明队列,交换机
@SpringBootConfigurationpublicclassRabbitConfig{/**
* topic交换机
*/publicstaticfinalString TOPIC_EXCHANGE ="topic.exchange";/**
* topic队列1
*/publicstaticfinalString TOPIC_MSG_QUEUE1 ="topic.msg.queue1";/**
*topic队列2
*/publicstaticfinalString TOPIC_MSG_QUEUE2 ="topic.msg.queue2";/**
* routing队列
*
* @return
*/@BeanpublicQueuegetTopicQueue1(){returnnewQueue(TOPIC_MSG_QUEUE1,true);}/**
* routing队列
*
* @return
*/@BeanpublicQueuegetTopicQueue2(){returnnewQueue(TOPIC_MSG_QUEUE2,true);}/**
* Publish交换机
*
* @return
*/@BeanpublicTopicExchangetopIcExchange(){TopicExchange exchange =newTopicExchange(TOPIC_EXCHANGE,true,false);return exchange;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindTopicExchangeQueue1(){Binding binding =BindingBuilder.bind(getTopicQueue1()).to(topIcExchange()).with("topKey.*");return binding;}/**
* 绑定队列和交换机
*
* @return
*/@BeanpublicBindingbindTopicExchangeQueue2(){Binding binding =BindingBuilder.bind(getTopicQueue2()).to(topIcExchange()).with("topKey.#");return binding;}}
②创建生产者
@SpringBootTest@RunWith(SpringRunner.class)publicclassTopicMqTest{@ResourceprivateRabbitTemplate rabbitTemplate;@Testpublicvoidsend(){//参数一:交换机名称;参数二:routingKey(广播模式不传);参数三:消息体
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"topic.key1","这是一条通配符模式消息一");
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"topic.key1.key2","这是一条通配符模式消息二");}}
③创建消费者
@ComponentpublicclassTopicConsumer{@RabbitListener(queues =RabbitConfig.TOPIC_MSG_QUEUE1)publicvoidlistenDead1(String message){System.out.println("消费者一接收消息:"+ message);}@RabbitListener(queues =RabbitConfig.TOPIC_MSG_QUEUE2)publicvoidlistenDead2(String message){System.out.println("消费者二接收消息:"+ message);}}
运行结果如下:
可以看到:与路由模式不同的是topic支持通配符模式的路由key;特别的是"*“只能代替一个单词;而”#"可以代替多个;
版权归原作者 随意hjl 所有, 如有侵权,请联系我们删除。