0


spring-boot对rabbitMQ的操作

一、安装

rabbitMQ
  • 1、直接使用docker拉取镜像docker pull rabbitmq:3.8
  • 2、启动容器docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name rabbit01 \ --hostname rabbit01 --restart=always \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8
  • 3、关于端口的介绍- 15672的给浏览器控制台使用的- 5672是给程序调用的
  • 4、进入到rabbit01容器中docker exec -it rabbit01 /bin/bash
  • 5、开启可视化界面操作rabbitmq-plugins enable rabbitmq_management
  • 6、客户端直接访问xx:15672
  • 7、或者直接用别人搞好的镜像docker run \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name rabbit02 \ --hostname rabbit02 --restart=always \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management

二、在

spring-boot

中整合

  • 1、引入依赖包<!-- spring boot和junit整合单元测试包 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- rabbitmq的包引入 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 2、在配置文件中引入配置server.port=8000spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/

三、简单模式

  • 1、简单模式就是一个生产者一个消费者
  • 2、生产者代码,运行下面的代码,查看可视化界面,并不存在消息,原因是因为需要手动创建simple_queue这个队列@SpringBootTest(classes =ProducerApplication.class)publicclassProducerTest01{@AutowiredprivateRabbitTemplate rabbitTemplate;@Testpublicvoidtest01(){/** * 第一个参数:表示队列的名称 * 第二个参数:表示要发送的数据 */ rabbitTemplate.convertAndSend("simple_queue","hello world");}}
  • 3、运行后查看可视化界面在这里插入图片描述
  • 4、定义一个消费者来消费消息packagecom.example.listener01;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassConsumerListener01{@RabbitListener(queues ="simple_queue")publicvoidlistener01(Message message){String msg =newString(message.getBody());System.out.println("接收到的消息:"+ msg);}}

四、

work

工作模式

  • 1、简单的来理解,就是在上面简单模式下增加几个消费者,如同搬砖一样的,一个搬运工搬不过来,多叫几个人来干活的性质,避免消息堆积
  • 2、同样的要先手动创建队列,在生产者端循环发送数据@Testpublicvoidtest02(){for(int i =0; i <10; i++){ rabbitTemplate.convertAndSend("work_queue","hello world");}}
  • 3、定义2个消费者来一起消费消息@ComponentpublicclassConsumerListener01{@RabbitListener(queues ="work_queue")publicvoidlistener01(Message message){String msg =newString(message.getBody());System.out.println("消费者1接收到的消息:"+ msg);}}
  • 4、先运行消费者,然后运行生产者在这里插入图片描述

五、发布模式

  • 1、发布模式是指发送一个消息,希望在几个消费者那边都能接收到,上面的工作模式,一条消息被一个消费者消费了,另外一个消费者就接收不到消息,在一些场景需要给每个消费者就要用发布者模式
  • 2、根据交换机的模式可以分为以下几种 - Fanout,广播模式,将消息全部交给所有与之绑定的队列,这里的router key为空字符串- Direct,将消息指定到对应的routing key上- Topic,通配符模式,这里的routing key根据规则匹配 - *表示一个单词- #表示多个单词
一、
Fanout

模式

  • 1、使用配置文件的方式创建交换机和队列,并且将他们绑定在一起packagecom.example.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitFanoutExchangeConfiguration{// 交换机@BeanpublicExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange("fanout_exchange").durable(true).build();}// 创建一个队列@BeanpublicQueuefanoutQueue1(){returnQueueBuilder.durable("fanout_queue1").build();}// 创建一个队列@BeanpublicQueuefanoutQueue2(){returnQueueBuilder.durable("fanout_queue2").build();}// 队列和交换机绑定@BeanpublicBindingfanoutExchangeQueue01(){// with表示路由keyreturnBindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()).with("").noargs();}@BeanpublicBindingfanoutExchangeQueue02(){// with表示路由keyreturnBindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()).with("").noargs();}}
  • 2、生产者发送消息@Testpublicvoidtest03(){ rabbitTemplate.convertAndSend("fanout_exchange","","hello world");}
  • 3、查看可视化界面,会自动创建一个交换机和两个路由key在这里插入图片描述
  • 4、定义消费者@ComponentpublicclassConsumerListener01{@RabbitListener(queues ="fanout_queue1")publicvoidlistener01(Message message){String msg =newString(message.getBody());System.out.println("消费者1接收到的消息:"+ msg);}}
  • 5、这时候会发现,几个消费者都会同时输出
二、
Direct

模式

  • 1、其实就是在上面的模式上修改创建交换机的类型,及指定类型,别的都保持不变
三、
Topic

模式

  • 1、在定义路由key的时候使用*或者#来表示

六、直接在监听上使用注解的方式来创建交换机等

  • 1、正常创建交换机和队列的方式有三种方式- 直接在可视化界面手动创建- 使用java api方式一个一个创建需要先创建交换机、队列,并且让队列和交换机绑定在一起- 直接使用注解的方式来实现
  • 2、使用注解的方式直接来创建交换机和队列packagecom.example.listener04;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassConsumerListener01{@RabbitListener(bindings =@QueueBinding( value =@Queue(value ="queue_01", durable ="true"), exchange =@Exchange(value ="direct_rabbit_exchange", type =ExchangeTypes.DIRECT), key ={"info","error"}))publicvoidlistener01(Message message){String msg =newString(message.getBody());System.out.println("消费者1接收到的消息:"+ msg);}}
  • 3、运行后查看rabbitmq可视化界面
  • 4、定义发送消息的方法publicvoidtest04(){ rabbitTemplate.convertAndSend("direct_rabbit_exchange","error","hello world");}

七、消息丢失

  • 1、消息丢失主要存在的场景 - 生产者投递消息的时候就丢失,比如写错了交换机的名字- 交换机到队列丢失,比如写错了队列名称- 队列到接收者数据丢失
一、开启生产者确认机制
  • 1、开启生产者确认机制server.port=9000spring.rabbitmq.host=123.56.103.229spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/# 开启生产者确认机制spring.rabbitmq.publisher-confirm-type=correlated
  • 2、重写RabbitTemplate,只要我们在容器中有一个RabbitTemplate,那么spring boot就不会用对RabbitTemplate自动化配置packagecom.example.config;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitmqConfiguration{/** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String s){if(ack){System.out.println("消息正常投递到交换机中");}else{System.out.println("消息投递到交换机失败:"+s);}}});return rabbitTemplate;}}
  • 3、发送消息的时候故意写错交换机的名字@Testpublicvoidtest04()throwsInterruptedException{ rabbitTemplate.convertAndSend("direct_rabbit_exchange_xx","error","hello world");Thread.sleep(2000);}
  • 4、处理消息投送失败的方式- 使用数据库表来保存发送失败的消息,主要字段有:消息唯一id、消息内容、重试次数、当前消息发送状态- 在消息投送失败的时候重试几次- 定时任务将失败的批量再次发送
  • 5、在发送消息的时候传递当前唯一的识别id,这里使用uuid``````@Testpublicvoidtest04()throwsInterruptedException{String msgUuid =UUID.randomUUID().toString().replace("-","");CorrelationData correlationData =newCorrelationData(msgUuid); rabbitTemplate.convertAndSend("direct_rabbit_exchange","error","hello world", correlationData);Thread.sleep(2000);}
  • 6、获取当前消息的唯一识别@ConfigurationpublicclassRabbitmqConfiguration{/** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String s){if(ack){System.out.println("消息正常投递到交换机中");}else{String mesId = correlationData.getId();System.out.println(mesId);System.out.println("消息投递到交换机失败:"+s);}}});return rabbitTemplate;}}
二、交换机到队列的时候出现问题
  • 1、当队列名称写错了,或者不存在的时候会出现这个情况
  • 2、开启生产者回调机制server.port=9000spring.rabbitmq.host=123.56.103.229spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/# 开启生产者确认机制spring.rabbitmq.publisher-confirm-type=correlated# 开启生产者回调机制spring.rabbitmq.publisher-returns=true
  • 3、绑定回退函数packagecom.example.config;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitmqConfiguration{/** * ConnectionFactory 由于Spring Boot根据连接的配置信息实现自动化配置,在spring容器中是直接存在ConnectionFactory对象 */@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);...// 绑定回退机制的回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){System.out.println(returnedMessage.getMessage());System.out.println(returnedMessage.getReplyCode());System.out.println(returnedMessage.getReplyText());System.out.println(returnedMessage.getExchange());System.out.println(returnedMessage.getRoutingKey());}});return rabbitTemplate;}}
三、消息持久化
  • 1、rabbitmq默认是在内存中存储,当服务宕机后数据直接会丢失,消息在spring boot中持久化是因为框架帮你处理了,修改消息是否持久化可以参考下面@Testpublicvoidtest04()throwsInterruptedException{String msgUuid =UUID.randomUUID().toString().replace("-","");CorrelationData correlationData =newCorrelationData(msgUuid);MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{MessageProperties messageProperties = message.getMessageProperties();// 获取到消息属性对象 messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);// 设置消息不缓存return message;}}; rabbitTemplate.convertAndSend("direct_rabbit_exchange","error","hello world",messagePostProcessor, correlationData);Thread.sleep(2000);}
四、消费者消费消息不丢失
  • 1、在spring boot中消费者应答模式主要有以下几种- none自动应答,消费者获取到消息以后直接给rabbitmq返回ack- auto(默认模式),由spring boot框架根据业务执行特点决定给rabbitmqack还是uack,业务执行正常完成后返回ack,业务执行中出现异常的时候返回uack- manual手动应答模式,由程序员自己根据业务执行特点给rabbitmq返回对应的ack还是uack
  • 2、配置应答模式server.port=8000spring.rabbitmq.host=123.56.103.229spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/# 配置应答模式spring.rabbitmq.listener.simple.acknowledge-mode=auto
八、消费限流
  • 1、在消费者端添加配置server.port=8000spring.rabbitmq.host=123.56.103.229spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.listener.simple.acknowledge-mode=auto# 每次处理10个spring.rabbitmq.listener.simple.prefetch=10

九、死信队列

  • 1、在下面几种情况下会产生死信队列- 消息的存活时间到了- 队列满了,比如队列只能放10个消息,这时候发送11个消息过来,就有一个消息为死信,在队列中时间最久的那个将为成为死信队列- 消费被拒绝了,或者rabbitmq返回uack
  • 2、死信队列的架构图在这里插入图片描述
  • 3、创建死信队列packagecom.example.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitDlxExchangeConfiguration{// 创建一个死信交换机@BeanpublicExchangedlxExchange(){returnExchangeBuilder.fanoutExchange("dlx_exchange").durable(true).build();}// 创建一个死信队列@BeanpublicQueuedlxQueue(){returnQueueBuilder.durable("dlx_queue").maxLength(10).build();}// 死信交换机和死信队列绑定@BeanpublicBindingdlxQueueBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dead").noargs();}// 创建一个正常的交换机@BeanpublicExchangeorderExchange(){returnExchangeBuilder.directExchange("order_exchange").durable(true).build();}// 创建一个正常队列@BeanpublicQueueorderQueue(){returnQueueBuilder.durable("order_queue").maxLength(10).deadLetterExchange("dlx_exchange").// 死信队列的交换机deadLetterRoutingKey("dead").// 死信队列的routingKeybuild();}// 正常交换机和正常队列绑定@BeanpublicBindingorderQueueBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with("info").noargs();}}
  • 4、发送消息@Testpublicvoidtest05()throwsInterruptedException{for(int i =0; i <15; i++){ rabbitTemplate.convertAndSend("order_exchange","info","hello world"+ i);}Thread.sleep(2000);}
  • 5、查看可视化界面,进入死信队列的是时间最早的(也就是最先发送的)在这里插入图片描述
  • 6、定义消费者packagecom.example.listener05;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassConsumerListener01{@RabbitListener(queues ="dlx_queue")publicvoidlistener01(Message message){String msg =newString(message.getBody());System.out.println("接收到的死信队列消息:"+ msg);}@RabbitListener(queues ="order_queue")publicvoidlistener02(Message message){String msg =newString(message.getBody());System.out.println("接收到的订单队列消息:"+ msg);}}

十、延迟任务

  • 1、在rabbitmq中没有真正意义上的延迟队列任务,只是采用ttl+死信队列来完成的
  • 2、延迟任务主要用于场景- 文章定时发布- 订单多少分钟后取消支付
  • 3、延迟任务的结构图在这里插入图片描述
  • 4、创建一个延迟任务的队列@ConfigurationpublicclassRabbitDlxExchangeConfiguration{...@BeanpublicQueueorderQueue(){returnQueueBuilder.durable("order_queue").// maxLength(10).ttl(2000).// 过期时间deadLetterExchange("dlx_exchange").// 死信队列的交换机deadLetterRoutingKey("dead").// 死信队列的routingKeybuild();}}
  • 5、发送消息,观察可视化界面,时间到了就会进入到死信队列中@Testpublicvoidtest06()throwsInterruptedException{ rabbitTemplate.convertAndSend("order_exchange","info","hello world");Thread.sleep(2000);}
  • 6、在死信队列中监听数据来改变数据库状态
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/kuangshp128/article/details/134676339
版权归原作者 水痕01 所有, 如有侵权,请联系我们删除。

“spring-boot对rabbitMQ的操作”的评论:

还没有评论