RabbitMQ
整体机构及核心概念
- publisher:消息发送者
- consumer:消息的发送者
- queue:队列,存储消息
- exchange:交换机
通过 生产者 到 交换机 到 队列 到 消费者
数据隔离,只能用户只能操作自己的虚拟主机
SpringAMQP
AMQP一种消息通讯协议,该协议与语言和平台无关
消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询给绑定在队列上的每一个消费者。但这并没有考虑消费者是否已经处理完消息(因为每个消费者的消费能力并不一定相同),可能会出现消息堆积。
因此需要修改application。yml,设置preFetch的值为1,确保同一时刻最多投递给消费者一条消息
Work模型
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条在处理下一条(能者多劳),通过配置文件中的prefetch
如何解决消息堆积的问题
- work模型:一个队列绑定多个消费者,加快对信息的处理
- 优化代码,提高信息处理速度。(使用缓存或异步(线程池等)等)【这里不做详细讲述】
交换机
真正的生产环境都会经过exchange来发送消息,而不是直接发送到队列
交换机的作用:
接受publisher发送的消息
将消息按照股则路由到与之绑定的队列
- Fanout:广播Fanout Exchange会将收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
- Direct:定向Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。1. 每一个Queue都与Exchange设置一个BindingKey2. 发布者发送消息时,指定消息的RoutingKey3 .Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
- Topic:话题与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并以.分割
声明队列和交换机
SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExcahngeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
@Configuration
public class FanoutConfig{
//声明FanoutExchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");//交换机名称
}
//声明第1个队列
@Bean
public Queue fanoutQueue1{
return new Queue("fanout.queue1")//队列名称
}
//绑定队列1和交换机
@Bean
public Bindling bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutchange){
return Bindingbuilder.blind(fanout.queue1).to(fanout.Exchange);
}
}
@RabbitListener更便于声明队列和交换机,无需写多次@Bean,更简洁好用
具体格式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name="direct.queue1",durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = Exchange。DIRECT)
key = {"red","blue"}
))
消息转换器
实现不同协议或格式之间消息的转换
- 通过消息转换器可以实现与其他队列系统的互通eg. Kafka
- 也可以将消息转换为不同数据格式eg. JSON,XML
版权归原作者 Ustinianersog 所有, 如有侵权,请联系我们删除。