- 什么是消息中间件 简单的来说就是消息队列中间件,生产者发送消息到中间件,消息中间件用于 保存消息并发送消息到消费者。
- 消息中间件RabbitMQ的基本组件 1)producer -生产者 2)customer -消费者 3)broker (经纪人)- MQ服务器,管理消息对列、消息及相关消息。(接收并存储生产者发送的消息,发送消息到消费者) 4)exchange-交换机,将生产者的消息按照一定规则发送给对应的消息对列queue 5)queue-消息对列,队列,消息存放的容器,消息先进先出 6)Message-消息,程序间的通信的数据
- 什么是消息队列queue(生产者生产msg-queue,消费者监听queue-消费) 消息对列是一种分布式中的通信方式,它通过异步传输消息的方式,来解耦消 息的 生产者和消费者。在消息中间件中,生产者将消息发送到消息对列中,以为先进先出的方式,消费者从对列中取出消息(可以监听对列是否有消息-@RabblitListener和@RabbitHandler)
- 消息中间件的作用 主要有三个作用:分别是服务解耦、实现异步通信、流量削峰 1). 服务解耦:(场景-用户下订单、库存服务工作) 例如订单服务-用户下订单,库存服务处理对应减库存,才返回给用户下单成功的消息。如果说库存服务出现了问题,就会造成订单丢失等问题。如果使用消息中间件(消息对列),可以把下的订单信息—> mq就返回用户下单这个,mq再发送给库存服务,这样生产者发送消息和消费者接收处理消息相互不影响,即使宕机了,消息还在中间件中。
2). 异步通信/异步调用:(用户注册新用户,服务发送短信和邮件)
传统的模式,用户注册系统新用户,服务给用户发送短信和邮件,三个操作都完成之后才返回用户下单注册的消息。因为短信和邮箱和注册信息是没有关系的服务,用户注册后消息发送给mq,用户不需要等邮件和短信发送成功,mq直接返回用户注册成功,至此用户注册业务完成。至于短信和邮件交给mq发送给短信业务-去发送。
注意:
异步就是某线程发出请求,不需要等其他线程完成就接着完成操作。用户注册,消息发送给mq,不需要等短信服务完成,短信发布发送都与注册无关,两者是异步关系。异步不是并发,所有操作同时进行,异步是各过各的。
3). 流量削峰:(商品秒杀)
例如商品秒杀的时候,这时候数据库并不能承受这么大的请求。可以把请求下订单的信息暂存在mq中,返回给用户下单成功,之后的操作由mq发送给对应的服务处理。缓存数据减少数据库的压力。
- 为什么需要使用消息中间件 服务解耦、异步通信、流量削峰
- 消息中间件在分布式系统中使用场景(异步) 6.1 服务解耦-订单和库存服务。用户下订单,消息发给mq,mq返回用户下订单成功,消费者-库存服务接收mq消息再去调用减少库存的消息。 6.2 异步通信-用户注册新账户 用户注册和admin发送短信和邮件异步 6.3 流量削峰-商品秒杀,先mq先存储订单信息,返回订单服务下单成功,后慢慢处理。减少大并发对数据库的影响/。
- RabbitMQ的五种消息模型/工作模式、 1) simple 简单的一对一模式,producce-queue-customer 2) word模式,一个消息对列queue—> 多个消费者,消费者争抢消息队列里面消息,注意一个消息只能被一个消费者消费。 3) fanout-广播、订阅者模式。交换机将消息发送给所有binding的对列,消费端可以有多个customer使用word模式消费对列的消息。 4) topic-主体模式,生产者的消息按照不同的路由规则,模糊匹配给不同满足条件的消息对列,消费者再去消费对列中消息 5)routeKey,路由键(exchange-type-direct),按照不同的路由键发送到对应的queue中。
- 消息中间件是异步还是同步 异步,各干各的,互不影响。(异步并不是并发-同时请求一个请求,而是互不影响个干各的,没有约束和先后顺序)。received生产者的message,send消息到消费者。二者是异步,解耦合互不影响。
- mq的消息确认机制confirm(MQ如何避免消息丢失?)1. . 对于生产者端来说,主要有两种确认机制 a. message到broker后,mq立马确认confirm并返回消息告知生产者消息发送成功,如果失败也告知生产者,并重新发送。 b. message到MQ之后,如果消息对列没有received成功(queue存储msg成功),会确认并返回消息接收失败到生产者 a b 保证了生产者端不会丢失消息。2). 对于消费者来说。 a. 消费者接收到queue的消息后,默认自动确认,queue删除该message。 b. 消费者接收到msg后,对数据进行逻辑处理,如果直接confirm-queue直接删除msg,处理数据过程中可能会宕机消息丢失。 ----设置为手动confirm确认收货,数据处理完再收货成功,queue再去删除msg。也可以对数据不满,退回到queue重新入队,也可以直接删除数据。 c. 接收失败告知queue,不会删除数据,MQ重新发送消息-这种操作很常见 这样避免数据在消费者端丢失
1、2两种方式避免了mq的消息丢失。
- MQ重复消费 1)如何造成重复消费 (1) 生产者端,传输到MQ-queue消息对列接收成功,MQ因为网络问题没有ack->producer,导致生产者又发送了一次消息到MQ。queue-customer-这样msg就被消费了两次。 (2)消费者端,MQ-queue消息对列消息传到customer。一种是消费者没有接收成功,因为网络问题没有ack queue,queue重复发送,这种不会造成msg重复消费。另一种是消费者消费成功,但是因为不可控因素没有ack queue,消息对列重复发送mgs-to-customer-重复消费。2)解决方法 对于幂等性消息(查询),消费者重复消费也没有关系。 对于非幂等性消息,消费者重复消费就会有影响了。 方法:消费者在消费消息之前,获取msg唯一id,到redis进行存储判断setnx(判断是已经存在并存储key-value)。
Boolen flag=stringRadisTemplate.opsForValue.setAbsent(id,value);
1-flag=true,key不存在,未被消费,c正常消费msg
2-false,key存在,已经被消费(两种可能-正在消费或者完成消费-忘记告知ack-queue了),无论哪种情况都直接丢弃。
注意一个问题:如果redis显示有消费记录-且消费者正在消费,此时消费者执行业务宕机了,redis分布式锁会成死锁-解决方法在IfAbsent方法加上过期时间和单位。
一句话就是:消费之前,缓存中有消费记录则丢弃消息,不二次消费。
redis缓存中没有消费记录则,重复存入缓存并消费(设计锁过期时间)。
以下是消息中间件MQ的相关代码和配置信息
- 使用MQ的步骤 1)在pom文件中加上依赖amqp
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency
2) 配置文件配置rabbit服务器的对应信息(spring.rabbitmq host、port,username,ps等)
spring.rabbitmq.host=rabbitmq服务器地址信息
spring.rabbitmq.port=端口号
spring.rabbitmq.username=账户name
spring.rabbitmq.password=密码
spring.rabbitmq.virtual-host=/
#1. 生产者发送message, mq收到消息就确认回复到生产者
spring.rabbitmq.publisher-confirms=tr
#2. queue消息对列接收生产者的消息失败,就确认返回消息到生产操者
spring.rabbitmq.publisher-returns=true
#3. 消费者接收queue消息对列的消息之后,手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3) 服务启动类上面加上注解@EnableRabbit-开启MQ
在springboot启动类加上 @EnableRabbit-开启MQ
4) 业务使用消息中间件存储消息的时候
(1) 创建交换机(注意有不同类型的交换机 direct-fanout-topic)
publicvoidcreateExchange(){// 1. 创建direct类型的exchange 交换机的名字-hello.java.exchangeDirectExchange directExchange =newDirectExchange("hello.java.exchange",true,false);// 2. 声明交换机
amqpAdmin.declareExchange(directExchange);
log.info("exchange创建成功1111","hello.java.exchange");}
(2)创建消息队列queue
publicvoidcreateQueue(){// 1. 创建队列-queue 队列名称-hello-java-queueQueue queue =newQueue("hello.java.queue",true,false,false);// 2. 声明mq队列
amqpAdmin.declareQueue(queue);
log.info("queue创建成功1111","hello.java.queue");}
(3)交换机和消息队列直接关系绑定
publicvoidbindEQ(){// 1. 创建绑定对象( "hello.java.queue"--消息对列, "hello.java.exchange"--交换机,"hello.java"-绑定关系的route-key)Binding binding =newBinding("hello.java.queue",Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);// 2. 声明绑定关系(这个关系实际也是一个对象)
amqpAdmin.declareBinding(binding);
log.info("Binding创建成功1111","hello.java.binding");}
(4)使用MQ的操作工具类 RabbitTemplate-操作发送消息
对象注入
@AutowiredRabbitTemplate rabbitTemplate;
生产者发送消息,需要携带消息-mgs和发送给哪个queue的route-key。注意发送消息需要一个唯一id,后面防止重复发送需要此id判断
publicvoidsendMessageStr()throwsInterruptedException{String msg ="测试数据测试数";// 发送10条message到exchange中// new CorrelationData(UUID.randomUUID().toString() 发送的消息的唯一id mq可以接收并处理
rabbitTemplate.convertAndSend("hello.java.exchange","hello.java", msg+"11111111111111",newCorrelationData(UUID.randomUUID().toString()));
rabbitTemplate.convertAndSend("hello.java.exchange","hello.java", msg +"222222222222",newCorrelationData(UUID.randomUUID().toString()));
log.info("交换机消息发送成功----------->");}
(4)消费者监听消息对列消息,消费消息
使用@RabbitListener监听消息对列,使用RabbitHandler接收对应类型的消息。前者放在类上面,后者放到监听方法上面。
queues是消息对列名称的集合
@RabbitListener(queues ={"hello.java.queue"})
使用@RabbitHandler监听不同类型的消息
// 消息是TestEntity2 类型,会自动匹配到对应方法接收@RabbitHandlerpublicvoidreceiveOfSecond(TestEntity2 testEntity2)throwsInterruptedException{System.out.println("receiveOfSecond-监听接受queue的数据是----->"+ testEntity2);}@RabbitHandlerpublicvoidreceiveOfFirst(TestEntity testEntity)throwsInterruptedException{System.out.println("receiveOfFirst-监听接受queue的数据是----->"+ testEntity);}
版权归原作者 我们一起搬砖吧 所有, 如有侵权,请联系我们删除。