此篇文章主要包含如下三部分
一、什么是RabbitMq及有什么作用
二、安装RabbitMq(win环境下)及访问
三、SpringBoot集成RabbitMq代码部分
一、什么是RabbitMq及有什么作用
1.1、RabbitMq的简介
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ是一个消息中间件,它接收并转发消息,但不处理消息
1.2、工作原理图
Broker:接收和分发消息的应用,指RabbitMQ Server
Connection:生产者producer / 消费者consumer和 broker之间的TCP连接
Channel:在connection 内部建立的逻辑连接,channel 之间是完全隔离的
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue中,消费者再去消费。
类型有:直连,主题,扇形
Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则
Binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列
vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 “交换机exchange、绑定Binding、队列Queue”,更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。
1.3、应运场景
引自RabbitMQ的应用场景
1.4、工作模式
1.simple (简单模式): 一个消费者消费一个生产者生产的信息
2.Work queues(工作模式):一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次
3.Publish/Subscribe(发布订阅模式): 生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息
4.Routing(路由模式):生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列
5.Topics(主题模式): 生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者
二、安装RabbitMq(win环境下)及访问
2.1、安装erlang
傻瓜式安装,一直下一步安装完成,记录安装路径,不能有中文,后面配置环境变量需要用
2.2、配置环境变量
新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。
win+R键,输入cmd,命令行输入erl,显示版本号就说明erlang安装成功了。
2.3、安装RabbitMQ
双击下载后的.exe文件,记住安装路径
按win+r,输入cmd进入命令窗口,然后cd到Mq的sbin路径下,输入如下命令进行安装
rabbitmq-plugins enable rabbitmq_management
// 验证是否安装成功
rabbitmqctl status
2.4、浏览器访问
浏览器输入http://127.0.0.1:15672 ,即可看到RabbitMq的登录管理界面
用户名和密码默认是:guest
2.5、安装中问题
打开下面两个文件夹的.erlang.cookie内容,直接复制其中一个内容,到另外一个文件中去。
C:\Windows\System32\config\systemprofile\.erlang.cookie
C:\User\{{自己电脑的用户名}}\.erlang.cookie
按win+r,输入cmd进入命令窗口,然后cd到Mq的sbin路径下,输入如下命令即可
rabbitmqctl status
三、SpringBoot集成RabbitMq代码部分
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
代码分三块编写:
1.配置类:创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
2.消息生产者:生产消息
3.消息消费者:消费消息
3.1、直接交换机:一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者
配置类:
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchangeOne(){
return new DirectExchange("directExchangeOne");
}
@Bean
public Queue directQueueOne(){
return new Queue("directQueueOne");
}
@Bean
public Queue directQueueTwo(){
return new Queue("directQueueTwo");
}
@Bean
public Binding directBindingOne(){
return BindingBuilder.bind(directQueueOne()).to(directExchangeOne()).with("directKey1");
}
@Bean
public Binding directBindingTwo(){
return BindingBuilder.bind(directQueueTwo()).to(directExchangeOne()).with("directKey2");
}
}
消息生产者:
@Slf4j
@RestController
@RequestMapping("/direct")
public class DirectProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/toDirectCliect")
public String toDirectCliect(){
Map<String, String> map = new HashMap<>();
map.put("name","name");
rabbitTemplate.convertAndSend("directExchangeOne","directKey1",map);
Map<String, String> map1 = new HashMap<>();
map1.put("address","address");
rabbitTemplate.convertAndSend("directExchangeOne","directKey2",map1);
return "直连队列消息发送成功";
}
}
消息消费者:多个消费者可以写在同一个类中,分别监听不同的队列即可
@Component
public class DirectClient {
@RabbitListener(queues = "directQueueOne")
@RabbitHandler
public void directClientOne(HashMap<String,String>mes){
System.out.println("直连队列消息1:" + mes);
}
@RabbitListener(queues = "directQueueTwo")
@RabbitHandler
public void directClientTwo(HashMap<String,String>mes){
System.out.println("直连队列消息2:" + mes);
}
}
3.2、主题交换机:模糊匹配队列
*:星号表示任意一个字符
#:表示任意一个或者多个字符
配置类:
@Configuration
public class TopicConfig {
public static final String topicA = "hello.world";
public static final String topicB = "hello.#";
/**
* @Description: 主题交换机
*/
@Bean
public TopicExchange topicExchange1(){
return new TopicExchange("topic_exchange");
}
@Bean
public Queue topicQueue1(){
return new Queue("topic_queue1");
}
@Bean
public Queue topicQueue2(){
return new Queue("topic_queue2");
}
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange1()).with(topicA);
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange1()).with(topicB);
}
}
生产者:
@Slf4j
@RequestMapping("/topic/rabbit")
@RestController
public class TopicProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/topicToQueue")
public String topicToQueue(){
Map<String, String> topicMap1 = new HashMap<>();
topicMap1.put("age","1");
rabbitTemplate.convertAndSend("topic_exchange","hello.world",topicMap1);
Map<String, String> topicMap2 = new HashMap<>();
topicMap2.put("school","xxxxxx");
rabbitTemplate.convertAndSend("topic_exchange","hello.#",topicMap2);
return "主题消息发送成功";
}
}
消费者:
@Component
public class TopicClient {
@RabbitHandler
@RabbitListener(queues = "topic_queue1")
public void getTopicQueue1(HashMap<String,String>topicMes1){
System.out.println("主题队列消息1:" + topicMes1);
}
@RabbitHandler
@RabbitListener(queues = "topic_queue2")
public void getTopicQueue2(HashMap<String,String>topicMes2){
System.out.println("主题消息队列2:" + topicMes2);
}
}
3.3、扇形交换机
配置类:
@Configuration
public class FanoutConfig {
/**
* @Description: 扇形交换机
*/
@Bean
public FanoutExchange fanoutExchange1(){
return new FanoutExchange("fanout_exchange1");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout_queue1");
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout_queue2");
}
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange1());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange1());
}
}
生产者:
@Slf4j
@RestController
@RequestMapping("/fanout/rabbitMq")
public class FanoutProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/fanoutToQueue")
public String fanoutToQueue(){
Map<String, String> fanoutMap1 = new HashMap<>();
fanoutMap1.put("key","11111");
rabbitTemplate.convertAndSend("fanout_exchange1",null,fanoutMap1);
Map<String, String> fanoutMap2 = new HashMap<>();
fanoutMap2.put("value","2222");
rabbitTemplate.convertAndSend("fanout_exchange1",null,fanoutMap2);
return "扇形队列消息发送成功";
}
}
消费者:
@Component
public class FanoutCliect {
@RabbitListener(queues = "fanout_queue1")
@RabbitHandler
public void getFanout1(HashMap<String,String> fanoutMsg1){
System.out.println("扇形队列消息1:" + fanoutMsg1);
}
@RabbitListener(queues = "fanout_queue2")
@RabbitHandler
public void getFanout2(HashMap<String,String> fanoutMsg2){
System.out.println("扇形队列消息2:" + fanoutMsg2);
}
}
一个在学习的开发者,勿喷,欢迎交流
版权归原作者 bug0到1 所有, 如有侵权,请联系我们删除。