0


java操作RabbitMQ

原生java操作RabbitMQ

导入jar依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>

代码结构
在这里插入图片描述

工具类MQUtil.java

packagecom.example;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 参考: https://blog.csdn.net/zuzhiang/article/details/117618105
 */publicclassMQUtil{privatestaticConnection connection;publicstaticConnectiongetConnection()throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        connectionFactory.setVirtualHost("/ems");

        connection = connectionFactory.newConnection();return connection;}publicstaticvoidcloseChannelAndConnection(Channel channel,Connection connection)throwsException{
        channel.close();
        connection.close();}}

(一)基础模式

生产者 MqProduct .java

packagecom.example.basic;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 基础模式
 */publicclassMqProduct{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();

        channel.queueDeclare("hello_queue",false,false,false,null);//第二个参数为队列名(idea编辑器提示有错误)
        channel.basicPublish("","hello_queue",null,"hello RabbitMQ!".getBytes());
        channel.close();
        connection.close();}}

消费者 MqConsumer .java

packagecom.example.basic;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassMqConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        connectionFactory.setVirtualHost("/ems");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("hello_queue",false,false,false,null);
        channel.basicConsume("hello_queue",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}

(二)工作模式

生产者 ProductMQ .java

packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
 * work 模式下, 会根据消费能力,能者多劳.效率高的消费者consumer2消费较多的消息, 消息较低的consumer1消费较少的消息
 */publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",true,false,false,null);for(int i =0; i <10; i++){
            channel.basicPublish("","work_queue",null,(i +" hello work queue!!!").getBytes());}MQUtil.closeChannelAndConnection(channel, connection);}}

消费者1 ConsumerMQ1.java

packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();finalChannel channel = connection.createChannel();// 每次只能消费1个消息
        channel.basicQos(1);

        channel.queueDeclare("work_queue",true,false,false,null);//关闭自动确认机制
        channel.basicConsume("work_queue",false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{try{Thread.sleep(1000);}catch(InterruptedException e){
                    e.printStackTrace();}System.out.println("MQ-1 接收到了消息, 消息是: "+newString(body));System.out.println(envelope.getDeliveryTag());System.out.println("**************");// 手动确认// 参数:确认队列中哪个具体消息、是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);}});}}

消费者2 ConsumerMQ2.java

packagecom.example.workQueue;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();

        channel.queueDeclare("work_queue",true,false,false,null);
        channel.basicConsume("work_queue",true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("MQ-2 接收到了消息, 消息是: "+newString(body));}});}}

(三)fanout模式

生产者 ProductMQ .java

packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
 * fanout 模式下, consumer1和consumer2都会收到消息
 */publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs","fanout");
        channel.basicPublish("logs","",null,"fanout type message!".getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}

消费者1 ConsumerMQ.java

packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}

消费者2 ConsumerMQ2.java

packagecom.example.fanout;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs","fanout");//临时队列String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("收到消息: "+newString(body));}});}}

(四)direct模式

生产者 ProductMQ.java

packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
 * direct模式 基于routing_key发送消息
 */publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");//依次注释掉下面几个进行测试String routing_key ="info";// String routing_key = "error";// String routing_key = "warning";// String routing_key = "debug";
        channel.basicPublish("logs_direct", routing_key,null,("direct模式基于routing_key发送消息,routing_key是: ["+ routing_key +"]").getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}

消费者1 ConsumerMQ1.java

packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs_direct","error");
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: "+newString(body));}});}}

消费者2 ConsumerMQ2.java

packagecom.example.direct;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs_direct","info");
        channel.queueBind(queue,"logs_direct","warning");
        channel.queueBind(queue,"logs_direct","debug");
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者2: "+newString(body));}});}}

(五)topic模式

生产者 ProductMQ.java

packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;/**
 * topic模式下动态routing_key
 */publicclassProductMQ{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");String routing_key ="user.save";// String routing_key = "user.save.item";// String routing_key = "user.save.item.good_id";
        channel.basicPublish("topics", routing_key,null,("topic模式下动态模型: ["+ routing_key +"]").getBytes());MQUtil.closeChannelAndConnection(channel, connection);}}

消费者1 ConsumerMQ1.java

packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ1{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");String queue = channel.queueDeclare().getQueue();// #号可以匹配任意多个String routing_key ="user.#";
        channel.queueBind(queue,"topics", routing_key);
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者1: 当前为'#'符号 "+newString(body));}});}}

消费者2 ConsumerMQ2.java

packagecom.example.topic;importcom.example.MQUtil;importcom.rabbitmq.client.*;importjava.io.IOException;publicclassConsumerMQ2{publicstaticvoidmain(String[] args)throwsException{Connection connection =MQUtil.getConnection();Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");String queue = channel.queueDeclare().getQueue();//*号只能匹配一个String routing_key ="user.*";
        channel.queueBind(queue,"topics", routing_key);
        channel.basicConsume(queue,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("消费者2: 当前为'*'符号 "+newString(body));}});}}

springboot操作RabbitMQ

(一)基础模式

生产者

importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes =MainApplication.class)@RunWith(SpringRunner.class)publicclassTestRabbitMQ{// 注入RabbitTemplate@AutowiredprivateRabbitTemplate rabbitTemplate;// hello world@Testpublicvoidtest(){//参数2表示: 队列名称(idea编辑器提示有误, 总是提示第二个参数是routingKey,真实是队列名称)
        rabbitTemplate.convertAndSend("","hello","我是发送消息端");System.out.println("发送完成...");}}

消费者

importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component// 指定监听hello队列// 默认是持久化、非独占、不自动删除队列的@RabbitListener(queuesToDeclare =@Queue(value ="hello", durable ="false", exclusive ="false", autoDelete ="true"))publicclassConsumer{@RabbitHandlerpublicvoidreceive(String message){System.out.println("message: "+ message);}}

操作说明: 重启服务会自动加载消费者程序, 然后再启动生产者的测试代码,控制台就可以看到消息了

(二)work模式

生产者

importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.core.env.Environment;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassHelloController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/test")publicvoidtestWork(){for(int i =0; i <10; i++){
            rabbitTemplate.convertAndSend("work","work模型"+ i);}}}

消费者

importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassWorkConsumer{// 第1个消费者@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceive1(String message){System.out.println("message1: "+ message);}// 第2个消费者@RabbitListener(queuesToDeclare =@Queue("work"))publicvoidreceive2(String message){System.out.println("message2: "+ message);}}

使用说明: 重启服务后,自动加载@Component注解下面的两个消费者代码; 然后浏览器访问 http://127.0.0.1:8080/test 得到控制台打印如下:
在这里插入图片描述

(三)Fanout广播模型

生产者


本文转载自: https://blog.csdn.net/shj_php/article/details/129853213
版权归原作者 hello php 所有, 如有侵权,请联系我们删除。

“java操作RabbitMQ”的评论:

还没有评论