0


使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

使用StreamBridge实现RabbitMq && 延时消息

Maven依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

延时消息需要安装插件

下载地址:link
1.下载完成放到rabbitmq安装目录plugins下
2.执行命令启用插件
3.重启mq

rabbitmq-plugins enable rabbitmq_delayed_message_exchange  // 启用插件//重启mq
rabbitmq-server stop
rabbitmq-server start

Exchanges -> add a new exchange -> type 出现x-delayed-message即安装成功

Exchanges -> add a new exchange -> type 出现x-delayed-message即安装成功

yml配置

spring:
  rabbitmq:
    host: localhost
    port:5672
    username: xxxx
    password: xxxx

  function:
      # 与消费者对应(消费者方法名称)
      definition: ackMessage;normal;delay
    stream:
      rabbit:
        bindings:
          ackMessage-in-0:
            consumer:
              acknowledge-mode: manual  # manual手动确认 ,auto 自动确认
          delay-in-0:
            consumer:
              delayedExchange:true # 开启延时
          delay-out-0:
            producer:
              delayedExchange:true  # 开启延时

      bindings:
        delay-in-0:
          destination: delay.exchange.cloud  # mq对应交换机
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手动确认 ,auto 自动确认
          group: delay-group    # 消息组
          binder: rabbit
        delay-out-0:
          destination: delay.exchange.cloud
          content-type: application/json
          group: delay-group
          binder: rabbit

        ackMessage-in-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: manual # manual手动确认 ,auto 自动确认
          group: ackMessage-group
          binder: rabbit
        ackMessage-out-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          group: ackMessage-group
          binder: rabbit
        normal-in-0:
          destination: normal.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手动确认 ,auto 自动确认
          group: normal-group
          binder: rabbit
        normal-out-0:
          destination: normal.exchange.cloud
          content-type: application/json
          group: normal-group
          binder: rabbit

接口controller

importcom.alibaba.fastjson2.JSON;importlombok.AllArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.bind.annotation.*;importjava.time.LocalDateTime;/**
 * @Description: RabbitmqController
 */@Slf4j@RestController@AllArgsConstructor@RequestMapping("/mq")publicclassMqController{//消息发送者privatefinalRabbitMqProducer rabbitMqProducer;/**
     * 发送普通消息Rabbitmq
     * bindingName 绑定队列名称
     * @param msg 消息内容
     */@GetMapping("/sendMessage/{msg}/{bindingName}")publicR<Void>sendMessage(@PathVariable("msg")String msg,@PathVariable("bindingName")String bindingName){
        log.info(bindingName +"发送消息: "+ msg);
        rabbitMqProducer.sendMsg(msg, bindingName);returnR.ok();}/**
     * 发送延迟消息
     *
     * @param message  消息实体
     * @return
     */@PostMapping("/sendDelayedMessage")publicR<Void>sendDelayedMessage(@RequestBodyMessage message){
        log.info(MqTExchangesEnum.delay +"发送延时消息: "+LocalDateTime.now()+"  "+ message);
        rabbitMqProducer.sendDelayMsg(JSON.toJSONString(message), message.getBindingName(), message.getSeconds());// 延迟时间(秒)returnR.ok();}}

发送者

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.cloud.stream.function.StreamBridge;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;importjava.util.UUID;/**
 * RabbitMq消息生产者
 */@ComponentpublicclassRabbitMqProducer{@AutowiredprivateStreamBridge streamBridge;/**
     * @Description RabbitMq消息生产者
     * @Param msg 消息内容
     * @Param bindingName  exchange绑定queue名称
     **/publicvoidsendMsg(String msg,String bindingName){// 构建消息对象Messaging messaging =newMessaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);Message<Messaging> message =MessageBuilder.withPayload(messaging).build();
        streamBridge.send(bindingName, message);}/**
     * 发送延迟消息
     *
     * @param msg
     * @param bindingName
     * @param seconds
     */publicvoidsendDelayMsg(String msg,String bindingName,Integer seconds){// 构建消息对象Messaging messaging =newMessaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);Message<Messaging> message =MessageBuilder.withPayload(messaging).setHeader("x-delay", seconds *1000).build();
        streamBridge.send(bindingName, message);}}

消费者

importcom.alibaba.fastjson2.JSON;importcom.alibaba.fastjson2.JSONObject;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.support.AmqpHeaders;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.http.HttpHeaders;importorg.springframework.messaging.Message;importorg.springframework.stereotype.Component;importorg.springframework.util.MultiValueMap;importjava.time.LocalDateTime;importjava.util.function.Consumer;/**
 * RabbitMq消息消费者
 */@Component@Slf4jpublicclassRabbitMqConsumer{/**
     * mq接收ackMessage消息/手动ack确认
     * @methodName 配置文件对应
     **/@BeanConsumer<Message<Messaging>>ackMessage(){
        log.info("ackMessage-初始化订阅");return obj ->{Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL,Channel.class);Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG,Long.class);try{
                log.info("ackMessage-消息接收成功:"+ obj.getPayload());//业务逻辑处理//ack确认
                channel.basicAck(deliveryTag,false);}catch(Exception e){//重新回队列-true则重新入队列,否则丢弃或者进入死信队列。//                    channel.basicReject(deliveryTag, true);
                log.error(e.getMessage());}};}/**
     * mq接收normal消息
     **/@BeanConsumer<Messaging>normal(){
        log.info("normal-初始化订阅");return obj ->{
            log.info("normal-消息接收成功:"+ obj);//业务逻辑处理};}/**
     * mq接收延时消息
     * Messaging 发送实体消息接收实体消息
     **/@BeanConsumer<Message<Messaging>>delay(){
        log.info("delay-初始化订阅");return obj ->{Messaging payload = obj.getPayload();
            log.info("delay-消息接收成功:"+LocalDateTime.now()+"  "+ payload);//业务逻辑处理};}}

本文转载自: https://blog.csdn.net/m0_59069917/article/details/128254750
版权归原作者 在搬砖 所有, 如有侵权,请联系我们删除。

“使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息”的评论:

还没有评论