0


RabbitMq的五大消息模型及Java代码演示

RabbitMq一共有6种消息模型,分别为:

1.基本模型(Hello World)

2.消息分发(工作队列)模型(Work queues)

3.Fanout订阅模型-消息订阅模式(Publish/Subscribe)

4.Direct订阅模型-路由模式(Routing)

5.Topic订阅模型-匹配模式(Topic)

6.RPC模式(RPC)

而其中的RPC模式,它并不完全属于消息队列(MQ)的定义,而是使用MQ的某些功能来实现RPC的效果。

所以简单的介绍下以下五种消息模型:

一.基本模型(Hello World)

1.这是个非常简单的模型,即一个生产者发送消息到一个队列,一个消费者从该队列中接收并处理消息。

2.主要适用于RabbitMq的测试和日常我们学习的场景,在需要快速搭建轻量级消息队列系统的应用中,简单模式也是个很好的选择。

代码实现:

在RabbitMq的webUI界面去创建,为了方便我直接在RabbitMq管理页面创建一个名为basic1的队列(我们也可以在Java代码中去配置queue,交换机及其绑定,为了方便展示用了管理页面)

用postMan请求,生产者发送消息

package com.org.product.controller;

import com.org.product.config.MqProductCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Created with IntelliJ IDEA.
 * @Author: 你的名字
 */
@RestController
@RequestMapping("/product")
@Slf4j
public class PushMessageController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MqProductCallBack mqProductCallBack;

    @GetMapping("/basic")
    public String basic(){
        log.info("-------------消息推送开始--------------");
        //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
        CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
        //消息确认与返回
        rabbitTemplate.setConfirmCallback(mqProductCallBack);
        rabbitTemplate.setReturnsCallback(mqProductCallBack);
        /**
         *消息发送
         *在简单模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
         *也就是routingKey写队列名称
         */
        rabbitTemplate.convertAndSend("basic1","小飞棍来喽!",
                //Lambda表达式,实现MessagePostProcessor接口
                message -> {
                    //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //返回修改后的消息
                    return message;
                }, correlationData);
        log.info("--------------消息推送结束------------------");
        return "消息发送成功了!!!";
    }
}

控制台

消费者监听

package com.org.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: 你的名字
 * @Description:监听消息
 */
@Component
public class MessageListener {

    @RabbitListener(queues ="basic1")
    public void handleMessage(String message) {
        //处理接收到的消息
        System.out.println("接收到的消息是:" + message);
    }

    //手动ack
//    @RabbitListener(queues = "queue2")
//    public void handleMessage1(String message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tag) throws IOException {
//        System.out.println("接收到的消息是:" + message);
//        try {
//            Thread.sleep(2000);
//            //发生异常
//            int i = 1/0;
//            //手动ack
//            channel.basicAck(tag,false);
//        } catch (Exception e) {
//            //手动ack,让消息重回队列,参数三表示是否重回队列
//         channel.basicNack(tag,false,false);
//
//        }
//
//    }
}

控制台

二 .消息分发(工作队列)模型(Work queues)

1.工作队列将一些耗时的任务封装为消息,并将其发送到一个中心队列,多个消费者同时从队列中获取任务,每个任务只会被一个消费者获取并处理。

2.工作队列模型主要适用于长时间运行的任务场景,例如订单系统,当单个订单的处理时间较长(比如15秒),我们可以将多个订单同时放入消息队列中,让多个消费者同时处理这些订单,从而实现并行处理而不是串行处理;还可以实现需要异步处理任务的场景,如更新数据库等。

代码实现:

和上边一样我们先创建名为work1的队列

生产者代码

    @GetMapping("/work")
    public String work(){
        log.info("-------------消息推送开始--------------");
        //发送10条消息测试
        for (int i = 1; i <= 10; i++) {
            //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
            CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
            //消息确认与返回
            rabbitTemplate.setConfirmCallback(mqProductCallBack);
            rabbitTemplate.setReturnsCallback(mqProductCallBack);
            /**
             *消息发送
             *在工作队列模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
             *也就是routingKey写队列名称
             */
            rabbitTemplate.convertAndSend("work1","小飞棍来喽"+i+"!",
                    //Lambda表达式,实现MessagePostProcessor接口
                    message -> {
                        //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //返回修改后的消息
                        return message;
                    }, correlationData);
        }
        log.info("--------------消息推送结束------------------");

        return "消息发送成功了!!!";
    }

控制台

消费者代码

 @RabbitListener(queues = "work1")
    public void workQueue1(String message){
        System.out.println("消费者--1--接收到的消息是:" + message);
    }

    @RabbitListener(queues = "work1")
    public void workQueue2(String message){
        System.out.println("消费者--2--接收到的消息是:" + message);
    }

控制台

三. Fanout订阅模型-消息订阅模式(Publish/Subscribe)

1.Fanout订阅模型和工作队列模式比较,前者用到了fanout交换机,并广播到多个队列,消息生产者将消息发送到交换机(Exchange),而交换机以广播的形式将消息发送给所有与该交换机绑定的队列,从而实现了消息的广播。

2.Fanout交换机:主要用来将消息广播给绑定该交换机的队列,不关心消息的路由键是什么,也就是无论消息路由键是什么,Fanout交换机都将消息发送给绑定的队列,与Direct交换机相比,不需要设置路由键就可以将消息广播到所有队列,与Topic交换机相比,它不支持通配符匹配路由键。

3.主要适用于将消息广播给多个消费者,例如:日志记录,实时消息推送等

代码实现:

(1)首先我们在web界面创建名为fanout_queue1,fanout_queue2队列。

(2)在创建Fanout类型交换机,名为fanout_exchange。

(3)绑定队列fanout_queue1,fanout_queue2

路由键Routing key 设不设置都可以,我这边就不设置了

生产者代码

 @GetMapping("/fanout")
    public String fanout() {
        for (int i = 1; i <= 4; i++) {
            //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
            CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
            //消息确认与返回
            rabbitTemplate.setConfirmCallback(mqProductCallBack);
            rabbitTemplate.setReturnsCallback(mqProductCallBack);
            /**
             *消息发送
             *在fanout订阅模式我们可以随意定义路由
             *也就是routingKey写队列名称
             */
            rabbitTemplate.convertAndSend("fanout_exchange", "666", "小飞棍来喽" + i + "!",
                    //Lambda表达式,实现MessagePostProcessor接口
                    message -> {
                        //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //返回修改后的消息
                        return message;
                    }, correlationData);
        }
        return "消息发送成功了!!!";
    }

控制台:4条消息发送成功

消费者代码:定义两个分别监听两个队列的消费者

 //fanout订阅模式
    @RabbitListener(queues = "fanout_queue1")
    public void fanout1(String message){
        System.out.println("fanout_queue1--的消息是:" + message);
    }

    @RabbitListener(queues = "fanout_queue2")
    public void fonout2(String message){
        System.out.println("fanout_queue2--的消息是:" + message);
    }

控制台:4条消息,分别广播到两个队列

四**.Direct订阅模型-路由模式(Routing)**

1.Direct订阅模式用到了Direct交换机,不同于Fanout订阅模式,它必须指定路由键,允许消费者只接收与特定路由键匹配的消息,从而实现了有选择性接收,它的匹配方式是精确匹配,即路由键和绑定键必须完全相同,不同则无法将消息正确路由到对应的队列,消息可能会被丢弃或者根据RabbitMq配置进行处理(如返回给生产者或者进入死信队列)。

2.Direct订阅模式适用于日志处理,订单系统等,比如我们根据订单号将订单消息路由到特定的处理队列中。

代码实现:

(1)我们在RabbitMq的web管理页面创建Direct交换机,名为direct_exchange。

(2)然后创建两个队列,分列为direct_queue1,direct_queue2。(你也可以用之前创建过的)

(3)交换机direct_exchange和direct_queue1的绑定路由键为direct_key1。

     交换机direct_exchange与direct_queue2绑定的路由键为direct_key2。

生产者代码:将 "小飞棍来喽A、B、C" 发送到direct_queue2队列,将 "小飞棍来喽A" 发送到direct_queue1队列

 @GetMapping("/direct")
    public String direct() {
        //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
        CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
        //消息确认与返回
        rabbitTemplate.setConfirmCallback(mqProductCallBack);
        rabbitTemplate.setReturnsCallback(mqProductCallBack);
        /**
         *消息发送
         *在direct订阅模式我们要精准路由
         */
        rabbitTemplate.convertAndSend("direct_exchange", "direct_key2", "小飞棍来喽A", message -> {
            //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            //返回修改后的消息
            return message;},correlationData);
        rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽B",message -> {
            //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            //返回修改后的消息
            return message;},correlationData);
        rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽C",message -> {
            //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            //返回修改后的消息
            return message;},correlationData);
        rabbitTemplate.convertAndSend("direct_exchange", "direct_key1", "小飞棍来喽A",message -> {
            //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            //返回修改后的消息
            return message;},correlationData);
        return "消息发送成功了!!!";
    }

控制台:4条消息发送成功

消费者代码:

 //direct订阅模式
    @RabbitListener(queues = "direct_queue1")
    public void direct1(String message){
        System.out.println("direct_queue1--的消息是:" + message);
    }

    @RabbitListener(queues = "direct_queue2")
    public void direct2(String message){
        System.out.println("direct_queue2--的消息是:" + message);
    }

控制台:消费者把对应的队列消息消费下来了

**五.**5.Topic订阅模型-匹配模式(Topic)

1.Topic订阅模型相比Direct订阅模型,分组更细致,也更灵活,他允许在路由键上使用通配符,并根据发送消息时携带的路由键进行模糊匹配。需要注意的是用通配符匹配,单词之间要用(.)分隔。

2.通配符:
*****(星号)匹配一个单词:例如,我们代码中配置的路由键是superman.A或superman.B或superman.C那么我们绑定的键(Binding Key)为superman.*的队列将会匹配到这个路由键,但是如果是superman.A.D,就不会匹配到

#(井号)匹配零个或者多个单词:例如,路由键是superman.A.D,那么绑定的superman.#的队列会匹配到这个路由键,当然如果是superman.A.D.E等都会匹配到(注意,不建议用#.#它会导致接收到大量的消息)

3.Topic订阅模型适用于根据消息内容或者类别来进行灵活分类。例如分布式任务调度,新闻订阅等。

代码实现:

先在RabbitMq管理页面新建topic_exchange交换机

然后建立两个队列topic_queue1,topic_queue2

最后topic_exchange交换机绑定topic_queue1的路由键为topic.*;绑定topic_queue2的路由键为topic.#

生产者代码

   @GetMapping("/topic")
    public String topic() {
        //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
        CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
        //消息确认与返回
        rabbitTemplate.setConfirmCallback(mqProductCallBack);
        rabbitTemplate.setReturnsCallback(mqProductCallBack);
        /**
         * 发送给topic_queue1
         * 因为topic_queue1我们绑定的路由键是topic.*,所以我们routingKey可以写(topic.任何一个单词)
         */
        rabbitTemplate.convertAndSend("topic_exchange", "topic.A", "小飞棍来喽A!", message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        /**
         * 发送给topic_queue2
         * 因为topic_queue2我们绑定的路由键是topic.#,所以我们routingKey可以写(topic.一个或者多个单词)
         */
        rabbitTemplate.convertAndSend("topic_exchange", "topic.B.B.B", "小飞棍来喽B", message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        
        return "消息发送成功了!!!";
    }

消费者代码

  //topic订阅模式
    @RabbitListener(queues = "topic_queue1")
    public void topic1(String message){
        System.out.println("topic_queue1--的消息是:" + message);
    }

    @RabbitListener(queues = "topic_queue2")
    public void topic2(String message){
        System.out.println("topic_queue2--的消息是:" + message);
    }

消费者控制台

由上边的消费结果可以看出,topic_queue1接收到一条消息,topic_queue2接收到两条消息,

是因为topic.*的路由匹配topic.A,而匹配不到topic.B.B.B,

而topic.#的路由匹配到了topic.A以及topic.B.B.B

以上五种模型涵盖了RabbitMQ的主要消息传递方式,从简单的点到点通信到复杂的发布/订阅模式,可以满足不同应用场景的需求。同时,RabbitMQ还提供了消息确认机制、消息持久化、消息优先级和消息过期时间等特性,以保证消息的可靠性、持久性和灵活性,我们可以根据我们业务的需求场景来决定使用哪一种消息模式。

上边的SpringBoot代码工程在上一篇SpringBoot集成RabbitMq

链接:一文搞定springBoot集成RabbitMq-CSDN博客

欲渡黄河冰塞川,将登太行雪满山~~

文章有不足之处欢迎各位纠正,希望能和大家一起砥砺前行!!!加油


本文转载自: https://blog.csdn.net/Aa5107033/article/details/139967139
版权归原作者 一拳打穿地球o 所有, 如有侵权,请联系我们删除。

“RabbitMq的五大消息模型及Java代码演示”的评论:

还没有评论