原生java操作RabbitMQ
导入jar依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>
代码结构
工具类MQUtil.java
packagecom.example;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 参考: https://blog.csdn.net/zuzhiang/article/details/117618105
*/publicclassMQUtil{privatestaticConnection connection;publicstaticConnectiongetConnection()throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
connectionFactory.setVirtualHost("/ems");
connection = connectionFactory.newConnection();return connection;}publicstaticvoidcloseChannelAndConnection(Channel channel,Connection connection)throwsException{
channel.close();
connection.close();}}
(一)基础模式
生产者 MqProduct .java
packagecom.example.basic;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 基础模式
*/publicclassMqProduct{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("hello_queue",false,false,false,null);//第二个参数为队列名(idea编辑器提示有错误)
channel.basicPublish("","hello_queue",null,"hello RabbitMQ!".getBytes());
channel.close();
connection.close();}}
消费者 MqConsumer .java
packagecom.example.basic;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassMqConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
connectionFactory.setVirtualHost("/ems");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("hello_queue",false,false,false,null);
channel.basicConsume("hello_queue",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}
(二)工作模式
生产者 ProductMQ .java
packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
* work 模式下, 会根据消费能力,能者多劳.效率高的消费者consumer2消费较多的消息, 消息较低的consumer1消费较少的消息
*/publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work_queue",true,false,false,null);for(int i =0; i <10; i++){
channel.basicPublish("","work_queue",null,(i +" hello work queue!!!").getBytes());}MQUtil.closeChannelAndConnection(channel, connection);}}
消费者1 ConsumerMQ1.java
packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();finalChannel channel = connection.createChannel();// 每次只能消费1个消息
channel.basicQos(1);
channel.queueDeclare("work_queue",true,false,false,null);//关闭自动确认机制
channel.basicConsume("work_queue",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(1000);}catch(InterruptedException e){
e.printStackTrace();}System.out.println("MQ-1 接收到了消息, 消息是: "+newString(body));System.out.println(envelope.getDeliveryTag());System.out.println("**************");// 手动确认// 参数:确认队列中哪个具体消息、是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);}});}}
消费者2 ConsumerMQ2.java
packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work_queue",true,false,false,null);
channel.basicConsume("work_queue",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("MQ-2 接收到了消息, 消息是: "+newString(body));}});}}
(三)fanout模式
生产者 ProductMQ .java
packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
* fanout 模式下, consumer1和consumer2都会收到消息
*/publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","fanout");
channel.basicPublish("logs","",null,"fanout type message!".getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}
消费者1 ConsumerMQ.java
packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","");
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}
消费者2 ConsumerMQ2.java
packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs","");
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}
(四)direct模式
生产者 ProductMQ.java
packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
* direct模式 基于routing_key发送消息
*/publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");//依次注释掉下面几个进行测试String routing_key ="info";// String routing_key = "error";// String routing_key = "warning";// String routing_key = "debug";
channel.basicPublish("logs_direct", routing_key,null,("direct模式基于routing_key发送消息,routing_key是: ["+ routing_key +"]").getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}
消费者1 ConsumerMQ1.java
packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs_direct","error");
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}
消费者2 ConsumerMQ2.java
packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warning");
channel.queueBind(queue,"logs_direct","debug");
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者2: "+newString(body));}});}}
(五)topic模式
生产者 ProductMQ.java
packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
* topic模式下动态routing_key
*/publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");String routing_key ="user.save";// String routing_key = "user.save.item";// String routing_key = "user.save.item.good_id";
channel.basicPublish("topics", routing_key,null,("topic模式下动态模型: ["+ routing_key +"]").getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}
消费者1 ConsumerMQ1.java
packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");String queue = channel.queueDeclare().getQueue();// #号可以匹配任意多个String routing_key ="user.#";
channel.queueBind(queue,"topics", routing_key);
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: 当前为'#'符号 "+newString(body));}});}}
消费者2 ConsumerMQ2.java
packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");String queue = channel.queueDeclare().getQueue();//*号只能匹配一个String routing_key ="user.*";
channel.queueBind(queue,"topics", routing_key);
channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者2: 当前为'*'符号 "+newString(body));}});}}
springboot操作RabbitMQ
(一)基础模式
生产者
importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes =MainApplication.class)@RunWith(SpringRunner.class)publicclassTestRabbitMQ{// 注入RabbitTemplate@AutowiredprivateRabbitTemplate rabbitTemplate;// hello world@Testpublicvoidtest(){//参数2表示: 队列名称(idea编辑器提示有误, 总是提示第二个参数是routingKey,真实是队列名称)
rabbitTemplate.convertAndSend("","hello","我是发送消息端");System.out.println("发送完成...");}}
消费者
importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component// 指定监听hello队列// 默认是持久化、非独占、不自动删除队列的@RabbitListener(queuesToDeclare =@Queue(value ="hello", durable ="false", exclusive ="false", autoDelete ="true"))publicclassConsumer{@RabbitHandlerpublicvoidreceive(String message){System.out.println("message: "+ message);}}
操作说明: 重启服务会自动加载消费者程序, 然后再启动生产者的测试代码,控制台就可以看到消息了
(二)work模式
生产者
importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.core.env.Environment;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/test")publicvoidtestWork(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend("work","work模型"+ i);}}}
消费者
importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassWorkConsumer{// 第1个消费者@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceive1(String message){System.out.println("message1: "+ message);}// 第2个消费者@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceive2(String message){System.out.println("message2: "+ message);}}
使用说明: 重启服务后,自动加载@Component注解下面的两个消费者代码; 然后浏览器访问 http://127.0.0.1:8080/test 得到控制台打印如下:
(三)Fanout广播模型
生产者
版权归原作者 hello php 所有, 如有侵权,请联系我们删除。