0


MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型

书接上文,展示一下五种模型我使用的是spring could 微服务的框架

文章说明:

    本文章我会分享总结5种实用的rabbitMQ的实用模型

1、hello world简单模型

2、work queues工作队列

3、Publish/Subscribe发布订阅模型

4、Routing路由模型

5、Topics****主题模型

(赠送)6、消息转换器****

开局重要介绍(一定一定要知道的)

RabbitTemplate的主要作用是用来简化与RabbitMQ消息代理之间的通信过程。RabbitMQ是一种类似于消息队列的消息代理系统,可以实现应用程序之间的异步通信。

使用RabbitTemplate,我们可以通过其提供的方法直接向RabbitMQ发送消息,而无需编写其他低层级的代码。这样可以减少开发人员的工作量,同时提高代码的可读性和可维护性。

具体来说,RabbitTemplate提供了以下几种类型的方法:

  • 发送简单的消息
  • 发送带有附加信息的消息
  • 发送带有事务支持的消息
  • 发送响应式消息

通过这些方法,我们可以更方便地与RabbitMQ进行交互,同时也可以更灵活地使用RabbitMQ进行消息通信。因此,RabbitTemplate是Spring框架中非常重要的一个组件,也为开发人员提供了很多便利。

使用方法很简单注入就完事了,这个是自带的

 @Autowired
    RabbitTemplate rabbitTemplate;

经常使用的方法

rabbitTemplate.convertAndSend()

给大伙解释一下:

rabbitTemplate.convertAndSend() 是 RabbitTemplate 类中的一个方法,它可以将一个 Java 对象转换为 RabbitMQ 可以接受的消息格式,并将其发送给指定的消息队列或交换机。

通常情况下,我们会将一个 Java对象转换为一个 JSON 字符串,然后将该字符串作为消息发送给 RabbitMQ。在这个过程中,RabbitTemplate 会自动将 JSON 字符串加上一些消息头,以便 RabbitMQ 能够正确地理解和解析这个消息。

convertAndSend() 方法可以接受多个参数,用于指定消息的目的地、消息内容和其他一些选项。例如,您可以指定消息应该发送到哪个队列(或者交换机),或者是否应该启用事务处理等。

总的来说,convertAndSend() 方法是一个非常方便的工具,可以让我们更方便地与 RabbitMQ 进行交互,同时也可以更灵活地使用 RabbitMQ 进行消息通信。

五种模型实例

springboot依赖配置

1、首先在pom文件中导入依赖

我这里直接给代码给大家直接CV即可

<!-- amqp依赖,包含Rabbitmq-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置你的yml配置文件

因为我使用的nacos所以有两种方法

(1)直接在文件中的配置文件中添加相应的配置

(2)在nacos线上注册系统中添加配置

运行一下看能不能正常启动

(注意:发现如果配置文件写错会直接连接失败)所以我直接给大家提供代码

spring:
  rabbitmq:
    host:自己的Ip
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
      acknowledge-mode: manual # 设置消费端手动ack确认
      retry:
        enabled: true # 是否支持重试
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)

温馨提示一定是在bootstrap.yml或者在nacos中的yml格式下才能使用

已经可以正常运行了!

nacos注册服务中心能正常看到

hello world简单模型(使用简单队列)

一对一消费,只有一个消费者能接收到

消费者

消费者代码

@Component
public class SimpleConsumer {
    @RabbitListener(queuesToDeclare = {@Queue("simple.queue")})// queuesToDeclare 自动声明队列
    public void simple(String message){
        System.out.println("message"+message);
    }
}

生产者代码

@RestController
public class TestController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/test/{massage}")
    public void simpleTest(@PathVariable String massage){
        rabbitTemplate.convertAndSend("simple.queue",massage);
    }
}

其中的massage是要消费的信息,正常要使用JSON的方法给要传递的信息变成JSON字符串来进行如果需要对象的传递。根据自己的需要来进行。

这样一个简单的模型就写好了

要注意的是先使用

@RabbitListener(queuesToDeclare = {@Queue("simple.queue")})来指定队列的名称

让我们来运行一下看一下效果

看到客户端里面已经有一个队列

使用我路径进行传参对我要消费的信息进行消费,测试一下

没有报错现在看看控制台有没有打印hello word

ok已经完成了基本的简单队列,看懂了吗,快去试试吧!

work queues工作队列

阿丹小解读:

好几个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点。我称之为内耗队列。

话不多说上代码!

配置和依赖同上

关键配置(yml):

取消预取机制,能者多劳配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener: 
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条

还是一个消费者,但是这个消费者有两个,所以这两消费者都要指定监听同一个队列

@Component
public class WorkQueues {
    @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
    public void holloWordListener(String message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println("message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("workQueue")) // queuesToDeclare 自动声明队列
    public void holloWordListener1(String message) throws InterruptedException {
        Thread.sleep(400);
        System.out.println("message2 = " + message);
    }
}

这样就完成了两个消费者

然后进行一个生产者开始进行生产需要处理的信息

 @GetMapping("/testworkqueue")
    public void testWorkQueue(){
        String queueName = "workQueue";
        String message = "hello,work.queue";
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(queueName,message+i);
            System.out.println("i = " + i);
        }
    }

最后让我们运行开始查看效果

使用postman对我写的方法进行访问

能看到有不同的消费者消费了不同的信息

这就是work queues工作队列了,如果你看明白了去试试吧!

Publish/Subscribe发布订阅模型

阿丹解读:

这个模式使用到了交换机,真的泰酷辣!但是要注意exchange(交换机)是不给缓存信息的

它会将同一个消息给多个消费者

使用了fanout交换机,会将接收到的信息路由给每一个绑定的queue队列

它可以使用在的场景:这个业务需要异步发送短信也要发送邮件那么就可以通过这个来使用它

来吧上代码!

还是消费者

@Component
public class Publish {
// 不指定队列,消息过了就没了
    //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})

    // 指定队列,可以接收缓存到队列里的消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test",durable = "true" ),
            exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
    public void reveivel(String message){
        System.out.println("message在我这里发送了短信 = " + message);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue,
            exchange = @Exchange(value = "fanoutTest",type = ExchangeTypes.FANOUT))})
    public void reveivel2(String message){
        System.out.println("message1在我这里发送了邮件 = " + message);
    }
}

给大家解释解读一下:

在这个例子中,@RabbitListener 注解被绑定到了一个名为 "test" 的队列上,同时指定了该队列使用 durable 模式(即持久化队列)。

这个队列绑定了一个名为 "fanoutTest" 的 fanout 类型的交换机,通过 @QueueBinding 和 @Exchange 注解来指定。这个交换机分发消息到所有与之绑定的队列中,而队列则用来存储这些消息。

当 RabbitMQ 服务接收到相关消息时,会自动触发绑定了 @RabbitListener 注解的方法,即使该方法在应用程序启动时并没有手动调用。

总的来说,@RabbitListener 注解和 @QueueBinding / @Exchange 注解的组合可以帮助开发者更方便地实现 RabbitMQ 的监听功能,从而更加灵活地实现消息传递和处理的流程。

重点解读:

"fanout" 是 RabbitMQ 中的一种交换机类型。在 RabbitMQ 中,消息可以被发送到交换机,交换机再将消息分发到相关的队列中。

"fanout" 类型的交换机会将它接收到的消息广播给所有与之绑定的队列,即所有队列都会收到相同的消息。这种交换机类型适用于我们需要将消息传递给多个消费者的场景。

除了 "fanout" 类型外,RabbitMQ 还支持多种其他类型的交换机,例如:direct、topic、headers 等。不同类型的交换机有不同的消息路由规则,适用于不同的场景。开发者可以根据实际情况来选择合适的交换机类型。

生产者:

@GetMapping("/testPushQueue")
    public void tesyPubSubQueue(){
        // 参数1:交换机名称 ,
        // 参数2 routingKey,(fanout类型可不写) ,
        // 参数3,消息内容
        rabbitTemplate.convertAndSend("fanoutTest","","阿丹的个人信息");
    }

重点是这三个参数

运行测试一下看看效果

老朋友postman

效果实现:

总结一下使用这个模型的逻辑:

有些网站可能在用户注册或者登录时要发送短信和邮件那么如果写在一个里面的话。发送短信和发送邮件不是一个业务。所以要分开写才对。

如果放在一起会导致运行速度和处理速度被拖慢。

Routing路由模型

阿丹解读:

routing模型也是将消息发送到交换机

但是使用的的类型和刚才的不一样,这个使用的是direct(直接)类型的交换机,它会将接到的消息按照规则路由到指定的queue队列,所以是路由模式

上代码!~

@Component
// 消费者直接绑定交换机,指定类型为direct,并指定key表示能消费的key
public class Routing {
    // 不指定队列,消息过了就没了
    //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),key = {"info","error"})})

    // 指定队列,可以接收缓存到队列里的消息
    // key = {"info","error"} 表示我能接收到routingKey为 info和error的消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test1",durable = "true" ),
            exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
            key = {"info", "error"})})
    public void receivel(String message){
        System.out.println("message = " + message);
    }
    // key = {"error"} 表示我只能接收到routingKey为 error的消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue,
            exchange = @Exchange(value = "direstTest",type = ExchangeTypes.DIRECT),
            key = {"error"})})
    public void receivel1(String message){
        System.out.println("message1 = " + message);
    }
}

代码解读

@RabbitListener 注解绑定到了一个名为 "direstTest" 的直连类型的交换机上,同时还指定了消息被路由到 "error" 队列中。

直连类型的交换机会根据消息的路由键(即 key 参数)将消息路由到指定的队列中。在这个例子中,当消息的路由键包含 "error" 时,消息就会被发送到 "error" 队列中。

这个 @RabbitListener 注解没有指定队列的持久化模式(即是否将队列存储到磁盘上),因此默认使用的是非持久化队列。

总的来说,通过 @QueueBinding 和 @Exchange 注解组合使用,@RabbitListener 注解可以指示 RabbitMQ 监听指定的消息队列,并在收到相关消息时触发相应的方法。这个方法可以根据特定的业务逻辑来处理消息,从而实现不同的消息传递和处理流程。

生产者!

@GetMapping("/testRoutinginfo")
    public void direstExchangeTest(){
        rabbitTemplate.convertAndSend("direstTest","info","发送info的key的路由消息");
    }
    @GetMapping("/testRoutinginfo1")
    // 路由模型
    public void direstExchangeTest1(){
        rabbitTemplate.convertAndSend("direstTest","error","发送error的key的路由消息");
    }

要注意的就是

"error"就是指定路由了,如果使用的是fanout类型则可以不写,但是这个模式下要写本次使用的是
direst类型。

开始运行测试!

使用场景解读:

在一个电商网站中,我们需要将用户提交的订单消息分类到不同的队列中,以便我们可以在不同的处理队列中执行不同的业务流程。

Routing 模型的具体实现方式是将消息发送到一个 direct 类型的交换机上,该交换机会根据消息的 routing key(即路由键)将消息路由到绑定了对应 routing key 的队列中。通过这种方式,我们可以实现对不同类型的消息进行区分,并在不同的消费者之间进行分流和负载均衡。

当然,在实际应用中,Routing 模型也有一些局限性,例如不能实现消息的完全随机路由以及不能支持消息的匹配模式等。因此,在选取消息传递模型时,开发人员需要结合具体场景来选择合适的方案。

这就是以上的路由模型了现在你自己可以去试试了!

Topics主题模型

阿丹解读:

Topics 主题模型适用于一些需要实现高级的消息路由匹配功能的业务场景。例如,我们可能需要将不同种类的消息路由到不同的队列中,同时还需要根据消息主题、标签或其他属性来过滤和分类消息。

Topics 模型的具体实现方式是将消息发送到一个 topic 类型的交换机上,该交换机会根据消息的 routing key(即路由键)进行模糊匹配,并将符合条件的消息发送到对应的队列中。这种模糊匹配方式通常使用通配符("*" 和 "#")来实现。

例如,我们可以使用 ".error" 的路由键将所有错误类型的消息发送到一个 "error" 队列中;或者使用 "user." 的路由键将所有与用户相关的消息发送到一个名为 "user" 的队列中。

在实际应用中,Topics 模型通常用于一些消息特别多的系统,例如新闻网站、电商平台、社交媒体等,这些系统需要对不同类型的消息进行灵活的分类和过滤,以便更好地实现业务逻辑。

使用重点

topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

(代表通配符,任意一个字段) user.name user. [user.age, user.xxx]

#(号代表一个或多个字段 user.name user.name.age)

上代码!!!!!!!!

消费者!

@Component
public class Topics {
    // 不指定队列,消息过了就没了
    //  @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})

    // 指定队列,可以接收缓存到队列里的消息
    // key = {"user.save","user.*"} 表示能消费 routingkey为  user.save 和 user.任意一个字符  的消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value ="test2",durable = "true" ),exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"user.save","user.*"})})
    public void recevicel(String message){
        System.out.println("message = " + message);
    }
    // key = {"order.#","user.*"} 表示能消费 routingkey为  order.一个或多个字符   和  user.任意一个字符  的消息
    @RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(name = "topicList",type = ExchangeTypes.TOPIC),key = {"order.#","user.*"})})
    public void recevicel1(String message){
        System.out.println("message1 = " + message);
    }
}

解读代码:

@RabbitListener 注解绑定到了一个名为 "topicList" 的 topic 类型的交换机上,同时还指定了消息应该路由到 "test2" 队列中。

在这个例子中,交换机的名字为 "topicList",类型为 TOPIC。 key 参数指定了路由规则,它由两部分组成:"user.save" 和 "user.*"。这个规则表示将所有路由键以 "user.save" 开头的消息路由到 "test2" 队列中,同时也会将所有路由键以 "user." 开头的消息发送到 "test2" 队列中。

这个 @RabbitListener 注解还指定队列的持久化模式为 true,即将队列存储到磁盘上,以便在 RabbitMQ 重启后消息可以得到恢复。

总的来说,通过 @QueueBinding 和 @Exchange 注解组合使用,@RabbitListener 注解可以协助我们更加灵活地监听和处理 RabbitMQ 中的消息。在实际应用中,我们可以根据具体的业务需求来定义不同的路由规则,以便更好地实现消息传递和处理的流程。

生产者!

 @GetMapping("/topicTest")
    public void topicTest(){
        rabbitTemplate.convertAndSend("topicTest","user.save","topic路由消息,use.save");
    }
    @GetMapping("/topicTest1")
    public void topicTest1(){
        rabbitTemplate.convertAndSend("topicTest","order.select.getone","topic路由消息,order.select.getone");
    }

运行测试!

(有点小瑕疵后期补上)

消息转换器(赠送内容)

代码里直接发送对象,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码

依赖
 <dependency>
     <groupId>com.fasterxml.jackson.dataformat</groupId>
     <artifactId>jackson-dataformat-xml</artifactId>
     <version>2.9.10</version>
 </dependency>
配置
@Configuration
public class RabbitmqConfig {
     // 消息转换配置
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

再次发送就会是转换好的消息


本文转载自: https://blog.csdn.net/weixin_72186894/article/details/130482387
版权归原作者 一单成 所有, 如有侵权,请联系我们删除。

“MQ消息队列,以及RabbitMQ详细(中1)五种rabbitMQ实用模型”的评论:

还没有评论