✨ RabbitMQ:发布订阅模式
📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件
1.订阅模式基本介绍
- P:生产者,发送消息给交换机
- C:消费者,接收消息
- X:交换机,一方面接收生产者发送的消息,另一方面知道怎么处理消息,是否应将其附加到特定队列?是否应将其附加到多个队列中?或者它应该被丢弃。其规则由交换类型定义。
- Queue:消息队列,接收消息,缓存消息
- 每个消费者都监听自己的队列
- 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
2.交换机
- RabbitMQ 中消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,很多时候,生产者甚至不知道消息是否会传递到任何队列。相反,生产者只能将消息发送到_交换机_。交换机的工作是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面则将它们推送到队列。交换必须确切地知道如何处理它收到的消息。是否应将其附加到特定队列?是否应将其附加到多个队列中?或者它应该被丢弃。其规则由_交换类型_定义。
- 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机类型
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
3.发布订阅模式
3.1基本介绍
要配置一个fanout类型的交换机,不需要指定对应的路由key,同时会把消息路由到每一个消息队列中,每个消息队列都可以对相同的消息进行存储,然被由各自的消息队列相关联的消费者消费
3.2生产者
publicclassProducer{publicstaticString FANOUT_EXCHANGE =" fanout_exchange";publicstaticString FANOUT_QUEUE_1 ="fanout_queue_1";publicstaticString FANOUT_QUEUE_2 ="fanout_queue_2";publicstaticvoidmain(String[] args){try{Channel channel =ConnectUtil.getChannel();//声明交换机(交换机名称,交换机类型)
channel.exchangeDeclare(FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);//声明队列
channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);//把交换机和队列进行绑定
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");//发送消息for(int i =1; i <=10; i++){String msg="你好,小兔子,发布订阅模式 : "+i;
channel.basicPublish(FANOUT_EXCHANGE,"",null, msg.getBytes());}}catch(IOException e){
e.printStackTrace();}catch(TimeoutException e){
e.printStackTrace();}}}
3.3消费者
消费者1
publicclassConsumer1{publicstaticvoidmain(String[] args){try{Channel channel =ConnectUtil.getChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);//把队列和交换机绑定 队列名称,交换机名称,路由key
channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");//接受消息DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 消费回调函数,当收到消息以后,会自动执行这个方法
* @param consumerTag 消费者标识
* @param envelope 消息包的内容(比如交换机,路由key,消息id等)
* @param properties 属性信息
* @param body 消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消息者1接受到的消息:"+newString(body,"UTF-8"));}};//监听消息(队列名称,是否自动确认消息,消费对象)
channel.basicConsume(Producer.FANOUT_QUEUE_1,true, consumer);}catch(IOException e){
e.printStackTrace();}catch(TimeoutException e){
e.printStackTrace();}}}
消费者2
publicclassConsumer2{publicstaticvoidmain(String[] args){try{Channel channel =ConnectUtil.getChannel();
channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);//把队列和交换机绑定 队列名称,交换机名称,路由key
channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");//接受消息DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 消费回调函数,当收到消息以后,会自动执行这个方法
* @param consumerTag 消费者标识
* @param envelope 消息包的内容(比如交换机,路由key,消息id等)
* @param properties 属性信息
* @param body 消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消息者2接受到的消息:"+newString(body,"UTF-8"));}};//监听消息(队列名称,是否自动确认消息,消费对象)
channel.basicConsume(Producer.FANOUT_QUEUE_2,true, consumer);}catch(IOException e){
e.printStackTrace();}catch(TimeoutException e){
e.printStackTrace();}}}
3.4测试
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。