0


RabbitMQ 消息队列使用

文章目录

MQ

同步调用和异步调用

同步调用优点:

时效性强,立即得到结果

缺点:

  • 耦合度高 新业务新需求到来时,需要修改代码
  • 性能和吞吐能力下降 调用服务的响应时间为所有服务的时间之和
  • 资源浪费 调用链中的服务在等待时不会释放请求占用的资源
  • 级联失败 一个服务执行失败会导致调用链后续所有服务失败

异步调用优点:

  • 服务解耦 便于扩展
  • 性能提高 吞吐量提高
  • 不会级联失败
  • 流量削峰

RabbitMQ

基础概念

  • channel: 操作MQ工具
  • exchange: 交换机, 将消息路由到队列中
  • queue: 保存消息的队列
  • virtual host: 虚拟主机, 相当于namespace,隔离的环境,对queue和exchange的逻辑分组

模型

基于 Spring Amqp

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
简单队列 (Hello-World)

在这里插入图片描述

消息发送者和接收者都需要以下配置:

spring:rabbitmq:port:5672host: localhost
    virtual-host: /
    username: guest
    password: guest

发送消息:

@SpringBootTest@RunWith(SpringRunner.class)publicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidhelloWorldModel(){String queueName="zbq.queue1";String message="hello, spring amqp";
        rabbitTemplate.convertAndSend(queueName,message);}}

消息接受:

@Component@Slf4jpublicclassSpringRabbitListener{@RabbitListener(queues ="zbq.queue1")publicvoidlistenSimpleQueue(String msg){
        log.info("收到消息: "+msg);}}

在这里插入图片描述

工作队列 (Work Queue)

在这里插入图片描述

发送者:

@TestpublicvoidworkQueueModel(){String queueName="zbq.work.queue";String msg="hello, amqp ";for(int i=0;i<1000;i++){
            rabbitTemplate.convertAndSend(queueName,msg+i);}}

接收者:

@RabbitListener(queues ="zbq.work.queue")publicvoidlistenWorkQueue1(String msg)throwsInterruptedException{
    log.info("消费者1号接收到消息: "+msg);Thread.sleep(20);}@RabbitListener(queues ="zbq.work.queue")publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{
    log.info("消费者2号接收到消息: "+msg);Thread.sleep(100);}

在这里插入图片描述

Pub/Sub (Fanout exchange)

在这里插入图片描述

Fanout交换将将消息发送到每一个绑定到它的队列中

  1. 声明一个FanoutExchange,声明2个队列, 绑定队列到FanoutExchange上
@ConfigurationpublicclassFanoutConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("zbq.fanout");}@BeanpublicQueuefanoutQueue1(){returnnewQueue("zbq.fanout.queue1");}@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("zbq.fanout.queue2");}@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

2.消费者监听这两个队列

@RabbitListener(queues ="zbq.fanout.queue1")publicvoidlistenFanoutQueue1(String msg){
    log.info("消费者1收到Fanout消息: "+msg);}@RabbitListener(queues ="zbq.fanout.queue2")publicvoidlistenFanoutQueue2(String msg){
    log.info("消费者2收到Fanout消息: "+msg);}

3.发消息到fanoutexchange

@TestpublicvoidfanoutModel(){String exchangeName="zbq.fanout";String msg="hello, fanout ";for(int i=0;i<10;i++){
        rabbitTemplate.convertAndSend(exchangeName,"",msg+i);}}

查看消费者输出信息

在这里插入图片描述

Direct Exchange

Direct交换机会将消息按照路由规则发送到指定的队列

在这里插入图片描述

1.声明交换机, 队列,并绑定,添加routingkey

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="zbq.direct.queue1"),
    exchange =@Exchange(name="zbq.direct",type =ExchangeTypes.DIRECT),
    key ={"girlfriend","family"}))publicvoidlistenDirectQueue1(String msg){
    log.info("消费者1收到Direct消息: "+msg);}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="zbq.direct.queue2"),
    exchange =@Exchange(name="zbq.direct",type =ExchangeTypes.DIRECT),
    key ={"friend","family"}))publicvoidlistenDirectQueue2(String msg){
    log.info("消费者2收到Direct消息: "+msg);}

2.发送消息

@TestpublicvoiddirectModel(){String exchangeName="zbq.direct";String msg="晚上回去吃饭 ";for(int i =0; i <10; i++){
        rabbitTemplate.convertAndSend(exchangeName,"family",msg+i);}}@TestpublicvoiddirectModel2(){String exchangeName="zbq.direct";String msg="hello, direct";for(int i =0; i <10; i++){
        rabbitTemplate.convertAndSend(exchangeName,"girlfriend",msg+i);}}

在这里插入图片描述

在这里插入图片描述

Topic Exchange

话题交换机的routingkey 必须是多个单词的列表,并以

.

分隔

可以使用通配符

#

*
#

:代表0个或者多个单词

*

:代表1个单词

在这里插入图片描述

1.定义

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="zbq.topic.queue1"),
    exchange =@Exchange(name="zbq.topic",type =ExchangeTypes.TOPIC),
    key ="China.#"))publicvoidlistenTopicQueue1(String msg){
    log.info("消费者1收到Topic消息: "+msg);}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="zbq.topic.queue2"),
    exchange =@Exchange(name="zbq.topic",type =ExchangeTypes.TOPIC),
    key ="#.weather"))publicvoidlistenTopicQueue2(String msg){
    log.info("消费者2收到Topic消息: "+msg);}

2.发消息

@TestpublicvoidtopicModel(){String exchangeName="zbq.topic";String msg="首都北京,今日气温10摄氏度";
    rabbitTemplate.convertAndSend(exchangeName,"China.weather",msg);}

在这里插入图片描述

序列化方式

@Testpublicvoidtest(){Map<String,Object> map=newHashMap<>();
    map.put("hair","long");
    map.put("eyes","big");
    rabbitTemplate.convertAndSend("zbq.queue1",map);}

发送对象类型过去, 查看序列化后的值

在这里插入图片描述

RabbitMQ默认使用JDK自带序列化

引入以下依赖修改序列化方法:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

注入Bean

@BeanpublicMessageConvertercustomMC(){returnnewJackson2JsonMessageConverter();}

在这里插入图片描述

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/yirenDM/article/details/135700023
版权归原作者 怡人蝶梦 所有, 如有侵权,请联系我们删除。

“RabbitMQ 消息队列使用”的评论:

还没有评论