0


RabbitMQ---订阅模型-Fanout

1、 订阅模型-Fanout

Fanout,也称为广播。
流程图:
在这里插入图片描述

在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1.1、生产者

两个变化:
1) 声明Exchange,不再声明Queue
2) 发送消息到Exchange,不再发送到Queue

publicclassSend{privatefinalstatic String EXCHANGE_NAME ="fanout_exchange_test";publicstaticvoidmain(String[] argv)throws Exception {// 获取到连接
       Connection connection = ConnectionUtil.getConnection();// 获取通道
       Channel channel = connection.createChannel();// 声明exchange,指定类型为fanout
       channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 消息内容
       String message ="Hello everyone";// 发布消息到Exchange
       channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());
       System.out.println(" [生产者] Sent '"+ message +"'");
       channel.close();
       connection.close();}}

1.2、消费者1

publicclassRecv{privatefinalstatic String QUEUE_NAME ="fanout_exchange_queue_1";privatefinalstatic String EXCHANGE_NAME ="fanout_exchange_test";publicstaticvoidmain(String[] argv)throws Exception {// 获取到连接
       Connection connection = ConnectionUtil.getConnection();// 获取通道
       Channel channel = connection.createChannel();// 声明队列
       channel.queueDeclare(QUEUE_NAME,false,false,false, null);// 绑定队列到交换机
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");// 定义队列的消费者
       DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body)throws IOException {// body 即消息体
               String msg =newString(body);
               System.out.println(" [消费者1] received : "+ msg +"!");}};// 监听队列,自动返回完成
       channel.basicConsume(QUEUE_NAME,true, consumer);}}

1.3、 消费者2

publicclassRecv2{privatefinalstatic String QUEUE_NAME ="fanout_exchange_queue_2";privatefinalstatic String EXCHANGE_NAME ="fanout_exchange_test";publicstaticvoidmain(String[] argv)throws Exception {// 获取到连接
       Connection connection = ConnectionUtil.getConnection();// 获取通道
       Channel channel = connection.createChannel();// 声明队列
       channel.queueDeclare(QUEUE_NAME,false,false,false, null);// 绑定队列到交换机
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");// 定义队列的消费者
       DefaultConsumer consumer =newDefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@OverridepublicvoidhandleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body)throws IOException {// body 即消息体
               String msg =newString(body);
               System.out.println(" [消费者2] received : "+ msg +"!");}};// 监听队列,手动返回完成
       channel.basicConsume(QUEUE_NAME,true, consumer);}}

1.4、 测试

我们应该先启动生产者,否则,先启动消费者时,由于要绑定交换机,此时,交换机并不存在所以会报错。
我们运行两个消费者,然后发送1条消息:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


本文转载自: https://blog.csdn.net/qq_39311377/article/details/132393168
版权归原作者 DKPT 所有, 如有侵权,请联系我们删除。

“RabbitMQ---订阅模型-Fanout”的评论:

还没有评论