0


SpringBoot集成RabbitMq

此篇文章主要包含如下三部分

一、什么是RabbitMq及有什么作用

二、安装RabbitMq(win环境下)及访问

三、SpringBoot集成RabbitMq代码部分

一、什么是RabbitMq及有什么作用

1.1、RabbitMq的简介

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ是一个消息中间件,它接收并转发消息,但不处理消息

1.2、工作原理图

在这里插入图片描述Broker:接收和分发消息的应用,指RabbitMQ Server
Connection:生产者producer / 消费者consumer和 broker之间的TCP连接
Channel:在connection 内部建立的逻辑连接,channel 之间是完全隔离的
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue中,消费者再去消费。
类型有:直连,主题,扇形
Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则
Binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列
vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 “交换机exchange、绑定Binding、队列Queue”,更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。

1.3、应运场景

引自RabbitMQ的应用场景

1.4、工作模式

1.simple (简单模式): 一个消费者消费一个生产者生产的信息
2.Work queues(工作模式):一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次
3.Publish/Subscribe(发布订阅模式): 生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息
4.Routing(路由模式):生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列
5.Topics(主题模式): 生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者

二、安装RabbitMq(win环境下)及访问

2.1、安装erlang

傻瓜式安装,一直下一步安装完成,记录安装路径,不能有中文,后面配置环境变量需要用

2.2、配置环境变量

新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。
win+R键,输入cmd,命令行输入erl,显示版本号就说明erlang安装成功了。

2.3、安装RabbitMQ

双击下载后的.exe文件,记住安装路径
按win+r,输入cmd进入命令窗口,然后cd到Mq的sbin路径下,输入如下命令进行安装

rabbitmq-plugins enable rabbitmq_management
// 验证是否安装成功
rabbitmqctl status
2.4、浏览器访问

浏览器输入http://127.0.0.1:15672 ,即可看到RabbitMq的登录管理界面
用户名和密码默认是:guest

2.5、安装中问题

在这里插入图片描述
打开下面两个文件夹的.erlang.cookie内容,直接复制其中一个内容,到另外一个文件中去。

C:\Windows\System32\config\systemprofile\.erlang.cookie
C:\User\{{自己电脑的用户名}}\.erlang.cookie

按win+r,输入cmd进入命令窗口,然后cd到Mq的sbin路径下,输入如下命令即可

rabbitmqctl status

三、SpringBoot集成RabbitMq代码部分

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
代码分三块编写:

1.配置类:创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
2.消息生产者:生产消息
3.消息消费者:消费消息

3.1、直接交换机:一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者

配置类:

@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchangeOne(){
        return new DirectExchange("directExchangeOne");
    }

    @Bean
    public Queue directQueueOne(){
        return new Queue("directQueueOne");
    }

    @Bean
    public Queue directQueueTwo(){
        return new Queue("directQueueTwo");
    }

    @Bean
    public Binding directBindingOne(){
        return BindingBuilder.bind(directQueueOne()).to(directExchangeOne()).with("directKey1");
    }

    @Bean
    public Binding directBindingTwo(){
        return BindingBuilder.bind(directQueueTwo()).to(directExchangeOne()).with("directKey2");
    }
}

消息生产者:

@Slf4j
@RestController
@RequestMapping("/direct")
public class DirectProductController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/toDirectCliect")
    public String toDirectCliect(){
        Map<String, String> map = new HashMap<>();
        map.put("name","name");
        rabbitTemplate.convertAndSend("directExchangeOne","directKey1",map);

        Map<String, String> map1 = new HashMap<>();
        map1.put("address","address");
        rabbitTemplate.convertAndSend("directExchangeOne","directKey2",map1);
        return "直连队列消息发送成功";
    }
}

消息消费者:多个消费者可以写在同一个类中,分别监听不同的队列即可

@Component
public class DirectClient {

    @RabbitListener(queues = "directQueueOne")
    @RabbitHandler
    public void directClientOne(HashMap<String,String>mes){
        System.out.println("直连队列消息1:" + mes);
    }

    @RabbitListener(queues = "directQueueTwo")
    @RabbitHandler
    public void directClientTwo(HashMap<String,String>mes){
        System.out.println("直连队列消息2:" + mes);
    }
}
3.2、主题交换机:模糊匹配队列
    *:星号表示任意一个字符
    #:表示任意一个或者多个字符

配置类:

@Configuration
public class TopicConfig {

    public static final String topicA = "hello.world";
    public static final String topicB = "hello.#";

    /**
     *  @Description: 主题交换机
     */
    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange("topic_exchange");
    }

    @Bean
    public Queue topicQueue1(){
        return new Queue("topic_queue1");
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue("topic_queue2");
    }

    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange1()).with(topicA);
    }

    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange1()).with(topicB);
    }
}

生产者:

@Slf4j
@RequestMapping("/topic/rabbit")
@RestController
public class TopicProductController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/topicToQueue")
    public String topicToQueue(){
        Map<String, String> topicMap1 = new HashMap<>();
        topicMap1.put("age","1");
        rabbitTemplate.convertAndSend("topic_exchange","hello.world",topicMap1);

        Map<String, String> topicMap2 = new HashMap<>();
        topicMap2.put("school","xxxxxx");
        rabbitTemplate.convertAndSend("topic_exchange","hello.#",topicMap2);

        return "主题消息发送成功";
    }
}

消费者:

@Component
public class TopicClient {

    @RabbitHandler
    @RabbitListener(queues = "topic_queue1")
    public void getTopicQueue1(HashMap<String,String>topicMes1){
        System.out.println("主题队列消息1:" + topicMes1);
    }

    @RabbitHandler
    @RabbitListener(queues = "topic_queue2")
    public void getTopicQueue2(HashMap<String,String>topicMes2){
        System.out.println("主题消息队列2:" + topicMes2);
    }
}
3.3、扇形交换机

配置类:

@Configuration
public class FanoutConfig {

    /**
     *  @Description: 扇形交换机
     */
    @Bean
    public FanoutExchange fanoutExchange1(){
        return new FanoutExchange("fanout_exchange1");
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout_queue1");
    }

    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout_queue2");
    }

    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange1());
    }

    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange1());
    }
}

生产者:

@Slf4j
@RestController
@RequestMapping("/fanout/rabbitMq")
public class FanoutProductController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/fanoutToQueue")
    public String fanoutToQueue(){
        Map<String, String> fanoutMap1 = new HashMap<>();
        fanoutMap1.put("key","11111");
        rabbitTemplate.convertAndSend("fanout_exchange1",null,fanoutMap1);

        Map<String, String> fanoutMap2 = new HashMap<>();
        fanoutMap2.put("value","2222");
        rabbitTemplate.convertAndSend("fanout_exchange1",null,fanoutMap2);
        return "扇形队列消息发送成功";
    }
}

消费者:

@Component
public class FanoutCliect {

    @RabbitListener(queues = "fanout_queue1")
    @RabbitHandler
    public void getFanout1(HashMap<String,String> fanoutMsg1){
        System.out.println("扇形队列消息1:" + fanoutMsg1);
    }

    @RabbitListener(queues = "fanout_queue2")
    @RabbitHandler
    public void getFanout2(HashMap<String,String> fanoutMsg2){
        System.out.println("扇形队列消息2:" + fanoutMsg2);
    }
}

一个在学习的开发者,勿喷,欢迎交流


本文转载自: https://blog.csdn.net/m0_55841258/article/details/130473573
版权归原作者 bug0到1 所有, 如有侵权,请联系我们删除。

“SpringBoot集成RabbitMq”的评论:

还没有评论