RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,不做讲解说明)
一、简单模式(Simple)
特点:①一个生产者对应一个消费者,通过队列进行消息传递。
②该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
图解:
二、工作队列模式(Work Queue)
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:
①一个队列对应多个消费者。
②一条消息只会被一个消费者消费。
③消息队列默认采用轮询的方式将消息平均发送给消费者。
图解:

三、发布订阅模式(Publish/Subscribe)
在开发过程中,有一些消息需要不同消费者进行不同的处理
特点:
①生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
②工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
图解

四、路由模式(Routing)
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。特点:
①每个队列绑定路由关键字RoutingKey
②生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。
图解:

五、 通配符模式(Topics)
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
①消息设置RoutingKey时,RoutingKey由多个单词构成,中间以
.
分割。
②队列设置RoutingKey时,
#
可以匹配任意多个单词,
*
可以匹配任意一个单词。
六、Java实现五种模式
简单模式生产者
package com.tmh.mq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * rabbitmq工作模式:简单模式,消息生产者 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、创建连接 Connection connection = connectionFactory.newConnection(); //3、创建信道 Channel channel = connection.createChannel(); //4、创建队列 /** * 参数一:队列名称 * 参数二:是否持久化,true表示当MQ重启后队列还在 * 参数三:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问 * 参数四:是否自动删除 * 参数五:其他参数 */ channel.queueDeclare("simplqueue", false,false,false,null); //5、发送消息 String messges="hello simple queue"; /** * 参数1:交换机名,""表示默认交换机 * 参数2:路由键,简单模式就是队列名 * 参数3:其他额外参数 * 参数4:要传递的消息字节数组 */ channel.basicPublish("","simplqueue",null,messges.getBytes()); //6、关闭信道和连接 channel.close(); connection.close(); System.out.println("消息发送成功"); } }简单模式消费者
package com.tmh.mq.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //simple队列消息消费者 public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、创建连接 Connection connection = connectionFactory.newConnection(); //3、创建信道 Channel channel = connection.createChannel(); //4、监听队列 /** * 参数1:监听的队列名 * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息 * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费 */ channel.basicConsume("simplqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String messge = new String(body, "UTF-8"); System.out.println("接收消息,消息为:"+messge); } }); } }
工作队列模式生产者
package com.tmh.mq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; //工作队列模式 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //建立连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列,如果队列已经存在,则使用该队列 channel.queueDeclare("workqueue",true,false,false,null); //发送大量消息 for (int i = 0; i <100 ; i++) { channel.basicPublish("","workqueue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("hello 这是今天的第"+(i+1)+"条消息").getBytes()); } //关闭资源 channel.close(); connection.close(); } }工作模式消费者
package com.tmh.mq.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //建立连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //监听队列 channel.basicConsume("workqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1消费消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //建立连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //监听队列 channel.basicConsume("workqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者2消费消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class Consumer3 { public static void main(String[] args) throws IOException, TimeoutException { //创建工厂连接 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //建立连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //监听队列 channel.basicConsume("workqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1消费消息:"+s); } }); } }
发布订阅模式生产者
package com.tmh.mq.publish; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、创建交换机 /** * 参数一:交换机名 * 参数二:交换机类型 * 参数三:交换机持久化 */ channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true); //5、创建队列 channel.queueDeclare("MAIL_QUEUE", true,false,false,null); channel.queueDeclare("MESSAGE_QUEUE", true,false,false,null); channel.queueDeclare("STATION_QUEUE", true,false,false,null); //6、队列绑定交换机 /** * 参数一:队列名 * 参数二:交换机名字 * 参数三:路由关键字,发布订阅模式写“”即可 */ channel.queueBind("MAIL_QUEUE","exchange_fanout",""); channel.queueBind("MESSAGE_QUEUE","exchange_fanout",""); channel.queueBind("STATION_QUEUE","exchange_fanout",""); //7、发送消息 for (int i = 1; i <=10 ; i++) { channel.basicPublish("exchange_fanout","",null, ("你好,发布订阅模式"+i).getBytes(StandardCharsets.UTF_8)); } //8、关闭资源 channel.close(); connection.close(); } }发布订阅模式消费者
package com.tmh.mq.publish; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MAIL_QUEUE",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("邮件消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MESSAGE_QUEUE",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("短信消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("STATION_QUEUE",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("站内消息:"+s); } }); } }
路由模式生产者
package com.tmh.mq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、创建交换机 /** * 参数一:交换机名 * 参数二:交换机类型 * 参数三:交换机持久化 */ channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true); //5、创建队列 channel.queueDeclare("MAIL_QUEUE2", true,false,false,null); channel.queueDeclare("MESSAGE_QUEUE2", true,false,false,null); channel.queueDeclare("STATION_QUEUE2", true,false,false,null); //6、队列绑定交换机 /** * 参数一:队列名 * 参数二:交换机名字 * 参数三:路由关键字,发布订阅模式写“”即可 */ channel.queueBind("MAIL_QUEUE2","exchange_routing","important"); channel.queueBind("MESSAGE_QUEUE2","exchange_routing","important"); channel.queueBind("STATION_QUEUE2","exchange_routing","important"); channel.queueBind("STATION_QUEUE2","exchange_routing","normal"); //7、发送消息 channel.basicPublish("exchange_routing","important",null, ("你好,路由模式,这是重要消息").getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange_routing","normal",null, ("你好,路由模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8)); //8、关闭资源 channel.close(); connection.close(); } }路由模式消费者
package com.tmh.mq.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MAIL_QUEUE2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("邮件消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MESSAGE_QUEUE2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("短信消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("STATION_QUEUE2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("站内消息:"+s); } }); } }
通配符模式 生产者
package com.tmh.mq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、创建交换机 /** * 参数一:交换机名 * 参数二:交换机类型 * 参数三:交换机持久化 */ channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true); //5、创建队列 channel.queueDeclare("MAIL_QUEUE3", true,false,false,null); channel.queueDeclare("MESSAGE_QUEUE3", true,false,false,null); channel.queueDeclare("STATION_QUEUE3", true,false,false,null); //6、队列绑定交换机 /** * 参数一:队列名 * 参数二:交换机名字 * 参数三:路由关键字,发布订阅模式写“”即可 */ channel.queueBind("MAIL_QUEUE3","exchange_topic","#.mail.#"); channel.queueBind("MESSAGE_QUEUE3","exchange_topic","#.message.#"); channel.queueBind("STATION_QUEUE3","exchange_topic","#.station.#"); //7、发送消息 channel.basicPublish("exchange_topic","mail.message.station",null, ("你好,通配符模式,这是重要消息").getBytes(StandardCharsets.UTF_8)); channel.basicPublish("exchange_topic","station",null, ("你好,通配符模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8)); } }通配符模式消费者
package com.tmh.mq.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MAIL_QUEUE3",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("邮件消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("MESSAGE_QUEUE3",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("短信消息:"+s); } }); } } ----------------------------------------------------------------------------------------- public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { //1、创建连接工厂 final ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.8"); connectionFactory.setPort(5672); connectionFactory.setUsername("tangminghao"); connectionFactory.setPassword("tangminghao"); connectionFactory.setVirtualHost("/"); //2、建立连接 final Connection connection = connectionFactory.newConnection(); //3、建立信道 final Channel channel = connection.createChannel(); //4、监听队列 channel.basicConsume("STATION_QUEUE3",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String s = new String(body, "UTF-8"); System.out.println("站内消息:"+s); } }); } }
在Java实现中有很多注释没有改过来,因为很多模式之间变化不大,所以就复制粘贴了😄~~~~~,引用时注意修改。
版权归原作者 网恋被骗四块五 所有, 如有侵权,请联系我们删除。