0


RabbitMQ支持的消息模型

  • RabbitMQ基础
  • RabbitMQ支持的消息模型

一、第一种模型(直连)

我们将用

Java

编写两个程序,发送单个消息的生成者和接收消息并打印出来的消费者。
在下图,

“P”

是生成者,

“C”

消费者。中间框是一个队列

RabbitMQ

保留的消息缓冲区 。

首先构建一个

Maven

项目,然后引入依赖。

<!-- 导入rabbitmq原生依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>
定义生产者
importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.MessageProperties;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
* @author db
* @version 1.0
* @description Provider 生产者代码
* @since 2022/12/29
*/publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//        // 1.创建连接工厂//        ConnectionFactory connectionFactory = new ConnectionFactory();//        // 2.设置连接属性//        connectionFactory.setHost("192.168.137.120");//        connectionFactory.setPort(5672);//        connectionFactory.setVirtualHost("/");//        connectionFactory.setUsername("admin");//        connectionFactory.setPassword("123456");//        connectionFactory.setHandshakeTimeout(60000);////        // 3.从连接工厂获得连接//        Connection connection = connectionFactory.newConnection();// 从工具类中获得连接Connection connection =RabbitMqUtil.getConnection();// 4.从连接中获得channelChannel channel = connection.createChannel();// 5.声明队列queue存储消息/**
         * 参数s:队列名称 如果队列不存在就自动创建
         * 参数b:用来定义队列特性是否要持久化 true 持久化队列  false 不持久化
         * 参数b1: exclusive 是否独占队列  true 独占队列 false 不独占
         * 参数b2:autoDelete 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
         * 参数5:额外附加参数
         *
         */
        channel.queueDeclare("hello",true,false,false,null);// 7.发送消息给中间件// 参数1:交换机名称 参数2:队列名称 参数3:传递消息的额外设置 参数4:
        channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());System.out.println("消息发送成功");//        // 8.关闭连接//        channel.close();//        connection.close();RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}

执行发送,这个时候可以在

web

控制台查看到这个队列

queue

的信息。

定义消费者
importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * @author db
 * @version 1.0
 * @description Consumer  消费者
 * @since 2022/12/29
 */publicclassConsumer{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//        ConnectionFactory connectionFactory = new ConnectionFactory();//        connectionFactory.setHost("192.168.137.120");//        connectionFactory.setPort(5672);//        connectionFactory.setVirtualHost("/");//        connectionFactory.setUsername("admin");//        connectionFactory.setPassword("123456");//        connectionFactory.setHandshakeTimeout(60000);////        // 创建连接//        Connection connection = connectionFactory.newConnection();// 从工具类中获得连接Connection connection =RabbitMqUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 消费者成功消费时的回调DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println(newString(message.getBody()));};// 消费者取消消费时的回调CancelCallback callback = consumerTag ->{System.out.println("消费者取消消费接口的回调");};// 参数1:消费队列的名称// 参数2:消息的自动确认机制(已获得消息就通知MQ消息已被消费)true 打开 false 关闭// 参数3:
        channel.basicConsume("hello",true,deliverCallback,callback);//        channel.close();//        connection.close();}}

工具类的包装

packagecom.duan.rabbitmq.utils;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * @author db
 * @version 1.0
 * @description RabbitMqUtil
 * @since 2023/1/2
 */publicclassRabbitMqUtil{// 定义提供连接对象的方法publicstaticConnectiongetConnection(){try{// 1.创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2.设置连接属性
            connectionFactory.setHost("192.168.137.120");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");

            connectionFactory.setHandshakeTimeout(60000);return connectionFactory.newConnection();}catch(Exception e){
            e.printStackTrace();}returnnull;}// 关闭连接通道和关闭连接的工具方法publicstaticvoidcloseConnectionAndChannel(Channel channel,Connection connection){try{if(channel !=null){
                channel.close();}if(connection !=null){
                connection.close();}}catch(Exception e){
            e.printStackTrace();}}}

报连接超时错误

解决方案:原因是连接超时,加超时时间。

maevn

项目设置超时时间:

factory.setHandshakeTimeout\(60000\)

二、第二种模型(work quene)

work queues

被称为任务队列

(Task queues)

。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用

work

模型: 让多个消费者绑定到一个队列,共同消费队列中的消息。 队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

  • P:生产者
  • C1:消费者1
  • C2:消费者2
定义生成者
packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Producer
 * @since 2023/3/24
 */publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();/**
         * 声明队列queue存储信息
         * 参数1: 队列名称
         * 参数2: 用来定义队列是否要持久化
         * 参数3: exclusion 是否是独占队列
         * 参数4: autoDelete 是否再消费完成后自动删除队列
         * 参数5: 额外附加参数
         */
        channel.queueDeclare("work",true,false,false,null);for(int i =0; i<10; i++){// 参数1:交换机名称 参数2:队列名称 参数3:消息传递的额外设置
            channel.basicPublish("","work",null,(i+"work").getBytes());}RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
定义消费者1
packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer1
 * @since 2023/3/24
 */publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 消费者消费成功时的回调
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
定义消费者2
packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer1
 * @since 2023/3/24
 */publicclassConsumer2{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 消费者消费成功时的回调
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(2000);}catch(Exception e){
                    e.printStackTrace();}System.out.println("消费者2: "+newString(body));}});RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
rabbitMQ

讲按照顺序将每个消息发给下一个使用者,每个消费者都会收到相同数量的消息。
测试结果

消息确认机制

前面看到的是所有的消费者均分消息,会有一个问题,如果一个消费者宕机了,会出现消息丢失现场,希望当出现消费者宕机时,消息被另一个消费者消费,也就是多劳多得生产者代码。

packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Producer
 * @since 2023/3/24
 */publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();/**
         * 声明队列queue存储信息
         * 参数1: 队列名称
         * 参数2: 用来定义队列是否要持久化
         * 参数3: exclusion 是否是独占队列
         * 参数4: autoDelete 是否再消费完成后自动删除队列
         * 参数5: 额外附加参数
         */
        channel.queueDeclare("work",true,false,false,null);for(int i =0; i<20; i++){// 参数1:交换机名称 参数2:队列名称 参数3:消息传递的额外设置
            channel.basicPublish("","work",null,(i+"work").getBytes());}RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}

生成者1

packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer3
 * @since 2023/11/27
 */publicclassConsumer3{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();// 创建信道Channel channel = connection.createChannel();

        channel.basicQos(1);// 每次只消费一个消息// 消费者消费成功时的回调
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));// 手动确认,参数1:消息标识  参数2:每次确认1个
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

生成者2

packagecom.duan.rabbitmq.work;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer4
 * @since 2023/11/27
 */publicclassConsumer4{publicstaticvoidmain(String[] args)throwsIOException{Connection connection =RabbitMqUtil.getConnection();// 创建信道Channel channel = connection.createChannel();

        channel.basicQos(1);// 每次消费一个消息// 消费者消费成功时的回调
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(2000);}catch(Exception e){
                    e.printStackTrace();}System.out.println("消费者2: "+newString(body));
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

测试结果

三、第三种模型(Fanout)

广播模式下:发送消息流程是可以有多个消费者每个消费者都有自己的队列

(queue)

每个队列都要绑定交换机

(exchange)

生成者发送消息,只能发送到交换机,交换机决定把消息发给哪个队列,生成者无法决定交换机把消息发给绑定过的所有队列,队列的消费者都能拿到消息,一条消息可以被多个消费者消费。

生产者
packagecom.duan.rabbitmq.fanout;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Provider
 * @since 2023/11/28
 */publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{// 获取连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 将通道声明交换机  参数1:交换机名称 参数2:交换机类型
        channel.exchangeDeclare("logs","fanout");// 发送消息
        channel.basicPublish("logs","",null,"fanout type message".getBytes());// 释放资源RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
消费者1
packagecom.duan.rabbitmq.fanout;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Customer1
 * @since 2023/11/28
 */publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("logs","fanout");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"logs","");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
消费者2
packagecom.duan.rabbitmq.fanout;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Customer1
 * @since 2023/11/28
 */publicclassCustomer2{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("logs","fanout");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"logs","");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者2: "+newString(body));}});}}
消费者3
packagecom.duan.rabbitmq.fanout;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Customer1
 * @since 2023/11/28
 */publicclassCustomer3{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("logs","fanout");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"logs","");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者3: "+newString(body));}});}}
测试结果

四、第四种模型(Routing)

fanout

模式中,一条消息,会被所有绑定的队列都能消费,但是,在某些场景下,希望不同的消息被不同的队列消费,就需要

Direct

类型的

exchange

Direc

t模型下:队列与交换机的绑定,不是任意绑定的,而是要指定一个

RoutingKey

(路由

key

)消息的发送方在向

Exchange

发送消息时,也必须指定消息的

RoutingKey

Exchange

不再把消息交给每一个绑定的队列,而是根据消息的

Routing Key

进行判断,只有队列的

Routingkey

与消息的

Routing key

完全一致,才会接收到消息。

生产者
packagecom.duan.rabbitmq.direct;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Provider
 * @since 2023/11/28
 */publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{// 建立连接Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs_direct","direct");String routingKey ="error";
        channel.basicPublish("logs_direct",routingKey,null,("这是direct模型发布的基于route key: ["+routingKey+"] 发送的消息").getBytes());RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
消费者1
packagecom.duan.rabbitmq.direct;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer1
 * @since 2023/11/28
 */publicclassConsumer1{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"logs_direct","error");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
消费者2
packagecom.duan.rabbitmq.direct;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Consumer1
 * @since 2023/11/28
 */publicclassConsumer2{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"logs_direct","info");
        channel.queueBind(queue,"logs_direct","error");
        channel.queueBind(queue,"logs_direct","warning");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
测试结果

routingKey

info

时,消费者1和消费者2结果如下:

routingKey

error

时,消费者1和消费者2结果如下:

五、第五种模型(topic)

Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以.分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu
生产者
packagecom.duan.rabbitmq.topic;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Provider
 * @since 2023/11/30
 */publicclassProvider{publicstaticvoidmain(String[] args)throwsIOException{//获取连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();//声明交换机以及交换机类型 topic
        channel.exchangeDeclare("topics","topic");//发布消息String routekey ="save.user.delete";

        channel.basicPublish("topics",routekey,null,("这里是topic动态路由模型,routekey: ["+routekey+"]").getBytes());//关闭资源RabbitMqUtil.closeConnectionAndChannel(channel,connection);}}
消费者1
packagecom.duan.rabbitmq.topic;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Customer1
 * @since 2023/11/30
 */publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("topics","topic");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"topics","*.user.*");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
消费者2
packagecom.duan.rabbitmq.topic;importcom.duan.rabbitmq.utils.RabbitMqUtil;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author db
 * @version 1.0
 * @description Customer1
 * @since 2023/11/30
 */publicclassCustomer2{publicstaticvoidmain(String[] args)throwsIOException{// 获得连接对象Connection connection =RabbitMqUtil.getConnection();Channel channel = connection.createChannel();// 通道绑定交换机
        channel.exchangeDeclare("topics","topic");// 绑定临时队列String queue = channel.queueDeclare().getQueue();// 绑定交换机和队列
        channel.queueBind(queue,"topics","*.user.#");// 消费消息
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
测试结果

代码地址:

https://gitee.com/duan138/practice-code/tree/dev/rabbitmq-java

六、总结

以上就是

rabbitMQ

中常见的几种模式,这些模型通过交换机

(Exchange)

和队列

(Queue)

的不同组合与绑定方式实现。本文只是初步了解

RabbitMQ

相关知识。后续会讲解怎么在

SpringBoot

中应用。


改变你能改变的,接受你不能改变的,关注公众号:程序员康康,一起成长,共同进步。


本文转载自: https://blog.csdn.net/abc_138/article/details/139484010
版权归原作者 cxykk1217 所有, 如有侵权,请联系我们删除。

“RabbitMQ支持的消息模型”的评论:

还没有评论