0


RabbitMQ 消息类型

RabbitMQ 消息类型

下面我们简单介绍下RabbitMQ的一些消息种类,并结合Java代码进行学习。

如果需要执行代码,需要下载RabbitMQ的客户端(例如java客户端: https://www.rabbitmq.com/java-client.html)
使用maven:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>

注意先启动消费者,在启动生产者
5.x 版本系列需要 JDK 8
java-client 的文档:https://rabbitmq.github.io/rabbitmq-java-client/api/current/index.html

创建一个连接工具类ConnectionUtil

importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 连接工具类
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 13:49
 */publicclassConnectionUtil{/**
     * 获取MQ的连接
     * @return
     */publicstaticConnectiongetConnection()throwsIOException,TimeoutException{// 定义一个连接工厂ConnectionFactory factory =newConnectionFactory();// 设置服务地址
        factory.setHost("localhost");// AMQP 5672
        factory.setPort(5672);// vhost
        factory.setVirtualHost("/vhost01");// 用户名
        factory.setUsername("admin");// 密码
        factory.setPassword("123456");return factory.newConnection();}}

simple 简单队列

在这里插入图片描述

P:消息生产者
红色:队列
C:消费者
3个对象:生产者 队列 消费者

生产者直接发送消息到队列,消费者直接从队列获取消息。发送消息时,只需要指定队列,不需要指定交换机,以及路由key,只有一个消费者

示例

  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 简单队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 13:58
 */publicclassSend{privatestaticfinalStringQUEUE_NAME="test_simple_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取一个连接Connection connection =ConnectionUtil.getConnection();// 从连接中获取一个通道Channel channel = connection.createChannel();// 声明一个队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message ="hello simple !";// 第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
        channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println("send message");

        channel.close();
        connection.close();}}
  1. 消息消费者:Recv
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 简单队列,消息消费者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:00
 */publicclassRecv{privatestaticfinalStringQUEUE_NAME="test_simple_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义队列的消费者DefaultConsumer defaultConsumer =newDefaultConsumer(channel){// 获取到达的消息@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msgString =newString(body,"utf-8");System.out.println("new api recv: "+ msgString);}};// 监听队列
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);}}

简单队列的不足:耦合性高,生产者一一对应消费者(如果我想有多个消费者消费队列中消息,这时候就不行),队列名变更,这时候得同时变更。

work queues 工作队列

在这里插入图片描述

工作队列可以细分为轮询分发和公平分发。发送消息时,只需要指定队列,不需要指定交换机,以及路由key,设定多个消费者

为什么会出现工作队列,因为simple 队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息。

round robin 轮询分发

  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,轮询分发,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:12
 */publicclassSend{publicstaticfinalStringQUEUE_NAME="test_round_robin_work_queue";/**
     *                  |--> C2
     * P ---> Queue ----|
     *                  |--> C1
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 发送消息for(int i =0; i <50; i++){String msg="send hello "+ i;
            channel.basicPublish("",QUEUE_NAME,null, msg.getBytes());System.out.println("【WQ】 send msg = "+ msg);Thread.sleep(i*20);}// 关闭资源
        channel.close();
        connection.close();}}
  1. 消息消费者:Recv1
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,轮询分发,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */publicclassRecv1{publicstaticfinalStringQUEUE_NAME="test_round_robin_work_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义一个消费者Consumer consumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [1] msg = "+ msg);try{Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [1] done!");}}};// 监听队列boolean autoAck =true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);}}
  1. 消息消费者:Recv2
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,轮询分发,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */publicclassRecv2{publicstaticfinalStringQUEUE_NAME="test_round_robin_work_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 定义一个消费者DefaultConsumer defaultConsumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [2] msg = "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [2] done!");}}};// 监听队列boolean autoAck =true;
        channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);}}

现象:

  • 消费者1 和消费者2处理消息的数量是一样的。
  • 消费者1:偶数。
  • 消费者2:奇数。
  • 这种方式叫做轮询分发(round-robin)结果就是不管谁忙或者谁清闲 都不会多给一个消息,任务总是你一个我一个。

fair dispatch 公平分发

公平分发,需要消费者进行手动回执

// MQ一次只发一个请求给消费者,当消费者处理完消息后会手动回执,然后MQ再发一个消息给消费者
channel.basicQos(1);boolean autoAck =false;//false 手动回执,处理完消息后,告诉MQ
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,公平分发,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:30
 */publicclassSend{publicstaticfinalStringQUEUE_NAME="test_fair_dispatch_work_queue";/**
     *                  |--> C2
     * P ---> Queue ----|
     *                  |--> C1
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,InterruptedException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);/**
         * 每个消费者 发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
         * 限制发送给同一个消费者 不得超过一条消息
         */
        channel.basicQos(1);// 发送消息for(int i =0; i <50; i++){String msg="send hello "+ i;
            channel.basicPublish("",QUEUE_NAME,null, msg.getBytes());System.out.println("【WQ】 send msg = "+ msg);Thread.sleep(i*5);}// 关闭资源
        channel.close();
        connection.close();}}
  1. 消息消费者:Recv1
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,公平分发,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */publicclassRecv1{publicstaticfinalStringQUEUE_NAME="test_fair_dispatch_work_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);// 定义一个消费者Consumer consumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [1] msg = "+ msg);try{Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [1] done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列// boolean autoAck = true; //自动应答boolean autoAck =false;//手动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);}}
  1. 消息消费者:Recv2
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 工作队列,公平分发,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:14
 */publicclassRecv2{publicstaticfinalStringQUEUE_NAME="test_fair_dispatch_work_queue";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);// 定义一个消费者DefaultConsumer defaultConsumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [2] msg = "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [2] done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列boolean autoAck =false;//false 手动回执
        channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);}}

现象:消费者2 处理的消息比消费者1多,能者多劳

publish/subscribe 发布-订阅模式

在这里插入图片描述

  • 一个生产者,多个消费者,需要新建fanout交换机
  • 每个消费者都有自己的队列,并绑定到交换机上
  • 生产者没有直接把消息发送到队列,而是发送到交换机
  • 消息发送时需要指定交换机,消息接收时需要指定队列
  • 生产者发送的消息,经过交换机,到达队列。就能实现一个消息被多个消费者消费

注册成功时,既要发邮件,又要发短信

示例

  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 发布订阅模式队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:51
 */publicclassSend{publicstaticfinalStringEXCHANGE_NAME="test_exchange_fanout";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分发// 发送消息String msg ="hello ps";
        channel.basicPublish(EXCHANGE_NAME,"",null, msg.getBytes());System.out.println("Send msg = "+ msg);
        channel.close();
        connection.close();}}

在这里插入图片描述

消息哪去了?? 丢失了,因为交换机没有存储的能力,在rabbitmq里面只有队列有存储的能力。因为还没有队列绑定到这个交换机,所以数据丢失了。

  1. 消息消费者:Recv1
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 发布订阅模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:59
 */publicclassRecv1{publicstaticfinalStringQUEUE_NAME="test_queue_fanout_email";publicstaticfinalStringEXCHANGE_NAME="test_exchange_fanout";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 保证一次只分发一个
        channel.basicQos(1);// 绑定到交换机 转发器
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");// 定义一个消费者Consumer consumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [1] msg = "+ msg);try{Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [1] done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列boolean autoAck =false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);}}
  1. 消息消费者:Recv2
importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 发布订阅模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 14:59
 */publicclassRecv2{publicstaticfinalStringQUEUE_NAME="test_queue_fanout_sms";publicstaticfinalStringEXCHANGE_NAME="test_exchange_fanout";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取连接Connection connection =ConnectionUtil.getConnection();// 获取channelChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定到交换机 转发器
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");// 保证一次只分发一个
        channel.basicQos(1);// 定义一个消费者Consumer consumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [2] msg = "+ msg);try{Thread.sleep(2000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("Recv [2] done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列boolean autoAck =false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);}}

现象:消费者1和消费者2都受到了消息。

在这里插入图片描述

routing 路由选择通配符模式

  • direct交换机类型
  • 生产者需要将交换机和routing key绑定
  • 消费者需要将队列,交换机,routing key 三者绑定
  • 每个消息会根据不同的Routing key,发送到不同的消费者队列
  • 不支持通配符(*,#

在这里插入图片描述

示例

  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 路由模式队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:12
 */publicclassSend{publicstaticfinalStringEXCHANGE_NAME="test_exchange_direct";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();// exchange direct:直连
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");String msg ="hello direct !";// routing keyString routingKey ="info";
        channel.basicPublish(EXCHANGE_NAME, routingKey,null, msg.getBytes());System.out.println("send :"+ msg);

        channel.close();
        connection.close();}}
  1. 消息消费者:Recv1
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 路由模式队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:13
 */publicclassRecv1{publicstaticfinalStringEXCHANGE_NAME="test_exchange_direct";publicstaticfinalStringQUEUE_NAME="test_queue_direct_1";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机,并绑定 routingKey
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");// 保证一次只分发一个
        channel.basicQos(1);// 定义一个消费者Consumer consumer =newDefaultConsumer(channel){// 消息到达,触发这个方法@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [1] msg = "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);System.out.println("Recv [1] done!");}}};// 监听队列 autoAck(消息应答):false 手动回执 (消息回执 channel.basicAck(envelope.getDeliveryTag(), false);)
        channel.basicConsume(QUEUE_NAME,false, consumer);}}
  1. 消息消费者:Recv2
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 路由模式队列,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:13
 */publicclassRecv2{publicstaticfinalStringEXCHANGE_NAME="test_exchange_direct";publicstaticfinalStringQUEUE_NAME="test_queue_direct_2";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"waring");

        channel.basicQos(1);Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("Recv [2] msg = "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);System.out.println("Recv [2] done!");}}};

        channel.basicConsume(QUEUE_NAME,false, consumer);}}

现象:队列绑定的路由key和消息发送时指定的路由key匹配时,才会接收到消息。

Topics 主题

  • Topic交换机类型,生产者需要申明topic交换机,并指定routing key。
  • ​和路由模式类似,但是Topic可以支持通配符,# 匹配一个或者多个字符;​* 匹配一个字符

在这里插入图片描述

在这里插入图片描述

商品:发布、删除、修改、查询

  1. 消息生产者:Send
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 主题队列,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */publicclassSend{publicstaticfinalStringEXCHANGE_NAME="test_exchange_topic";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明交换机 topic:主题模式
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");String message ="商品...";// goods.delete 只有消费者2号能收到(goods.#)
        channel.basicPublish(EXCHANGE_NAME,"goods.delete",null, message.getBytes());// 两个消费者都能收到// channel.basicPublish(EXCHANGE_NAME, "goods.add", null, message.getBytes());System.out.println("send message = "+ message);

        channel.close();
        connection.close();}}
  1. 消息消费者:Recv1
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 主题队列,消息消费者1号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */publicclassRecv1{publicstaticfinalStringEXCHANGE_NAME="test_exchange_topic";publicstaticfinalStringQUEUE_NAME="test_queue_topic_1";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定 商品新增 goods.add
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

        channel.basicQos(1);//定义消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("[1] recv msg: "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("[1] recv done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列
        channel.basicConsume(QUEUE_NAME,false, consumer);}}
  1. 消息消费者:Recv2
importcom.goudong.modules.rabbitmq.demo.util.ConnectionUtil;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * 类描述:
 * 主题队列,消息消费者2号
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 15:21
 */publicclassRecv2{publicstaticfinalStringEXCHANGE_NAME="test_exchange_topic";publicstaticfinalStringQUEUE_NAME="test_queue_topic_2";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定 goods.# (`#` 匹配一个或者多个字符)
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");

        channel.basicQos(1);//定义消费者Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body,"utf-8");System.out.println("[2] recv msg: "+ msg);try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println("[2] recv done!");// 手动回执 false 表示只确认 envelope.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列
        channel.basicConsume(QUEUE_NAME,false, consumer);}}

现象:只有满足生产者指定的路由模式,才会将消息发送到队列。


本文转载自: https://blog.csdn.net/qq_42428264/article/details/128575444
版权归原作者 胸大的请先讲 所有, 如有侵权,请联系我们删除。

“RabbitMQ 消息类型”的评论:

还没有评论