0


Spring Boot中使用RabbitMQ 生产消息和消费消息

引入RabbitMQ依赖

<!-- springboot集成rabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

增加RabbitMQ配置

#rabbitmq配置spring:rabbitmq:host: ip地址
    port:5672username: 账号
    password: 密码
    virtual-host: /

配置RabbitMQ交换机以及队列

packagecom.ckm.ball.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;//rabbitMQ绑定交换机 / 队列@ConfigurationpublicclassRabbitMQConfig{//========================================================RabbitMQ Queue========================================================////创建fanout模式交换机@BeanpublicFanoutExchangefanoutExchangeProcess(){returnnewFanoutExchange("process-data-change-exchange",true,false);}//创建队列@BeanpublicQueueprocessDataChangeQueue(){returnnewQueue("process-data-change-queue",true);}//将队列绑定到交换机@BeanpublicBindingchatBindExchange(){returnBindingBuilder.bind(processDataChangeQueue()).to(fanoutExchangeProcess());}}

编写接口,模拟生产消息

@ResourceprivateRabbitTemplate rabbitTemplate;@GetMapping("/produceMessage")@ApiOperation(value ="生产消息", tags ="测试接口")publicvoidupdateTokenTime(){//生产消息,会到交换机,交换机下发给队列,队列监听到就会消费,执行业务逻辑
    rabbitTemplate.convertAndSend("process-data-change-exchange","process-data-change-queue","hhhhhhhhhhhhhh");}

编写消息监听类,模拟消费消息

packagecom.ckm.ball.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;importjava.util.Date;@Slf4j@ComponentpublicclassRabbitMQDataSyncListenerProcess{//监听process-data-change-queue队列 -> 消费@RabbitListener(queues ="process-data-change-queue")publicvoidorderDead(@PayloadString productIdAndOrderId){

        log.info("当前时间:{},收到队列信息:{}",newDate().toString(), productIdAndOrderId);//执行你的业务逻辑for(int i =0; i <5; i++){System.out.println("循环次数: "+(i +1));try{// 暂停 2000 毫秒(2 秒)Thread.sleep(2000);}catch(InterruptedException e){// 处理异常System.err.println("线程被中断: "+ e.getMessage());}}}}

RabbitMQ 中的交换机的作用

RabbitMQ 中的交换机(Exchange)是消息路由的核心组件。它负责接收来自生产者发送的消息,并根据特定的路由规则将这些消息传递给一个或多个队列(Queue)。
**

交换机的主要功能和类型

**

  1. 消息路由:- 交换机决定消息应该发送到哪些队列,基于绑定(Binding)和路由键(Routing Key)。
  2. 类型:- 直连交换机(Direct Exchange):消息直接发送到与路由键精确匹配的队列。- 主题交换机(Topic Exchange):消息根据路由键模式匹配一个或多个队列,支持通配符。- 扇出交换机(Fanout Exchange):将消息广播到所有绑定的队列,不考虑路由键。- 头交换机(Headers Exchange):通过消息的属性(Headers)进行路由,而不是使用路由键。

**

工作流程

**

  1. 生产者发送消息到交换机。
  2. 交换机根据配置的路由规则和队列的绑定关系,将消息路由到相应的队列。
  3. 消费者从队列中获取消息进行处理。

在我的代码中生产消息语句:

convertAndSend(交换机,路由键也就是队列,你想传递的参数)

在扇出交换机(Fanout Exchange)模式不需要指定路由键,因为指定了也没用。
rabbitTemplate.convertAndSend("process-data-change-exchange","process-data-change-queue","hhhhhhhhhhhhhh");

在扇出交换机(Fanout Exchange)模式,应改成:

rabbitTemplate.convertAndSend("process-data-change-exchange","","hhhhhhhhhhhhhh");

在扇出交换机中,可以将路由键设置为空字符串 “”,因为扇出交换机会将消息发送到所有绑定的队列,而不需要考虑路由键的具体值。

  • 在扇出交换机中,路由键被忽略。
  • 消息会被广播到所有与交换机绑定的队列中。

四种交换机模式

1. 直连交换机(Direct Exchange)

直连交换机:发送到匹配路由键的队列。

// 创建直连交换机@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct-exchange",true,false);}// 创建队列@BeanpublicQueuedirectQueue(){returnnewQueue("direct-queue",true);}// 将队列绑定到直连交换机,同时指定路由键@BeanpublicBindingdirectBinding(){returnBindingBuilder.bind(directQueue()).to(directExchange()).with("direct-routing-key");}

生产消息:

直连交换机生产消息:需要指定路由键。

// 发送消息到直连交换机
rabbitTemplate.convertAndSend("direct-exchange","direct-routing-key","Your message here");

2. 主题交换机(Topic Exchange)

主题交换机:支持模糊匹配路由键。

// 创建主题交换机@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topic-exchange",true,false);}// 创建队列@BeanpublicQueuetopicQueue(){returnnewQueue("topic-queue",true);}// 将队列绑定到主题交换机,同时指定路由键@BeanpublicBindingtopicBinding(){returnBindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#");}

生产消息:

主题交换机生产消息:需要指定符合主题模式的路由键。

// 发送消息到主题交换机
rabbitTemplate.convertAndSend("topic-exchange","topic.routing.key","Your message here");

3. 扇出交换机(Fanout Exchange)

扇出交换机:将消息广播到所有绑定的队列。

// 创建扇出交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout-exchange",true,false);}// 创建队列@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout-queue-1",true);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout-queue-2",true);}// 将队列绑定到扇出交换机@BeanpublicBindingfanoutBinding1(){returnBindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@BeanpublicBindingfanoutBinding2(){returnBindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}

生产消息:

扇出交换机生产消息:不需要路由键,使用空字符串即可。

// 发送消息到扇出交换机
rabbitTemplate.convertAndSend("fanout-exchange","","Your message here");

4. 头交换机(Headers Exchange)

头交换机:根据消息头中匹配的属性进行路由。

// 创建头交换机@BeanpublicHeadersExchangeheadersExchange(){returnnewHeadersExchange("headers-exchange",true,false);}// 创建队列@BeanpublicQueueheadersQueue(){returnnewQueue("headers-queue",true);}// 将队列绑定到头交换机,同时指定头属性@BeanpublicBindingheadersBinding(){Map<String,Object> headers =newHashMap<>();
    headers.put("format","pdf");
    headers.put("type","report");returnBindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(headers).match();}

生产消息:

头交换机生产消息:需要构建一个带有头属性的消息。

// 发送消息到头交换机MessageProperties messageProperties =newMessageProperties();
messageProperties.setHeader("format","pdf");
messageProperties.setHeader("type","report");Message message =newMessage("Your message here".getBytes(), messageProperties);
rabbitTemplate.send("headers-exchange","", message);

本文转载自: https://blog.csdn.net/weixin_44912902/article/details/142821457
版权归原作者 令人作呕的溏心蛋 所有, 如有侵权,请联系我们删除。

“Spring Boot中使用RabbitMQ 生产消息和消费消息”的评论:

还没有评论