在上一篇中我们介绍了什么是消息队列(MQ)和消息队列的作用已经常见的几种实现消息代理的技术。这篇我们来详细将以下RabbitMq的使用。
1.RabbitMq的框架
在RabbitMq中有这么几个概念
1.Pubisher(发布者) 发送消息的
2.exchange(交换机)可通过路由将消息进行转发
3.queue(队列)存储信息,等待消费者进行消费信息
4.vistrualHost(虚拟主机) 起到隔离数据的作用,使每个虚拟机都有自己的交换机和队列
2.在控制台中使用RabbitMq
下面我们来发布一条“hello,world”通过simple.direct交换机转发到simple.queue队列中 并被消费
2.1 创建交换机
创建一个名为simple.direct的交换机,可持久化,类型为direct。
2.2 创建队列
创建一个队列名为simple.queue,可持久化的队列。
2.3绑定关系
在上面我们创建了一个队列和一个交换机,而交换机想要把消息进行投送就要和队列绑定关系。
成功之后就会显示
然后我们就发布“hello,world”看simple.queue能收到吗?
结果显示
可以看到使用RabbtiMq控制台发送消息,还是挺简单,只需要按照RabbitMq的架构流程来就可以了。
3.Java中使用RabbitMq
在前面的例子中我们使用了交换机,并且声明了类型是direct,但我们并没介绍。
3.1交换机的四种类型
- Fanout(广播) 将消息交给所有绑定到交换机的队列
- Direct(订阅)基于Routingkey发送给订阅了该消息的队列
- Topic(主题)通配符订阅,将消息发送给RoutingKey匹配通配符的的队列
- 头匹配,基于MQ的消息头匹配,用的较少
3.2 Spring AMQP
官方原生使用:RabbitMQ 教程 - “Hello World!” |兔子MQ
官方的原生使用还是挺麻烦的,而Spring为我们提供了一个基于AMQP协议(高级消息队列协议 Advanced Message Queuing Protocol)的消息队列框架SpringAMQP。
而它的底层正是用RabbtiMq实现的
SpringAMQP提供了三个功能:
- 自动声明队列,交换机和绑定关系
- 基于注解的监听器模式,异步接收信息
- 封装了RabbitTemplate工具,用于发送信息
3.3使用SpringAMQP
下面我们继续来发布一条“hello,world”通过lx.direct交换机继续转发,并创建direct.queue1和direct.queue2接受消息
direct.queue1和交换机的RoutingKey为red
direct.queue2和交换机的Routingkey为blue
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring:
rabbitmq:
host: 192.168.198.134 # 主机名
port: 5672 #端口
virtual-host: /?
username: ???
password: ???
目录结构
2.在consumer声明队列和交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class directConfig {
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1");
}
@Bean
public Queue directQueue1() {
//或者 ExchangeBuilder.directExchange()
return new Queue("direct.queue2");
}
@Bean
public DirectExchange(){
return new DirectExchange("lx.direct");
}
@Bean
public Binding BindingDirect() {
return BindingBuilder.bind(directQueue1()).to(DirectExchange).with("red");
}
@Bean
public Binding BindingDirect() {
return BindingBuilder.bind(directQueue2()).to(DirectExchange).with("blue");
}
}
3.发送信息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
String exchange = "lx.direct";
String msg = "hello, world";
rabbitTemplate.convertAndSend(exchange,"red", msg);
}
}
而direct.queue2中是没有的,因为交换机是direct类型,direct.queue2中绑定的路由是blue,而我们发送信息到交换机用的路由key是red。
4.消费信息
@Component
public class MqListener {
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue(String msg){
System.out.println("消费者收到" + msg);
}
4.WorkQueues模式
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
在上面的消费信息中我们只定义了一个监听器去监听这个队列,实际上,一个队列往往需要多个监听器(消费者)去消费信息。不然会导致消息的堆积,和影响消息的处理速度
而每个监听器它的处理速度可能是不同的
@RabbitListener(queues = "work.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.err.println("消费者1 收到" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.out.println("消费者2 收到" + msg);
Thread.sleep(200);
}
在Rabbit中它默认消费信息是轮询的,这样机会导致速度快的和速度慢的处理的消息是一样的多的。而我们希望的是加快数据的处理,所以要添加以下配置
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
5.消息转换器
@Test
void testMsg(){
Map<String, Object> hashMap = new HashMap<>();
hashMap.put("age",20);
hashMap.put("name","lx");
rabbitTemplate.convertAndSend("direct.queue1",hashMap);
}
在控制台中的结果会被序列化,这很不方便阅读和存储。
翻阅源码我们可以看到这个Map类型的类型会被转化为Message类型,
而它默认实现的就是SerializerMessageConverter,这就导致上述结果
解决方法:在Springboot启动类中添加
@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
版权归原作者 我实在太想进步了 所有, 如有侵权,请联系我们删除。