0


rabbitmq相关总结

文章目录

Windows服务启动

这里只记录每次怎么在本地开启服务,不涉及具体安装细节,工作时一般由运维人员安装在linux环境上

服务开启命令

开启服务时,需要切到本地的rabbitmq的\sbin目录下

Rabbitmq-server

如果遇到端口占用的问题,windows开始面板上搜索rabbitmq-stop,点击后再次输入开启服务命令即可

管理页面入口(测试是否正常启动)

在这里插入图片描述

http://127.0.0.1:15672/

默认账号:guest
默认密码:guest
创建账号

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set_permissions [-p <vhostpath>]<user><conf><write><read>
rabbitmqctl set_permissions -p "/" admin ".*"".*"".*"

当前用户和角色

rabbitmqctl list_users

rabbitmq依赖

<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>

队列模型

简单队列

生产者

生产者首先获得连接,之后获得信道,信道初始化队列,然后发送信息

publicclassProducer{publicstaticfinalStringQUENE_NAME="QUEUE_SIMPLE";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");//连接Connection connection = connectionFactory.newConnection();//信道Channel channel = connection.createChannel();
        channel.queueDeclare(QUENE_NAME,false,false,false,null);String message ="hello world rabbitmq";
        channel.basicPublish("",QUENE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}}

消费者

消费者首先获得连接,产生信道,

publicclassConsumer{publicstaticfinalStringQUEUE_NAME="QUEUE_SIMPLE";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");//连接Connection connection = connectionFactory.newConnection();//信道Channel channel = connection.createChannel();System.out.println("等待接受消息");推送的消息如何进行消费的接口回调DeliverCallback deliverCallback =(consumerTag,delivery)->{String message =newString(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback =(counsumerTag)->{System.out.println("消息消费被中断");};//1. 消费哪个队列//2. 消费成功后是否要自动应答//3. 消费者未成功消费的回调
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}

工作队列

核心思想就是轮训分发消息

抽取工具类

首先对获取信道的方法抽取成工具类
调用静态方法getChanel就可以获取一个信道

/**
 * 此类为连接工厂创建信道的工具类
 */publicclassRabbitMqUtils{//得到一个连接的 channelpublicstaticChannelgetChannel()throwsException{//创建一个连接工厂ConnectionFactory factory =newConnectionFactory();
 factory.setHost("127.0.0.1");
 factory.setUsername("admin");
 factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}

启动两个工作线程

工作线程c1

/**
 * 工作线程(相当于消费者)
 */publicclassWorker01{//队列的名称publicstaticfinalStringQUEUE_NAME="hello";//接收消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("接收:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消息者取消消费接口回调逻辑");};//接收System.out.println("c1启动...");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}

工作线程c2

/**
 * 工作线程(相当于消费者)
 */publicclassWorker02{//队列的名称publicstaticfinalStringQUEUE_NAME="hello";//接收消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("接收:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消息者取消消费接口回调逻辑");};//接收System.out.println("c2启动...");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}

启动发送线程

采用scanner输入流的方式从控制台去输入数据

/**
 * 生产者 发送大量的消息
 */publicclassTask01{publicstaticfinalStringQUEUE_NAME="hello";//发送大量的消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();//队列的声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受消息Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}}

发送结果:
在这里插入图片描述
接收结果:
work01
在这里插入图片描述
work02
在这里插入图片描述
总结:生产者一共发送四条消息,两个消费者各接收两条消息,并且是有序的

消息应答

rabbitmq引入消息应答机制,保证消息在发送过程中不丢失,消息应答就是消费者在接收到消息并处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了

自动应答

容易丢失消息

手动应答

Channel.basicAck(用于肯定确认)

rabbitmq已知道该消息并且成功的处理消息,可以将其丢弃了

Multiple

Multiple参数一般设置为false
在这里插入图片描述
true
在这里插入图片描述
false
在这里插入图片描述

消息自动重新入队

若消费者失去连接(宕机、连接关闭等),导致消息未发送ACK确认,Rabbitmq将了解到消息未完全处理,并将对其重新排队,此时会分发给别的消费者。这样,即使某个消费者偶尔死亡,也可以确认不会丢失任何消息
在这里插入图片描述
此图代表了消息自动重新入队的具体执行流程

代码实现

生产者

/**
 * 消息在手动应答时不丢失,放回队列中重新消费
 */publicclassTask2{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();//声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);//从控制台输入Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());System.out.println("生产者发出消息:"+ message);}}}

消费者

消费者c1
沉睡时间1秒,模拟快速处理业务

publicclassWork03{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("c1等待接受消息处理较短");DeliverCallback deliverCallback =(consumerTag,message)->{//沉睡1sSleepUtils.sleep(1);System.out.println("接收到的消息:"+newString(message.getBody()));//手动应答/**
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck =false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println("消费者取消消费接口回调");}));}}

消费者c2
沉睡时间30秒,模拟慢速处理业务

publicclassWork04{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("c2等待接受消息处理较长");DeliverCallback deliverCallback =(consumerTag,message)->{//沉睡1sSleepUtils.sleep(30);System.out.println("接收到的消息:"+newString(message.getBody()));//手动应答/**
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck =false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println("消费者取消消费接口回调");}));}}

睡眠工具类

publicclassSleepUtils{publicstaticvoidsleep(int second){try{Thread.sleep(1000*second);}catch(InterruptedException _ignored){Thread.currentThread().interrupt();}}}

效果

发送消息a,b,测试可知c1接收后,c2隔一段时间接收,此时发送c,d消息,c1接收了c消息,然后把c2服务停掉,模拟宕机,此时c1会去接收d消息,消息并没有丢失,会重新发送
发送消息:
在这里插入图片描述
c1
d消息成功接收到
在这里插入图片描述
c2
接收到b消息后模拟宕机
在这里插入图片描述

Exchanges类型

fagout-发布订阅模式

不处理routingkey,每个队列都会得到
在这里插入图片描述

direct-路由模式

根据routingkey去转发消息给指定的队列
在这里插入图片描述

topic-主题模式

根据特殊符号去匹配多个,类似于正则表达式

发布订阅模式

在这里插入图片描述

  • 一个生产者对应多个消费者
  • 生产者将消息发送到X(Exchange),每个消费者需要绑定一个队列
  • Exchage:接收生产者的消息 推送给消费者

生产者

/**
 * 发布订阅模式-生产者
 */publicclass producer{//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHAGE_NAME,"fanout");String msg ="hello fbdy";//发送消息到交换机
        channel.basicPublish(EXCHAGE_NAME,"",null,msg.getBytes());System.out.println("publish msg:"+msg);}}

消费者

消费者1

/**
 * 消费者1
 */publicclass cousumer1 {//队列publicstaticfinalStringQUEUE_NAME="queue_mail";//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("cousumer1:"+msg);}};
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}

消费者2

/**
 * 消费者2
 */publicclass cousumer2 {//队列publicstaticfinalStringQUEUE_NAME="queue_mail";//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("cousumer2:"+msg);}};
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}

路由模式

在这里插入图片描述

  • 生产者将消息发送到交换机,消息附带routingkey
  • 消费者声明队列与交换机绑定,因为生产者附带routingKey,所以消费者如果想要接收来自交换机的消息,也需要绑定一致的routingKey,这样才可以接收来自生产者的消息。
  • 只要消费者的绑定的交换机的路由键与交换机的路由键一致,则可以收到生产者的消息

生产者

  • 生产者发布消息给交换机时需要指定routingKey
  • 交换机的初始化是由生产者进行的
/**
 * 路由模式
 */publicclass producer {publicstaticfinalStringEXCHAGE_NAME="direct_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHAGE_NAME,"direct");String rountingKey ="warning";
        channel.basicPublish(EXCHAGE_NAME,rountingKey,null,"hello,gjj".getBytes());System.out.println("发送消息:hello,gjj,routingKey:"+rountingKey);}}

消费者

消费者1

  • 队列的初始化是需要消费者进行的
  • 每个队列需要绑定到交换机上的routingKey上,当与交换机的routingKey一致,则可以消费消息。
publicclass consumer1 {//声明交换机和队列类型publicstaticfinalStringEXCHANGE_NAME="direct_exchange";publicstaticfinalStringQUEUE_NAME="direct_queue1";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定队列至交换机,可以通过多个routingKey绑定一个队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("counsumer1:"+msg);}};
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}

消费者2

publicclass consumer2{//声明交换机和队列类型publicstaticfinalStringEXCHANGE_NAME="direct_exchange";publicstaticfinalStringQUEUE_NAME="direct_queue2";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定队列至交换机,可以通过多个routingKey绑定一个队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("counsumer2:"+msg);}};
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}

死信队列

延迟队列


本文转载自: https://blog.csdn.net/H1212uhh/article/details/128227417
版权归原作者 宽宽rrr 所有, 如有侵权,请联系我们删除。

“rabbitmq相关总结”的评论:

还没有评论