文章目录
1.场景描述
消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
1.1 场景1
什么意思呢?举个例子:一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。
这种情景就会出现消息可能被多次地投递。
1.2 场景2
还有一种场景是程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
以上两个场景对于消息队列来说就是同一个messageId的消息重复投递下来了。
我们利用消息id来判断消息是否已经消费过,如果该信息被消费过,那么消息表中已经 会有一条数据,由于消费时会先执行插入操作,此时会因为主键冲突无法重复插入,我们就利用这个原理来进行幂等的控制,消息内容可以用json格式来进行传输的。
3.实战开发
3.1 建表
DROPTABLEIFEXISTS`message_idempotent`;CREATETABLE`message_idempotent`(`message_id`varchar(50)NOTNULLCOMMENT'消息ID',`message_content`varchar(2000)DEFAULTNULLCOMMENT'消息内容',`status`intDEFAULT'0'COMMENT'消费状态(0-未消费成功;1-消费成功)',`retry_times`intDEFAULT'0'COMMENT'重试次数',`type`intDEFAULT'0'COMMENT'消费类型',PRIMARYKEY(`message_id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;
3.2 集成mybatis-plus
《springBoot集成mybatisPlus》
3.3 集成RabbitMq
3.3.1 安装mq
推荐使用docker安装rabbitmq,还未安装的可以参考以下信息:
- docker安装
3.3.2 springBoot集成mq
- 1.添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.4 生产者具体实现
3.4.1 mq配置类
- DirectRabbitConfig 具体如何开启可以参考《rabbitMq实现死信队列》
import org.springframework.amqp.core.\*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;
@Configuration
public class RabbitmqConfig {
//正常交换机的名字
public final static String EXCHANGE\_NAME ="exchange\_name";
//正常队列的名字
public final static String QUEUE\_NAME="queue\_name";
//死信交换机的名字
public final static String EXCHANGE\_DEAD ="exchange\_dead";
//死信队列的名字
public final static String QUEUE\_DEAD="queue\_dead";
//死信路由key
public final static String DEAD\_KEY="dead.key";
//创建正常交换机
@Bean(EXCHANGE\_NAME)
public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE\_NAME)
//持久化 mq重启后数据还在
.durable(true)
.build();}
//创建正常队列
@Bean(QUEUE\_NAME)
public Queue queue(){
//正常队列和死信进行绑定 转发到 死信队列,配置参数
Map<String,Object>map=getMap();return new Queue(QUEUE\_NAME,true,false,false,map);}
//正常队列绑定正常交换机 设置规则 执行绑定 定义路由规则 requestmaping映射
@Bean
public Binding binding(@Qualifier(QUEUE\_NAME) Queue queue,
@Qualifier(EXCHANGE\_NAME) Exchange exchange){return BindingBuilder.bind(queue)
.to(exchange)
//路由规则
.with("app.#")
.noargs();}
//创建死信队列
@Bean(QUEUE\_DEAD)
public Queue queueDead(){return new Queue(QUEUE\_DEAD,true,false,false);}
//创建死信交换机
@Bean(EXCHANGE\_DEAD)
public Exchange exchangeDead(){return ExchangeBuilder.topicExchange(EXCHANGE\_DEAD)
.durable(true) //持久化 mq重启后数据还在
.build();}
//绑定死信队列和死信交换机
@Bean
public Binding deadBinding(){return BindingBuilder.bind(queueDead())
.to(exchangeDead())
//路由规则 正常路由key
.with(DEAD\_KEY)
.noargs();}
/\*\*
获取死信的配置信息
\*
\*\*/
public Map<String,Object>getMap(){
//3种方式 任选其一,选择其他方式之前,先把交换机和队列删除了,在启动项目,否则报错。
//方式一
Map<String,Object>map=new HashMap<>(16);
//死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
map.put("x-dead-letter-exchange", EXCHANGE\_DEAD);
//死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
map.put("x-dead-letter-routing-key", DEAD\_KEY);
//方式二
//消息的过期时间,单位:毫秒;达到时间 放入死信队列
// map.put("x-message-ttl",5000);
//方式三
//队列最大长度,超过该最大值,则将从队列头部开始删除消息;放入死信队列一条数据
// map.put("x-max-length",3);return map;}}
- 延迟队列配置 具体如何开启可以参考《rabbitMq实现死信队列》
由于rabbitMq中不直接支持死信队列,需要我们利用插件rabbitmq_delayed_messgae_exchage进行开启
/**
* 定义延迟交换机
*/@ConfigurationpublicclassRabbitMQDelayedConfig{//队列privatestaticfinalStringDELAYQUEUE="delayedqueue";//交换机privatestaticfinalStringDELAYEXCHANGE="delayedExchange";@BeanpublicQueuedelayqueue(){returnnewQueue(DELAYQUEUE);}//自定义延迟交换机@BeanpublicCustomExchangedelayedExchange(){Map<String,Object> arguments =newHashMap<>();
arguments.put("x-delayed-type","direct");/**
* 1、交换机名称
* 2、交换机类型
* 3、是否需要持久化
* 4、是否需要自动删除
* 5、其他参数
*/returnnewCustomExchange(DELAYEXCHANGE,"x-delayed-message",true,false,arguments);}//绑定队列和延迟交换机@BeanpublicBindingdelaybinding(){returnBindingBuilder.bind(delayqueue()).to(delayedExchange()).with("sectest").noargs();}}
3.4.2 生产者
- 1.消费队列的生产者
importcom.example.shop.config.RabbitmqConfig;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.util.UUID;@ComponentpublicclassSender_Direct{@AutowiredprivateAmqpTemplate rabbitTemplate;/**
* 用于消费订单
*
* @param orderId
*/publicvoidsend2Direct(String orderId){//创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)MessageProperties messageProperties =newMessageProperties();
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY,"内容设置", message ->{//设置消息的id为唯一
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
message.getMessageProperties().setMessageId(orderId);return message;});}}
3.4.3 消费者
1.开启手动ack配置
spring:
application:
name: shop
rabbitmq:
host: 192.168.1.102
port: 5673
virtual-host: /
username: guest
password: guest
listener:
simple:
# 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
acknowledge-mode: manual
消费者要配置ack重试机制,具体参考前几篇文章,使用的是mysql消息ID的唯一性,有时候可能生成一样的订单,具体的没有进行实验,内容是json生成的,可以执行业务
importcom.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;importcom.example.des.Bean.MessageIdempotent;importcom.example.des.Bean.Shop;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;@ComponentpublicclassReceiver_Direct{privatestaticfinalInteger delayTimes =30;//延时消费时间,单位:秒@AutowiredprivateRabbitTemplate rabbitTemplate;@RabbitListener(queues ={"smsQueue"})publicvoidreceiveD(Message message,Channel channel)throwsIOException{try{// 获取消息IdString messageId = message.getMessageProperties().getMessageId();String msg =newString(message.getBody());//获取消息//向数据库插入数据MessageIdempotent messageIdempotent =newMessageIdempotent();
messageIdempotent.setMessageId(messageId);
messageIdempotent.setMessageContent(msg);
messageIdempotent.setRetryTimes(0);System.out.println(messageIdempotent.toString());Boolean save =true;//设置保存成功,消息投递失败是在确认模式那里if(!save){//说明属于重重复请求//1、处理消息内容的业务,解析json数据//2、创建订单,并保存Boolean flag =consumeOrder(newShop());if(flag){//投入延迟队列,如果30分钟订单还没有消费,就删除订单
rabbitTemplate.convertAndSend("delayedExchange","sectest",message,message1->{//设置发送消息的延长时间 单位:ms,表示30分钟
message1.getMessageProperties().setDelay(1000*60*30);return message1;});//更新消息状态,消费成功,
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else{//延迟投入死信,进行重试
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}else{//1、处理消息内容的业务,解析json数据//2、创建订单,并保存//投入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}catch(Exception e){System.out.println("错误信息");}}privatebooleanconsumeOrder(Shop shop){returntrue;}@RabbitListener(queues ={" delay.queue.demo.delay.queue"})publicvoiddead(String payload,Message message,Channel channel)throwsIOException{System.out.println("死信队列:"+payload);//删除消息 将数据库状态更新为失败,更新邮件或者消息通知,有时候可以人工消费long deliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,true);}@RabbitListener(queues ="delayedqueue")publicvoidreceivemsg(Message messages){//查询有没有被消费,也就是更新成功,有时候需要乐观锁}}
至此mq的消息重复以及幂等的信息处理就很完美的解决了,当然本文以数据库为例进行实现,感兴趣的可以尝试使用redis来进行实现
版权归原作者 weihe_7306 所有, 如有侵权,请联系我们删除。