0


RabbitMq实现解耦与削峰的方式

执行流程:

生产者者将消息发送到交换机上,交换机通过路由key(注意:广播模式和Header模式没有路由key)将消息分发给队列,然后队列再分发给消费者进行处理

四种基本常见模式:(解耦)

1.Direct Exchange

一对一完全匹配,将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配

工作原理为:

路由模式使用的是

Direct Exchange

上图说明:

交换器 X 是 direct 类型的交换器,绑定的两个队列中,一个队列的 bindingKey 是orange ,另一个队列的 bindingKey 是 black 和 green 。

routingKey 是 orange 的消息发送给队列Q1。

routingKey 是 black 和 green 的消息发送给Q2队列,其他消息丢弃。

2.Topic Exchange

多对多正则匹配,所有发送到Topic Exchange的消息被转发到匹配的RouteKey中指定Topic的Queue。

其中符号#匹配一个或多个词,符号*只能匹配其后面的一个词,比如:

  1. user.# # 可以匹配到 user.add user.add.batch
  2. user.* # 只能匹配到 user.add ,不能匹配到 user.add.batch

下图说明 Topic Exchange工作原理:

3.fanout Exchange(不需要路由key)

不处理路由键,只需将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。因为不处理路由键,所

fanout Exchange

速度最快

fanout

交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道的所有的队列。

发布订阅模式(Publish/Subscribe)使用的就是

fanout

类型交换机

消息广播给所有订阅该消息的消费者

4.headers Exchange(不需要路由key)

headers Exchange与Direct、topic、fanout不同,它时通过匹配AMQP消息的header而非路由键。

headers Exchange与Direct Exchange类似,性能方面比Direct Exchange差很多,所以在实际项目中用的很少,通过一张图来直观感受一下其工作流程

注意:在声明消息队列时,需要指定header参数,其中

x-match

为特殊的

headers

,为

all

时则表示要匹配所有的header,如果为

any

则表示只要匹配其中的一个header即可

RabbitMq如何实现削峰:

实现原理:

从下游入手,将消息先存放到队列中,先不进行消费,等属于低峰的时候再将数据取出进行消费

如果使用redis如何处理削峰问题?

通过配置redis的过期监听机制

找到redis.config配置文件,找到 EVENT NOTIFICATION (事件通知)这个配置将

notify-keyspace-events ""

修改为

notify-keyspace-events "Ex"

notify-keyspace-events 的参数详解:

# K 键空间通知,所有通知以 __keyspace@__ 为前缀 
# E 键事件通知,所有通知以 __keyevent@__ 为前缀 
# g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 
# $ 字符串命令的通知 
# l 列表命令的通知 
# s 集合命令的通知 
# h 哈希命令的通知 
# z 有序集合命令的通知 
# x 过期事件:每当有过期键被删除时发送 
# e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 
# A 参数 g$lshzxe 的别名 
# 输入的参数中至少要有一个 K 或者 E,否则的话,不管其余的参数是什么,都不会有任何 通知被分发。

springBoot中设置RabbitMq的最大最小监听数

1.使用@RabbitListener注解中的concurrency,实现如下:
@RabbitListener(queues = "your.queue.name", concurrency = "5-10") 
public void listen(Message message) { 
        // message processing logic 
}
2.找不到conncurrency属性(猜测是因为版本问题)

解决方案:通过自定义Bean Factory的方式,通过Bean的方式注入最大最小连接数,实现如下:

通过创建**

SimpleRabbitListenerContainerFactory

**的Bean并设定并发消费者的数量来实现

@Bean 
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) { 
           SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); 
           factory.setConcurrentConsumers(3); // 最小的消费者数量                         
           factory.setMaxConcurrentConsumers(10); // 最大的消费者数量 
           return factory; 
}

@RabbitListener

注解中指定这个工厂:

@RabbitListener(queues = "your.queue.name", containerFactory = "containerFactory") 
public void processMessage(String content) { 
            // 处理消息 
}
标签: rabbitmq java

本文转载自: https://blog.csdn.net/qq_56809170/article/details/138523570
版权归原作者 cabras 所有, 如有侵权,请联系我们删除。

“RabbitMq实现解耦与削峰的方式”的评论:

还没有评论