0


RabbitMQ工作模式

✨ RabbitMQ工作模式

📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

Work queues工作队列模式

基本介绍

在这里插入图片描述

  • 多个消费者共同消费同一个队列中的消息,可以实现快速消费,避免消息积压,但是多个消费者消费队列的消息的时候,是互斥的,同一个消息只能被一个消费者消费,不可以被多个消费者消费。
  • 应用:对于一些任务比较多的情况,使用工作队列可以提高任务处理的速度

编写代码

抽取公共部分,写一个工具类

  • 我们知道像连接等操作,其实基本上代码都是一样的,每一次写重复的代码其实没有什么意义,我们可以写一个工具类来封装这些操作。
publicclassConnectionUtil{publicstaticConnectiongetConnection()throwsException{//定义连接工厂ConnectionFactory factory =newConnectionFactory();//设置服务地址
        factory.setHost("192.168.137.118");//端口
        factory.setPort(5672);//设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}

生产者

publicclassProducer{staticfinalString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);for(int i =1; i <=10; i++){String body = i+"hello rabbitmq~~~";
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());}
        channel.close();
        connection.close();}}

在这里插入图片描述

消费者1

publicclassConsumer1{staticfinalString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
        channel.basicConsume(QUEUE_NAME,true,consumer);}}

消费者2

代码和上面相同

测试

我们先启动两个消费者,然后再启动生产者发送消息,到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

发布订阅模式

订阅模式类型

在这里插入图片描述

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型: - Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • 发布订阅模式 - 每个消费者都监听自己的队列- 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
  • 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码编写

生产者

publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();/*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */String exchangeName ="test_fanout";//5. 创建交换机
        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());//9. 释放资源
        channel.close();
        connection.close();}}

消费者1

publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_fanout_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
        channel.basicConsume(queue1Name,true,consumer);}}

消费者2

publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_fanout_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
        channel.basicConsume(queue2Name,true,consumer);}}

测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;达到广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
在这里插入图片描述

发布订阅模式和工作队列模式的区别

  • 工作队列模式不需要定义交换机,而发布订阅模式需要定义交换机
  • 发布订阅模式需要设置队列和交换机的绑定,而工作队列模式不需要设置,事实上是因为工作队列模式会把队列绑定到默认的交换机
  • 发布订阅模式中,生产者向交换机发送消息;工作队列模式则是生产者向队列发送消息(底层使用默认交换机)

Routing 路由模式

基本介绍

在这里插入图片描述

  • P:生产者,向交换机发送消息的时候,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
  • C1:消费者,它所在队列指定了需要routing key为error的信息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式的特点

  • 队列和交换机的绑定是需要指定routing key的,不可以随意绑定
  • 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
  • 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。

编写代码

生产者

publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName ="test_direct";// 创建交换机
       channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);// 创建队列String queue1Name ="test_direct_queue1";String queue2Name ="test_direct_queue2";// 声明(创建)队列
       channel.queueDeclare(queue1Name,true,false,false,null);
       channel.queueDeclare(queue2Name,true,false,false,null);// 队列绑定交换机// 队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");// 队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");String message ="日志信息:张三调用了delete方法.错误了,日志级别warning";// 发送消息
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());System.out.println(message);

        channel.close();
        connection.close();}}

运行在这里插入图片描述

在这里插入图片描述

消费者1

publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_direct_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
        channel.basicConsume(queue1Name,true,consumer);}}

消费者2

publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_direct_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息存储到数据库.....");}};
        channel.basicConsume(queue2Name,true,consumer);}}

Topics 通配符模式

基本介绍

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则: - #:匹配0个或者多个词- *:刚好可以匹配一个词

在这里插入图片描述
在这里插入图片描述

代码

生产者

publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName ="test_topic";
        channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);// 绑定队列和交换机/**
         *  参数:
             1. queue:队列名称
             2. exchange:交换机名称
             3. routingKey:路由键,绑定规则
                 如果交换机的类型为fanout ,routingKey设置为""
         */// routing key  系统的名称.日志的级别。//需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//发送消息goods.info,goods.error
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        channel.close();
        connection.close();}}

在这里插入图片描述

消费者1

publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_topic_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
        channel.basicConsume(queue1Name,true,consumer);}}

消费者2

publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_topic_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
        channel.basicConsume(queue2Name,true,consumer);}}

本文转载自: https://blog.csdn.net/qq_52797170/article/details/127049754
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。

“RabbitMQ工作模式”的评论:

还没有评论