✨ RabbitMQ工作模式
📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件
Work queues工作队列模式
基本介绍
- 多个消费者共同消费同一个队列中的消息,可以实现快速消费,避免消息积压,但是多个消费者消费队列的消息的时候,是互斥的,同一个消息只能被一个消费者消费,不可以被多个消费者消费。
- 应用:对于一些任务比较多的情况,使用工作队列可以提高任务处理的速度
编写代码
抽取公共部分,写一个工具类
- 我们知道像连接等操作,其实基本上代码都是一样的,每一次写重复的代码其实没有什么意义,我们可以写一个工具类来封装这些操作。
publicclassConnectionUtil{publicstaticConnectiongetConnection()throwsException{//定义连接工厂ConnectionFactory factory =newConnectionFactory();//设置服务地址
factory.setHost("192.168.137.118");//端口
factory.setPort(5672);//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
生产者
publicclassProducer{staticfinalString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);for(int i =1; i <=10; i++){String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());}
channel.close();
connection.close();}}
消费者1
publicclassConsumer1{staticfinalString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
channel.basicConsume(QUEUE_NAME,true,consumer);}}
消费者2
代码和上面相同
测试
我们先启动两个消费者,然后再启动生产者发送消息,到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
发布订阅模式
订阅模式类型
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型: - Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- 发布订阅模式 - 每个消费者都监听自己的队列- 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
- 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
代码编写
生产者
publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/String exchangeName ="test_fanout";//5. 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());//9. 释放资源
channel.close();
connection.close();}}
消费者1
publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_fanout_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
channel.basicConsume(queue1Name,true,consumer);}}
消费者2
publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_fanout_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
channel.basicConsume(queue2Name,true,consumer);}}
测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;达到广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
发布订阅模式和工作队列模式的区别
- 工作队列模式不需要定义交换机,而发布订阅模式需要定义交换机
- 发布订阅模式需要设置队列和交换机的绑定,而工作队列模式不需要设置,事实上是因为工作队列模式会把队列绑定到默认的交换机
- 发布订阅模式中,生产者向交换机发送消息;工作队列模式则是生产者向队列发送消息(底层使用默认交换机)
Routing 路由模式
基本介绍
- P:生产者,向交换机发送消息的时候,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
- C1:消费者,它所在队列指定了需要routing key为error的信息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
路由模式的特点
- 队列和交换机的绑定是需要指定routing key的,不可以随意绑定
- 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
- 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。
编写代码
生产者
publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName ="test_direct";// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);// 创建队列String queue1Name ="test_direct_queue1";String queue2Name ="test_direct_queue2";// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);// 队列绑定交换机// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");String message ="日志信息:张三调用了delete方法.错误了,日志级别warning";// 发送消息
channel.basicPublish(exchangeName,"warning",null,message.getBytes());System.out.println(message);
channel.close();
connection.close();}}
运行
消费者1
publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_direct_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
channel.basicConsume(queue1Name,true,consumer);}}
消费者2
publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_direct_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));System.out.println("将日志信息存储到数据库.....");}};
channel.basicConsume(queue2Name,true,consumer);}}
Topics 通配符模式
基本介绍
- Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则: - #:匹配0个或者多个词- *:刚好可以匹配一个词
代码
生产者
publicclassProducer{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName ="test_topic";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);// 绑定队列和交换机/**
* 参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/// routing key 系统的名称.日志的级别。//需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//发送消息goods.info,goods.error
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
channel.close();
connection.close();}}
消费者1
publicclassConsumer1{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue1Name ="test_topic_queue1";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
channel.basicConsume(queue1Name,true,consumer);}}
消费者2
publicclassConsumer2{publicstaticvoidmain(String[] args)throwsException{Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();String queue2Name ="test_topic_queue2";Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body:"+newString(body));}};
channel.basicConsume(queue2Name,true,consumer);}}
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。