0


RabbitMQ 应用延时消息

一.建立绑定关系

package com.lx.mq.bind;

import com.lx.constant.MonitorEventConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the RabbitMQ topology used for delayed monitor-event messages:
 * one delayed-message exchange plus the queues and bindings attached to it.
 *
 * <p>Exchange and queue names are suffixed with {@code "-" + spring.profiles.active}
 * so that each environment gets its own set of broker objects.
 *
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:04:03
 */
@Slf4j
@Configuration
public class MonitorRabbitMqBinding {

    /** Name suffix: a dash followed by the active Spring profile. */
    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     * Declares the delayed-message exchange (type {@code x-delayed-message},
     * routing like a {@code direct} exchange once the delay has elapsed).
     *
     * <p>The exchange is declared durable so that it, and the bindings of the
     * durable queues below, survive a broker restart. The original declaration
     * was non-durable, which left durable queues without an exchange after a
     * restart.
     *
     * <p>NOTE(review): the {@code x-delayed-message} type is provided by the
     * RabbitMQ delayed-message-exchange plugin; confirm it is enabled on the
     * broker. If an exchange with this name already exists as non-durable,
     * it presumably has to be deleted before this declaration can succeed.
     *
     * @return the delayed-message exchange
     */
    @Bean("delayExchange")
    public CustomExchange buildDelayedMessageNoticeExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // Constructor arguments: name, type, durable, autoDelete, arguments.
        return new CustomExchange(
                MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,
                "x-delayed-message",
                true,
                false,
                args);
    }

    /**
     * Declares the durable queue for delayed notice messages.
     *
     * @return the delayed notice queue
     */
    @Bean
    public Queue buildDelayedMessageNoticeQueue() {
        return QueueBuilder.durable(MonitorEventConst.MONITOR_DELAYED_MESSAGE_QUEUE + profile).build();
    }

    /**
     * Binds the delayed notice queue to the delayed exchange under
     * {@code MONITOR_DELAYED_MESSAGE_ROUTING_KEY}.
     *
     * @return the binding
     */
    @Bean
    public Binding buildDelayedMessageNoticeBinding() {
        return BindingBuilder.bind(buildDelayedMessageNoticeQueue())
                .to(buildDelayedMessageNoticeExchange())
                .with(MonitorEventConst.MONITOR_DELAYED_MESSAGE_ROUTING_KEY)
                .noargs();
    }

    /**
     * Declares the durable queue on which delivery-complete events are
     * received for scheduled handling.
     *
     * @return the delivery-complete delayed queue
     */
    @Bean
    public Queue deliveryCompleteEventHandQueue() {
        return QueueBuilder.durable(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + profile).build();
    }

    /**
     * Binds the delivery-complete queue to the delayed exchange under
     * {@code DELIVERY_COMPLETE_DELAYED_ROUTING_KEY}.
     *
     * @return the binding
     */
    @Bean
    public Binding deliveryCompleteBinding() {
        return BindingBuilder.bind(deliveryCompleteEventHandQueue())
                .to(buildDelayedMessageNoticeExchange())
                .with(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY)
                .noargs();
    }

}

二.建立生产者

1.消息实体

package com.lx.dto.monitor;

import lombok.Data;

import java.util.Date;

/**
 * Payload of a monitor event that is sent through the delayed-message exchange.
 *
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:11:06
 */
@Data
public class MonitorEventMessage {

    /**
     * Event id.
     */
    private String eventId;

    /**
     * Event code.
     */
    private String eventCode;

    /**
     * Business data (unique business key).
     */
    private String businessUniqueKey;

    /**
     * Business type.
     */
    private String businessType;

    /**
     * Remaining time until {@link #timedOperationTime}, in milliseconds.
     * Recomputed by {@link #setTimedOperationTime(Date)}; never negative.
     */
    private Long expireMillis;

    /**
     * Unique version number of the event handling.
     */
    private Integer eventHandVersion;

    /**
     * Point in time at which the event should be handled.
     */
    private Date timedOperationTime;

    /**
     * Sets the scheduled handling time and derives {@code expireMillis} from it.
     *
     * <p>{@code expireMillis} becomes the number of milliseconds from now until
     * {@code timedOperationTime}, floored at 0 for times in the past. Passing
     * {@code null} clears both fields instead of throwing a
     * {@link NullPointerException}, so a payload without a scheduled time
     * (for example a JSON document carrying an explicit null) can be handled.
     *
     * @param timedOperationTime the scheduled handling time, or {@code null} to clear it
     */
    public void setTimedOperationTime(Date timedOperationTime) {
        this.timedOperationTime = timedOperationTime;
        if (timedOperationTime == null) {
            // No schedule means there is no delay to derive either.
            expireMillis = null;
            return;
        }
        expireMillis = Math.max(0L, timedOperationTime.getTime() - System.currentTimeMillis());
    }
}
package com.lx.mq.producer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Publishes monitor event messages to the delayed-message exchange.
 */
@Slf4j
@Component
public class MonitorEventMessageProducer {

    /** Name suffix: a dash followed by the active Spring profile. */
    @Value(value = "-${spring.profiles.active}")
    private String profile;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a delivery-complete monitor event so that it is delivered after
     * the delay stored in {@link MonitorEventMessage#getExpireMillis()}.
     *
     * <p>The event is serialized to JSON and published to the profile-specific
     * delayed exchange with {@code DELIVERY_COMPLETE_DELAYED_ROUTING_KEY}.
     * A missing or non-positive delay is sent as 0; a delay larger than
     * {@link Integer#MAX_VALUE} milliseconds is capped at that value instead
     * of overflowing into a negative number.
     *
     * @param monitorEventMessage the event to publish
     */
    public void sendDeliveryCompleteEventHandMessage(MonitorEventMessage monitorEventMessage) {
        String message = JsonUtil.toJson(monitorEventMessage);
        int delayMillis = toDelayMillis(monitorEventMessage.getExpireMillis());
        rabbitTemplate.convertAndSend(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,
                MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(delayMillis);
                    return msg;
                });
        log.info("sending event processing messages: {}", message);
    }

    /**
     * Converts the message's delay into the int range accepted by the
     * message properties.
     *
     * @param expireMillis delay in milliseconds, may be {@code null}
     * @return 0 for {@code null} or non-positive input, otherwise the delay
     *         capped at {@link Integer#MAX_VALUE}
     */
    private static int toDelayMillis(Long expireMillis) {
        if (expireMillis == null || expireMillis <= 0L) {
            return 0;
        }
        // Long.intValue() would wrap around for values above Integer.MAX_VALUE.
        return (int) Math.min(expireMillis, (long) Integer.MAX_VALUE);
    }

}

三.建立消费者

package com.lx.mq.consumer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * 监控事件消息发送类
 */
@Slf4j
@Component
public class MonitorEventMessageConsumer {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     *  交车完成事件处理mq监听
     */
    @RabbitListener(queues = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + "-${spring.profiles.active}")
    public void dealWithDeliveryCompleteEventHandMessage(String eventMessage, Channel channel, Message message) {
        log.info("dealWithDeliveryCompleteEventHandMessage:【{}】", JsonUtil.toJson(eventMessage));
        String str = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received the message of regular loading and unloading of goods: {}", str); //收到商品定时上下架消息
        MonitorEventMessage monitorEventMessage = JsonUtil.toBean(eventMessage, MonitorEventMessage.class);
        try {
            analyzeHand(monitorEventMessage);
        }catch (Exception e){
            log.error("交车完成事件分析失败,参数:{},e:{}",JsonUtil.toJson(monitorEventMessage),JsonUtil.toJson(e));
        }
    }

    /**
     *  事件分析
     * @param monitorEventMessage
     */
    private void  analyzeHand(MonitorEventMessage monitorEventMessage) throws Exception {

    }
}

四.通过测试接口验证

package com.lx.controller;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.lx.conf.MQConfig;
import com.lx.conmon.ResultData;
import com.lx.dto.monitor.MonitorEventMessage;
import com.lx.mq.producer.MonitorEventMessageProducer;
import com.lx.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lx.constant.RedisMange;
import com.lx.utils.RedisUtil;
import org.thymeleaf.util.DateUtils;

/**
 * @Author : liu wei ping
 * @CreateTime : 2019/9/3
 * @Description : REST endpoint that publishes a delayed delivery-complete
 *                event through {@link MonitorEventMessageProducer}.
 **/

@RestController
public class SendMessageController {

    @Autowired
    private MonitorEventMessageProducer messageProducer;

    /**
     * Builds a delivery event scheduled 30 seconds from now and hands it to
     * the producer.
     *
     * @return a result wrapping the string {@code "ok"}
     */
    @GetMapping("/sendTopicMessage3")
    public ResultData<String> sendTopicMessage3() {
        MonitorEventMessage event = new MonitorEventMessage();
        event.setEventCode("delivery");
        event.setBusinessType("deliveryType");

        // Scheduled handling time = current time + handling delay.
        Date scheduledAt = DateUtil.date(DateUtil.getCurrentMillis() + 30 * 1000);
        event.setTimedOperationTime(scheduledAt);

        messageProducer.sendDeliveryCompleteEventHandMessage(event);
        return new ResultData<>("ok");
    }
}

五.效果如图所示

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/qq_33816292/article/details/131506360
版权归原作者 孰能生巧-LWP 所有, 如有侵权,请联系我们删除。

“RabbitMq应用延时消息”的评论:

还没有评论