0


3、RabbitMQ_工作模式

一、简单模式

简介

简单模式 HelloWorld。一个生产者、一个消费者,不需要设置交换机使用默认的交换机。

代码示例

  • 生产者publicclassProducer{//队列名称privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args){//建立连接工厂ConnectionFactory factory =newConnectionFactory();//设置目标主机ip factory.setHost("192.168.47.128");//设置账号名密码 factory.setUsername("yf"); factory.setPassword("123456");// 修改端口的设置// factory.setPort();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){//通道和队列的连接/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数说明: queue:队列名称 durable:是否持久化 exclusive:是否独占,是否一个消费者监听一个队列 autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列 arguments:参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null);for(int i =0; i <10; i++){//需要发送的消息String message ="Hello RabbitMQ!"+i;//通过最基础的发布/* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数说明: exchange:指定交换机,如果使用默认模式,就使用“” routingKey:路由名称 props:配置信息 body:发送的消息(要求字节数组) */ channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}catch(Exception e){ e.printStackTrace();}}}
  • 消费者/** * 服务端,接收信息 */publicclassConsumer{//指定接收队列名称privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =newConnectionFactory();//设置目标主机ip factory.setHost("192.168.47.128");//设置用户密码 factory.setUsername("yf"); factory.setPassword("123456");//建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback =newDeliverCallback(){@Overridepublicvoidhandle(String consumerTag,Delivery message)throwsIOException{String msg =newString(message.getBody(),"UTF-8");System.out.println(" [x] Received '"+ msg +"'");}}; channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}

二、工作队列模式

简介

  • 工作队列与简单模式相比,一个生产者、多个消费者(排它关系),多个消费端共同消费同一个队列中的消息
  • 使用场景:对于消息生产速度大于消费速度场景,可以增加消费者减少单个消费者压力

代码示例

  • 生产者publicclassProducer{//队列名称privatefinalstaticStringQUEUE_NAME="work_queues";publicstaticvoidmain(String[] args){//建立连接工厂ConnectionFactory factory =newConnectionFactory();//设置目标主机ip factory.setHost("192.168.47.128");//设置账号名密码 factory.setUsername("yf"); factory.setPassword("123456");// 修改端口的设置// factory.setPort();try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){//通道和队列的连接/* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数说明: queue:队列名称 durable:是否持久化 exclusive:是否独占,是否一个消费者监听一个队列 autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列 arguments:参数 */// channel.basicQos(1);// 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息 channel.queueDeclare(QUEUE_NAME,false,false,false,null);for(int i =0; i <10; i++){//需要发送的消息String message ="Hello RabbitMQ!"+i;//通过最基础的发布/* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数说明: exchange:指定交换机,如果使用默认模式,就使用“” routingKey:路由名称 props:配置信息 body:发送的消息(要求字节数组) */ channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}catch(Exception e){ e.printStackTrace();}}}
  • 消费者/** * 服务端,接收信息 */publicclassConsumer1{//指定接收队列名称privatefinalstaticStringQUEUE_NAME="work_queues";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =newConnectionFactory();//设置目标主机ip factory.setHost("192.168.47.128");//设置用户密码 factory.setUsername("yf"); factory.setPassword("123456");//建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel(); channel.basicQos(1);// 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息 channel.queueDeclare(QUEUE_NAME,false,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback =newDeliverCallback(){@Overridepublicvoidhandle(String consumerTag,Delivery message)throwsIOException{String msg =newString(message.getBody(),"UTF-8");System.out.println(" [x] Received '"+ msg +"'");}};/* 这种写法和上面是一样的 使用的是lambda表达式 DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" [x] Received '" + msg + "'"); }; */ channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}

小结

  • 分发机制:轮询分发机制 - 也就是说当生产者生产了10条消息,2个消费者分别消费5条消息。
  • 应用场景:同一条消息,在多个消费者之间只能有一个消费,应用于只需要单节点消费的场景 - 发送验证码- 发送生日提醒

三、发布订阅模式(Publish/Subscribe)

简介

  • 在订阅模型中,多了一个Exchange 角色:
  • Exchange:交换机(X)。接收生产者发送的消息; 处理投递消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。具体操作根据交换机类型来定义: - Fanout:广播,将消息交给所有绑定到交换机的队列- Direct:定向,把消息交给符合指定routing key 的队列- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

代码示例

  • 生产者

  • 消费者


该文章还没写完,先发布出来,后面会持续更新!!!

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/Byron__/article/details/136988756
版权归原作者 Byron丶 所有, 如有侵权,请联系我们删除。

“3、RabbitMQ_工作模式”的评论:

还没有评论