使用StreamBridge实现RabbitMq && 延时消息
Maven依赖
<!-- Spring Cloud Stream binder for RabbitMQ -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
延时消息需要安装插件
下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases (请选择与已安装 RabbitMQ 版本匹配的插件版本)
1.下载完成放到rabbitmq安装目录plugins下
2.执行命令启用插件
3.重启mq
# Enable the delayed-message exchange plugin.
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Restart RabbitMQ: stop the running node, then start it again in the background.
rabbitmqctl stop
rabbitmq-server -detached
验证安装: 打开 RabbitMQ 管理界面, 进入 Exchanges -> Add a new exchange, 若 Type 下拉框中出现 x-delayed-message, 即表示插件安装成功
yml配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: xxxx
    password: xxxx
  cloud:
    function:
      # Must match the consumer bean (method) names
      definition: ackMessage;normal;delay
    stream:
      # RabbitMQ-binder-specific settings, keyed by binding name
      rabbit:
        bindings:
          ackMessage-in-0:
            consumer:
              acknowledge-mode: manual # manual = manual ack, auto = automatic ack
          delay-in-0:
            consumer:
              delayedExchange: true # declare the exchange as a delayed exchange
          delay-out-0:
            producer:
              delayedExchange: true # declare the exchange as a delayed exchange
      # Binder-independent binding settings
      # NOTE(review): acknowledge-mode is a Rabbit binder property; the copies under
      # `consumer:` below are presumably ignored and only the entries under
      # `rabbit.bindings` take effect - confirm before relying on them.
      bindings:
        delay-in-0:
          destination: delay.exchange.cloud # exchange name in RabbitMQ
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual = manual ack, auto = automatic ack
          group: delay-group # consumer group
          binder: rabbit
        delay-out-0:
          destination: delay.exchange.cloud
          content-type: application/json
          group: delay-group
          binder: rabbit
        ackMessage-in-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: manual # manual = manual ack, auto = automatic ack
          group: ackMessage-group
          binder: rabbit
        ackMessage-out-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          group: ackMessage-group
          binder: rabbit
        normal-in-0:
          destination: normal.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual = manual ack, auto = automatic ack
          group: normal-group
          binder: rabbit
        normal-out-0:
          destination: normal.exchange.cloud
          content-type: application/json
          group: normal-group
          binder: rabbit
接口controller
importcom.alibaba.fastjson2.JSON;importlombok.AllArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.bind.annotation.*;importjava.time.LocalDateTime;/**
* @Description: RabbitmqController
*/@Slf4j@RestController@AllArgsConstructor@RequestMapping("/mq")publicclassMqController{//消息发送者privatefinalRabbitMqProducer rabbitMqProducer;/**
* 发送普通消息Rabbitmq
* bindingName 绑定队列名称
* @param msg 消息内容
*/@GetMapping("/sendMessage/{msg}/{bindingName}")publicR<Void>sendMessage(@PathVariable("msg")String msg,@PathVariable("bindingName")String bindingName){
log.info(bindingName +"发送消息: "+ msg);
rabbitMqProducer.sendMsg(msg, bindingName);returnR.ok();}/**
* 发送延迟消息
*
* @param message 消息实体
* @return
*/@PostMapping("/sendDelayedMessage")publicR<Void>sendDelayedMessage(@RequestBodyMessage message){
log.info(MqTExchangesEnum.delay +"发送延时消息: "+LocalDateTime.now()+" "+ message);
rabbitMqProducer.sendDelayMsg(JSON.toJSONString(message), message.getBindingName(), message.getSeconds());// 延迟时间(秒)returnR.ok();}}
发送者
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.cloud.stream.function.StreamBridge;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;importjava.util.UUID;/**
* RabbitMq消息生产者
*/@ComponentpublicclassRabbitMqProducer{@AutowiredprivateStreamBridge streamBridge;/**
* @Description RabbitMq消息生产者
* @Param msg 消息内容
* @Param bindingName exchange绑定queue名称
**/publicvoidsendMsg(String msg,String bindingName){// 构建消息对象Messaging messaging =newMessaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);Message<Messaging> message =MessageBuilder.withPayload(messaging).build();
streamBridge.send(bindingName, message);}/**
* 发送延迟消息
*
* @param msg
* @param bindingName
* @param seconds
*/publicvoidsendDelayMsg(String msg,String bindingName,Integer seconds){// 构建消息对象Messaging messaging =newMessaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);Message<Messaging> message =MessageBuilder.withPayload(messaging).setHeader("x-delay", seconds *1000).build();
streamBridge.send(bindingName, message);}}
消费者
importcom.alibaba.fastjson2.JSON;importcom.alibaba.fastjson2.JSONObject;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.http.HttpHeaders;importorg.springframework.messaging.Message;importorg.springframework.stereotype.Component;importorg.springframework.util.MultiValueMap;importjava.time.LocalDateTime;importjava.util.function.Consumer;/**
* RabbitMq消息消费者
*/@Component@Slf4jpublicclassRabbitMqConsumer{/**
* mq接收ackMessage消息/手动ack确认
* @methodName 配置文件对应
**/@BeanConsumer<Message<Messaging>>ackMessage(){
log.info("ackMessage-初始化订阅");return obj ->{Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL,Channel.class);Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG,Long.class);try{
log.info("ackMessage-消息接收成功:"+ obj.getPayload());//业务逻辑处理//ack确认
channel.basicAck(deliveryTag,false);}catch(Exception e){//重新回队列-true则重新入队列,否则丢弃或者进入死信队列。// channel.basicReject(deliveryTag, true);
log.error(e.getMessage());}};}/**
* mq接收normal消息
**/@BeanConsumer<Messaging>normal(){
log.info("normal-初始化订阅");return obj ->{
log.info("normal-消息接收成功:"+ obj);//业务逻辑处理};}/**
* mq接收延时消息
* Messaging 发送实体消息接收实体消息
**/@BeanConsumer<Message<Messaging>>delay(){
log.info("delay-初始化订阅");return obj ->{Messaging payload = obj.getPayload();
log.info("delay-消息接收成功:"+LocalDateTime.now()+" "+ payload);//业务逻辑处理};}}
本文转载自: https://blog.csdn.net/m0_59069917/article/details/128254750
版权归原作者 在搬砖 所有, 如有侵权,请联系我们删除。
版权归原作者 在搬砖 所有, 如有侵权,请联系我们删除。