1、什么是RabbitMQ
1.1、什么是MQ
MQ就是消息队列,“消息队列”是在消息的传输过程中保存消息的容器。
1.2、RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
2、为什么要用 MQ
- 解耦 ()
- 流量削峰
- 消息发布和管理
- 。。。。。。
3、安装
基于docker 20.10.23安装
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:3.9.9-management
management:是有web管理页面的镜像
访问:http://宿主机ip:15672
用户名和密码都是guest
注意防火墙
4、基本概念
4.1、Queues(队列)
是用来存放消息的队列。
4.1.1、点对点消息
从一个生产者生产消息,到一个消费者消费消息,这种方式就是点对点消息。相当于发短信,张三给李四发短信,张三就是生产者,李四就是消费者。例子举得极端,大家理解意思。
4.1.2、主题
订阅,是一个生产者生产消息,很多订阅了这个生产者的消费者都可以接收到消息。相当于公众号推送文章。
4.2、Exchange(交换器)
和硬件交换机有类似的含义,交换机可以帮助消息找到对应的队列,将消息送达到队列中。
4.2.1、创建交换机
交换机很少在页面中创建,都是通过代码去创建交换机,在5.4.1中详解。
4.2.2、绑定队列
将交换机和队列建立关系。同样很少通过页面创建。
4.2.3、常用的三种类型
- direct:直连,点对点,A发给B。默认的模式。是通过路由键找到队列,是精准匹配,如果没有符合规则的消息会被废弃。
- fanout:扇形,点对面,A发给BCD ,可以收也可以废弃。不通过路由键找队列,通过交换机和队列的绑定找到队列。
- topic:主题,点对面,A发给BCD ,BCD 都接收消息。是通过路由键找队列,模糊匹配方式,通过通配符(*)模糊匹配所有符合规则的,如果没有符合规则的消息会被废弃。
还有一种是headers,是废弃的类型,效率比较低。类似于direct的方式。
路由键
帮助消息能找到对应的交换机和队列。
一个字符串,规约上定义格式是XXX.XXX.XXX,以点为分隔符的形式定义的字符串。
4.3、Broker(消息服务器)
RabbitMQ做分布式时用到的。
4.4、Connections&Channels(连接和信道)
要发送消息到队列,自然要有一个连接才可以,消息时通过Connections连接到RabbitMQ的,然后通过信道找到交换机和队列。
4.5、虚拟主机
是RabbitMQ虚拟出的虚拟机,可以为多个项目服务。
在多项目用到RabbitMQ时,可以创建多个虚拟主机,每个虚拟主机都是沙盒隔离的,相互不会有影响,即使某个虚拟主机宕机,也不会影响到其它的虚拟主机。
5、SpringBoot整合
官方文档:
5.1、添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>compile</scope> </dependency>
5.2、开启RabbitMQ
在启动类上添加注解,以开启RabbitMQ。
@EnableRabbit
5.3、添加配置文件
spring:
rabbitmq:
host: ip
port: 5672
username: guest
password: guest
guest是游客账号,只能在本机访问,外部访问需要创建一个新的账户。
5672端口号是程序访问的端口,15672是web访问的端口。
【怎样创建新的用户】
【注意:用户名admin,密码123456】
【创建用户成功,如下:】
【点击进入,新建用户 admin里】
【设置admin用户的虚拟主机,/ 表示所有。】
5.4、创建交换机、队列和绑定
5.4.1、创建交换机
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = DemoHelloworldApplication.class)
@SuppressWarnings("all")
public class RabbitMqTest {@Autowired private AmqpAdmin amqpAdmin; /** * String name:交换机名称 * boolean durable:是否持久化 * boolean autoDelete:是否自动删除 */ @Test public void createExchange() { String name = "helloworld"; boolean durable = true; boolean autoDelete = false; DirectExchange directExchange = new DirectExchange(name, durable, autoDelete); amqpAdmin.declareExchange(directExchange); log.info("create exchanges {}", "hello"); }
}
5.4.2、创建队列
/** * 队列名称 * 是否持久化 * 是否排他 * 是否自动删除 */ @Test public void createQueue() { Queue queue = new Queue("helloword-queue", true, false, false); String queueName = amqpAdmin.declareQueue(queue); log.info("create queue {}", queueName); }
5.4.3、创建绑定
/** * 目的地名字 * 绑定类型,交换机或队列 * 交换机名称 * 路由键 * 参数(很少使用) */ @Test public void createBinding() { Binding binding = new Binding("helloworld-queue", Binding.DestinationType.QUEUE, "helloworld", "helloworld.queue", null); amqpAdmin.declareBinding(binding); log.info("create binding routeKey {}", "helloworld.queue"); }
5.5、发送消息
5.5.1、发送字符串
@Autowired private RabbitTemplate rabbitTemplate; /** * 交换机名称 * 路由键 * 内容 */ @Test public void sendMessage(){ rabbitTemplate.convertAndSend("helloworld", "helloworld.queue", "hello world !"); }
不是常用功能,一般都会发送对象。
5.5.2、发送对象
是工作中最常用的功能,也是以后代码中90%以上的操作。
(1)添加配置类
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyMqConfig {@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
}
(2)创建一个对象类
package org.jsoft.demo.vo;
import lombok.Data;
@Data
public class User {private String name; private String password;
}
(3)发送消息
/** * 交换机名称 * 路由键 * 对象 */ @Test public void sendObject(){ User user = new User(); user.setName("zhangsan"); user.setPassword("123456"); rabbitTemplate.convertAndSend("helloworld", "helloworld.queue", user); }
发送消息时参数传对象即可,根据配置类会自动转成JSON字符串。
6、接受消息
6.1、@RabbitListener
配置了注解的类一定要放到IoC容器中,参数类型要和发送的对象匹配上。
import org.jsoft.demo.vo.User; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMqListener { @RabbitListener(queues = {"helloworld-queue"}) public void getMessage(Message message, User user) { System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println(user); System.out.println("--------------------------------------------------"); System.out.println(message); System.out.println("--------------------------------------------------"); System.out.println(new String(message.getBody())); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); System.out.println("--------------------------------------------------"); } }
package org.jsoft.demo.vo;
import lombok.Data;
@Data
public class User {private String name; private String password;
}
版权归原作者 weixin_65462805 所有, 如有侵权,请联系我们删除。