0


docker启动rabbitmq及使用

搜索rabbitmq镜像

docker search rabbitmq:management

在这里插入图片描述

下载镜像

docker pull rabbitmq:management

在这里插入图片描述

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

在这里插入图片描述
在这里插入图片描述

打印容器

docker logs rabbitmq

在这里插入图片描述
在这里插入图片描述

访问RabbitMQ Management

http://localhost:15672
账户密码默认:guest
在这里插入图片描述

编写生产者类

packagecom.xun.rabbitmqdemo.example;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{privatefinalstaticString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =newConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();/**
         * 生成一个queue队列
         * 1、队列名称 QUEUE_NAME
         * 2、队列里面的消息是否持久化(默认消息存储在内存中)
         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
         * 5、其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message ="Hello world!";/**
         * 发送一个消息
         * 1、发送到哪个exchange交换机
         * 2、路由的key
         * 3、其他的参数信息
         * 4、消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();}}

运行该方法,可以看到控制台的打印
在这里插入图片描述
name=hello的队列收到Message
在这里插入图片描述

消费者

packagecom.xun.rabbitmqdemo.example;importcom.rabbitmq.client.*;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassReceiver{privatefinalstaticString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory factory =newConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);Connection connection = factory.newConnection();Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);System.out.println("Waiting for messages. ");Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String message =newString(body,"UTF-8");System.out.println(" [x] Received '"+ message +"'");}};
        channel.basicConsume(QUEUE_NAME,true,consumer);}}

在这里插入图片描述
在这里插入图片描述

工作队列

RabbitMqUtils工具类

packagecom.xun.rabbitmqdemo.utils;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassRabbitMqUtils{publicstaticChannelgetChannel()throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}

启动2个工作线程

packagecom.xun.rabbitmqdemo.workQueue;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;publicclassWork01{privatestaticfinalString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag,delivery)->{String receivedMessage =newString(delivery.getBody());System.out.println("接收消息:"+receivedMessage);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费....");/**
         * 消费者消费消息
         * 1、消费哪个队列
         * 2、消费成功后是否自动应答
         * 3、消费的接口回调
         * 4、消费未成功的接口回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
packagecom.xun.rabbitmqdemo.workQueue;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;publicclassWork02{privatestaticfinalString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag,delivery)->{String receivedMessage =newString(delivery.getBody());System.out.println("接收消息:"+receivedMessage);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}

启动工作线程
在这里插入图片描述

启动发送线程

packagecom.xun.rabbitmqdemo.workQueue;importcom.rabbitmq.client.Channel;importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;importjava.util.Scanner;publicclassTask01{privatestaticfinalString QUEUE_NAME ="hello";publicstaticvoidmain(String[] args)throwsException{try(Channel channel=RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台接收消息Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}}}

启动发送线程,此时发送线程等待键盘输入
在这里插入图片描述
发送4个消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。
但消费者在处理message过程中宕机,会导致消息的丢失。
因此需要设置手动应答。

生产者

importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;importjava.util.Scanner;publicclassTask02{privatestaticfinalString TASK_QUEUE_NAME ="ack_queue";publicstaticvoidmain(String[] args)throwsException{try(Channel channel =RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner =newScanner(System.in);System.out.println("请输入信息");while(scanner.hasNext()){String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());System.out.println("生产者task02发出消息"+ message);}}}}

消费者

packagecom.xun.rabbitmqdemo.workQueue;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;importcom.xun.rabbitmqdemo.utils.SleepUtils;publicclassWork03{privatestaticfinalString ACK_QUEUE_NAME ="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("Work03 等待接收消息处理时间较短");DeliverCallback deliverCallback =(consumerTag,delivery)->{String message =newString(delivery.getBody());SleepUtils.sleep(1);System.out.println("接收到消息:"+message);/**
             * 1、消息的标记tag
             * 2、是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};//采用手动应答boolean autoAck =false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}}
packagecom.xun.rabbitmqdemo.workQueue;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xun.rabbitmqdemo.utils.RabbitMqUtils;importcom.xun.rabbitmqdemo.utils.SleepUtils;publicclassWork04{privatestaticfinalString ACK_QUEUE_NAME ="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("Work04 等待接收消息处理时间较长");DeliverCallback deliverCallback =(consumerTag,delivery)->{String message =newString(delivery.getBody());SleepUtils.sleep(30);System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};//采用手动应答boolean autoAck =false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}}

工具类SleepUtils

packagecom.xun.rabbitmqdemo.utils;publicclassSleepUtils{publicstaticvoidsleep(int second){try{Thread.sleep(1000*second);}catch(InterruptedException _ignored){Thread.currentThread().interrupt();}}}

模拟
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
work04等待30s后发出ack
在这里插入图片描述
在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

int prefetchCount =1;
channel.basicQos(prefetchCount);

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。
在这里插入图片描述

标签: docker rabbitmq

本文转载自: https://blog.csdn.net/qq_44402069/article/details/124303944
版权归原作者 Maackia 所有, 如有侵权,请联系我们删除。

“docker启动rabbitmq及使用”的评论:

还没有评论