前 言
🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端
☕专栏简介:深入、全面、系统的介绍消息中间件
🌰 文章简介:本文将介绍RabbitMQ的工作模式
🍓文章推荐:【消息中间件】1小时快速上手RabbitMQ
文章目录
上一篇文章已经介绍RabbitMQ的基本概念、安装、管控台使用和基于简单模式的helloworld。这篇文章将介绍RabbitMQ的其它工作模式。
1.WorkQueue工作队列模式
代码实现也很简单,只需要多一个消费者即可。
生产者
publicclassProducer_WorkQueues{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,IOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/for(int i =1; i <=10; i++){String body = i+"hello rabbitmq~~~";//6. 发送消息
channel.basicPublish("","work_queues",null,body.getBytes());}//7.释放资源
channel.close();
connection.close();}}
消费者1.
publicclassConsumer_WorkQueues1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));}};
channel.basicConsume("work_queues",true,consumer);//关闭资源?不要}}
消费者2
publicclassConsumer_WorkQueues2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException,IOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));}};
channel.basicConsume("work_queues",true,consumer);//关闭资源?不要}}
先启动consumer1,2;再启动producer,即可看到两个消费者会争抢消费生产者生产的消息。
小结下。
2.Pub/Sub订阅模式
生产者
publicclassProducer_PubSub{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/String exchangeName ="test_fanout";//5. 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());//9. 释放资源
channel.close();
connection.close();}}
消费者1
publicclassConsumer_PubSub1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要}}
消费者2
publicclassConsumer_PubSub2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息保存到数据库.....");}};
channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要}}
启动生产者,可以看到rabbitmq主控台是这样的。
启动消费者1
启动消费者2.
主控台是这样的。
3.Routing工作模式
下面实现下列需求,对于error级别的log输出到控制台并保存到数据库,其它级别的log打印到控制台。
实现如下。
生产者。
/**
* 发送消息
*/publicclassProducer_Routing{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/String exchangeName ="test_direct";//5. 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);//6. 创建队列String queue1Name ="test_direct_queue1";String queue2Name ="test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*///队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");String body ="日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";//8. 发送消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());//9. 释放资源
channel.close();
connection.close();}}
消费者1.
publicclassConsumer_Routing1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_direct_queue1";String queue2Name ="test_direct_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息打印到控制台.....");}};
channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要}}
消费者2
publicclassConsumer_Routing2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_direct_queue1";String queue2Name ="test_direct_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息存储到数据库.....");}};
channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要}}
请读者自测。
4.Topics模式
看图说明一切
实现如下需求。对Q1,error级别的信息,order系统的信息存入数据库;对Q2都打到控制台。
生产者。
/**
* 发送消息
*/publicclassProducer_Topics{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/String exchangeName ="test_topic";//5. 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);//6. 创建队列String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/// routing key 系统的名称.日志的级别。//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");String body ="日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());//9. 释放资源
channel.close();
connection.close();}}
消费者1.
publicclassConsumer_Topic1{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息存入数据库.......");}};
channel.basicConsume(queue1Name,true,consumer);//关闭资源?不要}}
消费者2
publicclassConsumer_Topic2{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//1.创建连接工厂ConnectionFactory factory =newConnectionFactory();//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/ittest");//虚拟机 默认值/
factory.setUsername("guest");//用户名 默认 guest
factory.setPassword("guest");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/// 接收消息Consumer consumer =newDefaultConsumer(channel){/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/System.out.println("body:"+newString(body));System.out.println("将日志信息打印控制台.......");}};
channel.basicConsume(queue2Name,true,consumer);//关闭资源?不要}}
请读者自测。
5.总结
总结下。
版权归原作者 半旧518 所有, 如有侵权,请联系我们删除。