0


RabbitMQ工作模式(3) - 订阅模式

概念

发布/订阅模式(Publish/Subscribe)是 RabbitMQ 中常见的一种消息传递模式,用于将消息广播给多个消费者。在这种模式中,消息发送者(发布者)将消息发送到一个交换机(exchange),交换机将消息广播到所有与之绑定的队列,然后消费者(订阅者)可以从这些队列中接收消息。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个交换机,而不是直接发送到队列。
  2. 交换机将消息广播: 交换机接收到消息后,根据预定义的规则将消息广播到所有与之绑定的队列。这个过程称为消息路由。
  3. 多个消费者监听队列: 多个消费者可以分别监听不同的队列,或者监听同一个队列。
  4. 消息处理: 每个消费者接收到广播的消息后,进行相应的处理。每个消息只会被消费一次,但是可以被多个消费者同时处理。

特点

  • 消息广播:消息被广播到所有与交换机绑定的队列,而不是直接发送到特定的队列。
  • 解耦合:发布者和订阅者之间通过交换机进行解耦,发布者无需知道消息将被传递到哪些队列。
  • 多播:支持多个消费者同时处理同一条消息,以实现消息的多播效果。
  • 灵活性:可以根据需要使用不同类型的交换机和绑定规则,以满足不同的消息传递需求。

发布/订阅模式适用于需要将消息广播给多个消费者的场景,例如实时通知、日志记录、事件处理等。

Springboot集成

这里为了方便和速度就不配置yml文件中,直接编辑,这里配置两个队列

交换机名称: exchange_sub

队列一名称: queue_sub_01

队列一名称: queue_sub_02

1.创建队列和交换机并绑定

在SubConfig文件中配置

这里方便区分,新建了文件SubConfig,每个工作模式创建队列和交换机的过程区分开,全都配置到RabbitmqConfig文件中也是可以的,同时也可以通过RabbitAdmin进行绑定(另一种方式)。

package com.model.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 16:29
 * @Description: TODO
 */
@Configuration
public class SubConfig {

    /**
     * 发布/订阅模式的交换机
     * @return exchange
     */
    @Bean(name = "subExchange")
    public Exchange getSubExchange(){
        return ExchangeBuilder
                .fanoutExchange("exchange_sub")
                .durable(true)
                .build();
    }

    /**
     * 发布/订阅模式的队列 1
     * @return 队列 1
     */
    @Bean(name = "subQueue01")
    public Queue getSubQueue01(){
        return QueueBuilder
                .durable("queue_sub_01")
                .build();
    }

    /**
     * 发布/订阅模式的队列 2
     * @return 队列 2
     */
    @Bean(name = "subQueue02")
    public Queue getSubQueue02(){
        return QueueBuilder
                .durable("queue_sub_02")
                .build();
    }

    /**
     * 绑定队列01
     * @return binding
     */
    @Bean
    public Binding getSubBinding01(){
        return BindingBuilder
                .bind(getSubQueue01())
                .to(getSubExchange())
                // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
                .with("")
                .noargs();
    }

    /**
     * 绑定队列02
     * @return binding
     */
    @Bean
    public Binding getSubBinding02(){
        return BindingBuilder
                .bind(getSubQueue02())
                .to(getSubExchange())
                // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
                .with("")
                .noargs();
    }
}

2.创建消费者

SubConsumer

package com.model.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 16:44
 * @Description: TODO
 */
@Component
public class SubConsumer {

    @RabbitListener(queues = {"queue_sub_01"})
    public void subConsumer01(String msg){
        System.out.println("消费者 -01- 接收消息:" + msg);
    }

    @RabbitListener(queues = {"queue_sub_02"})
    public void subConsumer02(String msg){
        System.out.println("消费者 -02- 接收消息:" + msg);
    }
}

3.创建生产者并发送消息

package com.model.controller;

import com.code.domain.Response;
import com.model.service.RabbitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 9:46
 * @Description: TODO
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Resource
    private RabbitService rabbitService;

    @GetMapping("/simple")
    public Response<Void> simple(String msg){
        boolean res = rabbitService.simple(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/work")
    public Response<Void> work(String msg){
        boolean res = rabbitService.work(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/sub")
    public Response<Void> sub(String msg){
        boolean res = rabbitService.sub(msg);
        return res ? Response.success() : Response.fail();
    }
}
package com.model.service.impl;

import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 10:51
 * @Description: TODO
 */
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.simple.queue}")
    private String simpleQueue;

    @Value("${rabbitmq.work.queue}")
    private String workQueue;

    @Override
    public boolean simple(String msg) {
        try {
            rabbitTemplate.convertAndSend(simpleQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean work(String msg) {
        try {
            rabbitTemplate.convertAndSend(workQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean sub(String msg) {
        try {
            //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
            rabbitTemplate.convertAndSend("exchange_sub","", msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
}

4.发送消息

发送成功

可以发现,发布/订阅模式下,推送到交换机的消息,会被所有绑定了交换机的队列接收

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_45751763/article/details/137967966
版权归原作者 一页书| 所有, 如有侵权,请联系我们删除。

“RabbitMQ工作模式(3) - 订阅模式”的评论:

还没有评论