0


RabbitMQ的安装使用

RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦。当生产者大量产生数据、而消费者无法及时消费时,就需要一个中间层来暂存这些数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求先放到mq消息队列中,由消费端按自身处理能力逐步拉取并写入mysql,避免请求一下子全部打到mysql,导致其因请求过多而崩溃

怎么使用RabbitMQ?

1.在windows上安装RabbitMQ服务端(需要先安装对应版本的Erlang环境),参考链接3

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.java 代码引入相关jar包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.3.RELEASE</version>
</dependency>
3.编写发送,接收消息的工具类
延迟队列配置
packagecom.next.mq;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * @desc 延迟队列配置
 */@ConfigurationpublicclassRabbitDelayMqConfig{@Bean("delayDirectExchange")publicDirectExchangedelayDirectExchange(){DirectExchange directExchange =newDirectExchange(QueueConstants.DELAY_EXCHANGE,true,false);//交换机开启延迟设置true,延迟才会生效
        directExchange.setDelayed(true);return directExchange;}@Bean("delayNotifyQueue")publicQueuedelayNotifyQueue(){returnnewQueue(QueueConstants.DELAY_QUEUE);}@Bean("delayBindingNotify")publicBindingdelayBindingNotify(@Qualifier("delayDirectExchange")DirectExchange delayDirectExchange,@Qualifier("delayNotifyQueue")Queue delayNotifyQueue){returnBindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);}}
队列配置
packagecom.next.mq;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Primary;/**
 * @desc 队列配置
 */@ConfigurationpublicclassRabbitMqConfig{@Bean("directExchange")@PrimarypublicDirectExchangedirectExchange(){returnnewDirectExchange(QueueConstants.COMMON_EXCHANGE,true,false);}@Bean("notifyQueue")@PrimarypublicQueuenotifyQueue(){returnnewQueue(QueueConstants.COMMON_QUEUE);}@Bean("bindingNotify")@PrimarypublicBindingbindingNotify(@Qualifier("directExchange")DirectExchange directExchange,@Qualifier("notifyQueue")Queue notifyQueue){returnBindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);}}
发送消息工具类
package com.next.mq;

import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * Client-side helper for publishing messages through {@link RabbitTemplate}.
 *
 * <p>Both public methods are best-effort: any exception raised while
 * serializing or publishing (for example an {@link AmqpException}) is logged
 * and swallowed, never propagated to the caller.
 */
@Component
@Slf4j
public class RabbitMqClient {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a message to the common exchange with persistent delivery mode.
     *
     * @param messageBody the payload; it is serialized with {@code JsonMapper.obj2String}
     */
    public void send(MessageBody messageBody) {
        try {
            publish(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING, messageBody, message -> {
                // Mark the message as persistent.
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                log.info("message send, {}", message);
                return message;
            });
        } catch (Exception e) {
            // Pass the object itself, not messageBody.toString(): the logger renders
            // a null argument safely, whereas toString() on null would throw from
            // inside this handler and escape the method.
            log.error("message send exception, msg:{}", messageBody, e);
        }
    }

    /**
     * Publishes a delayed message to the delay exchange with persistent delivery mode.
     *
     * @param messageBody      the payload; its delay field is set to {@code delayMillSeconds}
     *                         before it is serialized
     * @param delayMillSeconds the delay in milliseconds, set on both the payload and the
     *                         message properties
     */
    public void sendDelay(MessageBody messageBody, int delayMillSeconds) {
        try {
            // Record the delay on the payload as well, so the consumer can read it back.
            messageBody.setDelay(delayMillSeconds);
            publish(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING, messageBody, message -> {
                // Mark the message as persistent.
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                // Delay of the message, in milliseconds.
                message.getMessageProperties().setDelay(delayMillSeconds);
                log.info("delay message send, {}", message);
                return message;
            });
        } catch (Exception e) {
            // A null messageBody lands here (NPE from setDelay above); logging the
            // object instead of calling toString() keeps this handler from throwing again.
            log.error("delay message send exception, msg:{}", messageBody, e);
        }
    }

    /**
     * Serializes the payload and sends it with a freshly generated correlation id.
     */
    private void publish(String exchange, String routingKey, MessageBody messageBody,
                         MessagePostProcessor postProcessor) {
        // A random UUID serves as the unique id of this message.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                exchange,
                routingKey,
                JsonMapper.obj2String(messageBody),
                postProcessor,
                correlationData);
    }
}
接收消息工具类
packagecom.next.mq;importcom.next.dto.RollbackSeatDto;importcom.next.model.TrainOrder;importcom.next.service.TrainOrderService;importcom.next.service.TrainSeatService;importcom.next.util.JsonMapper;importlombok.extern.slf4j.Slf4j;importorg.codehaus.jackson.type.TypeReference;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * @desc rabbitmq的server端 - 延迟接收消息
 * 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能
 */@Component@Slf4jpublicclassRabbitDelayMqServer{@ResourceprivateTrainSeatService trainSeatService;@ResourceprivateTrainOrderService trainOrderService;@RabbitListener(queues =QueueConstants.DELAY_QUEUE)publicvoidreceive(String message){
        log.info("delay queue receive message, {}", message);try{MessageBody messageBody =JsonMapper.string2Obj(message,newTypeReference<MessageBody>(){});if(messageBody ==null){return;}switch(messageBody.getTopic()){caseQueueTopic.SEAT_PLACE_ROLLBACK:RollbackSeatDto dto =JsonMapper.string2Obj(messageBody.getDetail(),newTypeReference<RollbackSeatDto>(){});
                    trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());break;caseQueueTopic.ORDER_PAY_DELAY_CHECK:TrainOrder trainOrder =JsonMapper.string2Obj(messageBody.getDetail(),newTypeReference<TrainOrder>(){});
                    trainOrderService.delayCheckOrder(trainOrder);break;default:
                    log.warn("delay queue receive message, {}, no need handle", message);}}catch(Exception e){
            log.error("delay queue message handle exception, msg:{}", message, e);}}}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_48164819/article/details/135597179
版权归原作者 q琦一 所有, 如有侵权,请联系我们删除。

“RabbitMQ的安装使用”的评论:

还没有评论