一、简单模型
创建一个父工程和两个子工程consumer和publisher
1、首先控制台创建一个队列
命名为simple.queue
2、父工程导入依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
3、生产者配置文件
spring:
rabbitmq:
host: 192.168.200.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
4、写测试类
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, rabbitmq!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
查看消息
5、消费者配置文件
spring:
rabbitmq:
host: 192.168.200.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
6、消费者接收消息
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
结果
二、WorkQueues模型
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了
1、在控制台创建一个新的队列
命名为
work.queue
2、生产者生产消息
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "work.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
3、创建两个消费者接收消息
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】");
}
结果
如果消费者睡眠时间不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】");
Thread.sleep(200);
}
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
4、能者多劳充分利用每一个消费者的能力
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
三、交换机
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
四、Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
1、 声明队列
创建两个队列
fanout.queue1
和
fanout.queue2
,绑定到交换机
hmall.fanout
2、 创建交换机
创建一个名为
fanout
的交换机,类型是
Fanout
3、 绑定交换机
4、示例
生产者
/**
* Fanout交换机
*/
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "mq.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消费者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
五、Diect交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
1、 声明队列
首先在控制台声明两个队列
direct.queue1
和
direct.queue2
2、创建交换机
3、绑定交换机
一个队列绑定两个
RoutingKey
4、示例
生产者:
RoutingKey为red
/**
* Direct交换机
*/
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "mq.direct";
// 消息
String message = "hello direct red";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
消费者
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
两个队列都收到消息
RoutingKey为blue
/**
* Direct交换机
*/
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "mq.direct";
// 消息
String message = "hello direct blue";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
只有队列2收到消息
六、Topic交换机
Topic
类型的
Exchange
与
Direct
相比,都是可以根据
RoutingKey
把消息路由到不同的队列。 只不过
Topic
类型
Exchange
可以让队列在绑定
BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以
.
分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
示例
publisher发送的消息使用的
RoutingKey
共有四种:
china.news
代表有中国的新闻消息;china.weather
代表中国的天气消息;japan.news
则代表日本新闻japan.weather
代表日本的天气消息;
解释:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括: -china.news
-china.weather
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括: -china.news
-japan.news
1、创建队列
2、创建交换机
3、绑定队列
4、示例
生产者:
RoutingKey为
china.news
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "mq.topic";
// 消息
String message = "hello topis china.news";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消费者
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
RoutingKey为
china.people
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "mq.topic";
// 消息
String message = "hello topis china.people";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.people", message);
}
只有消费者1收到消息
7、、声明队列交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
1、SpringAMQP提供的类声明
示例:创建Fanout交换机队列
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange2(){
return new FanoutExchange("mq.fanout2");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue3(){
return new Queue("fanout.queue3");
}
/**
* 绑定队列和交换机1
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue4(){
return new Queue("fanout.queue4");
}
/**
* 绑定队列和交换机2
*/
@Bean
public Binding bindingQueue2(){
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());
}
}
direct示例
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("mq.direct2").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding
2、基于注解声明
/**
* 注解声明交换机
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),//队列名称
exchange = @Exchange(name = "mq.direct", //交换机名称
type = ExchangeTypes.DIRECT),//交换机类型
key = {"red", "blue"}//RoutingKey
))
public void listenDirectQueue3(String msg){
System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue4"),
exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue4(String msg){
System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】");
}
队列
交换机
七、消息转换器
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "张三");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化
配置JSON转换器
在
publisher
和
consumer
两个服务中都引入依赖:
<dependencies>
<!--mq消息转换为json-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>
注意:如果项目中引入了
spring-boot-starter-web
依赖,则无需再次引入
Jackson
依赖。
配置消息转换器,在
publisher
和
consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
结果
消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}
版权归原作者 Poetry-Distance 所有, 如有侵权,请联系我们删除。