0


rabbitMq动态创建和监听队列

rabbitMq动态创建和监听队列

背景

在与第三方业务员系统对接时,需要根据第三方的信息进行队列的创建,且队列个数不定,这就造成使用@RabbitListener来添加监听不方便。本文采用了当业务需要时,动态地创建队列和监听队列的方式,适合以某个任务为一组的队列场景,同时需要考虑队列使用完成后的处理方式。

引入jar

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.properties

# mq相关配置,简单使用rabbitmq只需配置以下几个配置项即可使用
spring.rabbitmq.host=124.222.229.252
spring.rabbitmq.port=15677
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

生产者端

封装方法,动态创建队列

    @Autowired
    private AmqpAdmin amqpAdmin;
    @Override
    publicvoidcreateQueue(String queueName){//判断队列是否存在if(!isExistQueue(queueName)){//创建队列
            amqpAdmin.declareQueue(newQueue(queueName,true));}
        logger.info("创建队列:{}成功",queueName);}/**方法描述: 判断队列是否存在
     * @author lin
     * @date 2023/4/3 15:11
     * @param queueName
     * @return boolean  true-存在
     */public boolean isExistQueue(String queueName){
        boolean flag =true;if(StringUtils.isNotBlank(queueName)){
            Properties queueProperties = amqpAdmin.getQueueProperties(queueName);if(queueProperties ==null){
                flag =false;}}else{thrownewRuntimeException("队列名称为空");}return flag;}

往指定队列发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    publicvoidsendMessage(String queueName, String tatus){if(!isExistQueue(queueName)){thrownewRuntimeException("发送消息失败,队列为空");}//定义业务消息实体
        SignTaskMessageVo messageVo =newSignTaskMessageVo();
        messageVo.setTaskId(queueName);
        messageVo.setStatus(taskStatus);
        messageVo.setSendTime(DateUtils.getCurrentTime());//往指定队列发送消息
        rabbitTemplate.convertAndSend(queueName, JSONObject.toJSONString(messageVo));
        logger.info("发送消息成功:{}", messageVo);}

消费者端

java动态添加监听的队列

配置类

@Configuration
publicclassRabbitConfig{/**方法描述:  简单消息监听器
     * @author lin
     * @date 2023/4/3 14:58
     * @param connectionFactory
     * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
    */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);return container;}}

消息监听处理类

@Component
publicclassMessageListenerImplimplementsMessageListener{privatestatic final Logger logger = LoggerFactory.getLogger(MessageListenerImpl.class);
    @Override
    publicvoidonMessage(Message message){try{
            String mes =newString(message.getBody(),"utf-8");
            logger.info("监听到消息:{}",mes);//todo 业务处理}catch(UnsupportedEncodingException e){
            e.printStackTrace();}}}

动态添加监听

     @Autowired
    private SimpleMessageListenerContainer container;
    @Autowired
    private MessageListenerImpl message;
    @Override
    publicvoidaddListener(String queueName){//获取当前监听的队列名称
        String[] strings=container.getQueueNames();
        List<String> list=Arrays.asList(strings);if(!list.contains(queueName)){
            container.addQueueNames(queueName);//设置消息监听处理类
            container.setMessageListener(message);}}

前端js监听并消费

引入stomp.js(cv大法或者自己引入)

// Generated by CoffeeScript 1.7.1
/*
   Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0
   Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/)
   Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com)
 */
(function() {
  var Byte, Client, Frame, Stomp,
    __hasProp = {}.hasOwnProperty,
    __slice = [].slice;

  // Control characters used by the STOMP wire format.
  Byte = {
    LF: '\x0A',
    NULL: '\x00'
  };

  /*
   * Frame: one STOMP frame (command, headers, body) plus (un)marshalling helpers.
   */
  Frame = (function() {
    var unmarshallSingle;

    // command: string; headers: object (defaults to {}); body: string (defaults to '').
    function Frame(command, headers, body) {
      this.command = command;
      this.headers = headers != null ? headers : {};
      this.body = body != null ? body : '';
    }

    // Serializes the frame WITHOUT the trailing NULL byte.
    // A header `content-length: false` suppresses the automatic content-length
    // header; that marker entry is deleted from this.headers as a side effect.
    Frame.prototype.toString = function() {
      var lines, name, skipContentLength, value, _ref;
      lines = [this.command];
      skipContentLength = this.headers['content-length'] === false;
      if (skipContentLength) {
        delete this.headers['content-length'];
      }
      _ref = this.headers;
      for (name in _ref) {
        if (!__hasProp.call(_ref, name)) continue;
        value = _ref[name];
        lines.push("" + name + ":" + value);
      }
      if (this.body && !skipContentLength) {
        lines.push("content-length:" + (Frame.sizeOfUTF8(this.body)));
      }
      lines.push(Byte.LF + this.body);
      return lines.join(Byte.LF);
    };

    // Number of bytes `s` occupies when encoded as UTF-8 (0 for empty/null input).
    // Each %XX escape produced by encodeURI stands for exactly one byte.
    Frame.sizeOfUTF8 = function(s) {
      if (s) {
        return encodeURI(s).match(/%..|./g).length;
      } else {
        return 0;
      }
    };

    // Parses the text of a single frame (already stripped of its NULL terminator).
    unmarshallSingle = function(data) {
      var body, chr, command, divider, headerLines, headers, i, idx, len, line, start, trim, _i, _len, _ref;
      // The first blank line separates the command/headers from the body.
      divider = data.search(RegExp("" + Byte.LF + Byte.LF));
      headerLines = data.substring(0, divider).split(Byte.LF);
      command = headerLines.shift();
      headers = {};
      trim = function(str) {
        return str.replace(/^\s+|\s+$/g, '');
      };
      // Iterate in reverse so that, for a repeated header, the FIRST occurrence
      // is written last and therefore wins.
      _ref = headerLines.reverse();
      for (_i = 0, _len = _ref.length; _i < _len; _i++) {
        line = _ref[_i];
        idx = line.indexOf(':');
        headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1));
      }
      body = '';
      start = divider + 2;
      if (headers['content-length']) {
        // Explicit radix: never let the engine guess the base.
        len = parseInt(headers['content-length'], 10);
        body = ('' + data).substring(start, start + len);
      } else {
        // No content-length: the body runs up to the first NULL or the end of data.
        chr = null;
        for (i = start; i < data.length; i++) {
          chr = data.charAt(i);
          if (chr === Byte.NULL) {
            break;
          }
          body += chr;
        }
      }
      return new Frame(command, headers, body);
    };

    // Splits a chunk of received text into frames; returns an array of Frame objects.
    // Frames are separated by NULL followed by any number of LFs; empty pieces are skipped.
    Frame.unmarshall = function(datas) {
      var data, _i, _len, _ref, _results;
      _ref = datas.split(RegExp("" + Byte.NULL + Byte.LF + "*"));
      _results = [];
      for (_i = 0, _len = _ref.length; _i < _len; _i++) {
        data = _ref[_i];
        if ((data != null ? data.length : void 0) > 0) {
          _results.push(unmarshallSingle(data));
        }
      }
      return _results;
    };

    // Builds the wire representation of a frame, including the NULL terminator.
    Frame.marshall = function(command, headers, body) {
      var frame;
      frame = new Frame(command, headers, body);
      return frame.toString() + Byte.NULL;
    };

    return Frame;

  })();

  /*
   * Client: STOMP client that talks over a WebSocket-like object `ws`
   * (needs send(), close(), and assignable onopen/onmessage/onclose).
   */
  Client = (function() {
    var now;

    function Client(ws) {
      this.ws = ws;
      this.ws.binaryType = "arraybuffer";
      // Used to generate ids for subscriptions and transactions.
      this.counter = 0;
      this.connected = false;
      // Heart-beat settings in milliseconds; 0 disables that direction.
      this.heartbeat = {
        outgoing: 10000,
        incoming: 10000
      };
      // Outgoing data longer than this many characters is sent in several pieces.
      this.maxWebSocketFrameSize = 16 * 1024;
      // Maps subscription id -> message callback.
      this.subscriptions = {};
    }

    // Default debug sink: window.console.log when available. Assign another
    // function (or a non-function such as null) on the client to redirect/silence it.
    Client.prototype.debug = function(message) {
      var con;
      if (typeof window !== "undefined" && window !== null) {
        con = window.console;
        return con != null ? con.log(message) : void 0;
      }
      return void 0;
    };

    // Current time in milliseconds.
    now = function() {
      if (Date.now) {
        return Date.now();
      } else {
        // valueOf must be CALLED; returning the function itself turns the
        // heart-beat delta arithmetic into NaN.
        return new Date().valueOf();
      }
    };

    // Marshalls a frame and writes it to the socket, splitting it into pieces of
    // at most maxWebSocketFrameSize characters.
    Client.prototype._transmit = function(command, headers, body) {
      var out;
      out = Frame.marshall(command, headers, body);
      if (typeof this.debug === "function") {
        this.debug(">>> " + out);
      }
      while (true) {
        if (out.length > this.maxWebSocketFrameSize) {
          this.ws.send(out.substring(0, this.maxWebSocketFrameSize));
          out = out.substring(this.maxWebSocketFrameSize);
          if (typeof this.debug === "function") {
            this.debug("remaining = " + out.length);
          }
        } else {
          return this.ws.send(out);
        }
      }
    };

    // Starts the heart-beat timers from the CONNECTED frame's headers.
    // Does nothing unless the negotiated version is 1.1 or 1.2.
    // Relies on Stomp.setInterval, which this file only defines when `window` exists.
    Client.prototype._setupHeartbeat = function(headers) {
      var beats, serverIncoming, serverOutgoing, ttl, version;
      version = headers.version;
      if (version !== Stomp.VERSIONS.V1_1 && version !== Stomp.VERSIONS.V1_2) {
        return;
      }
      // A CONNECTED frame without a heart-beat header is treated as "0,0"
      // (no heart-beating) instead of crashing on undefined.split().
      beats = (headers['heart-beat'] || '0,0').split(",");
      serverOutgoing = parseInt(beats[0], 10);
      serverIncoming = parseInt(beats[1], 10);
      if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) {
        ttl = Math.max(this.heartbeat.outgoing, serverIncoming);
        if (typeof this.debug === "function") {
          this.debug("send PING every " + ttl + "ms");
        }
        this.pinger = Stomp.setInterval(ttl, (function(_this) {
          return function() {
            _this.ws.send(Byte.LF);
            return typeof _this.debug === "function" ? _this.debug(">>> PING") : void 0;
          };
        })(this));
      }
      if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) {
        ttl = Math.max(this.heartbeat.incoming, serverOutgoing);
        if (typeof this.debug === "function") {
          this.debug("check PONG every " + ttl + "ms");
        }
        return this.ponger = Stomp.setInterval(ttl, (function(_this) {
          return function() {
            var delta;
            delta = now() - _this.serverActivity;
            // Close the socket after more than two intervals of silence.
            if (delta > ttl * 2) {
              if (typeof _this.debug === "function") {
                _this.debug("did not receive server activity for the last " + delta + "ms");
              }
              return _this.ws.close();
            }
          };
        })(this));
      }
    };

    // Normalizes the overloaded connect() arguments into
    // [headers, connectCallback, errorCallback]. Accepted forms:
    //   (headers, connectCallback)
    //   (headers, connectCallback, errorCallback)
    //   (login, passcode, connectCallback)
    //   (login, passcode, connectCallback, errorCallback)
    //   (login, passcode, connectCallback, errorCallback, host)
    Client.prototype._parseConnect = function() {
      var args, connectCallback, errorCallback, headers;
      args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
      headers = {};
      switch (args.length) {
        case 2:
          headers = args[0], connectCallback = args[1];
          break;
        case 3:
          if (args[1] instanceof Function) {
            headers = args[0], connectCallback = args[1], errorCallback = args[2];
          } else {
            headers.login = args[0], headers.passcode = args[1], connectCallback = args[2];
          }
          break;
        case 4:
          headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3];
          break;
        default:
          headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3], headers.host = args[4];
      }
      return [headers, connectCallback, errorCallback];
    };

    // Installs the socket handlers; the CONNECT frame is sent once the socket opens.
    // See _parseConnect for the accepted argument forms.
    Client.prototype.connect = function() {
      var args, errorCallback, handleFrame, headers, out;
      args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
      out = this._parseConnect.apply(this, args);
      headers = out[0], this.connectCallback = out[1], errorCallback = out[2];
      if (typeof this.debug === "function") {
        this.debug("Opening Web Socket...");
      }

      // Dispatches one received frame and returns the callback's result.
      // Being a function of its own gives every frame private messageID /
      // subscription bindings, so an ack()/nack() called later (after further
      // frames from the same chunk were processed) still targets its own message.
      handleFrame = (function(_this) {
        return function(frame) {
          var messageID, onreceive, subscription;
          switch (frame.command) {
            case "CONNECTED":
              if (typeof _this.debug === "function") {
                _this.debug("connected to server " + frame.headers.server);
              }
              _this.connected = true;
              _this._setupHeartbeat(frame.headers);
              return typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0;
            case "MESSAGE":
              subscription = frame.headers.subscription;
              // Fall back to a client-wide onreceive handler if one was set.
              onreceive = _this.subscriptions[subscription] || _this.onreceive;
              if (!onreceive) {
                return typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0;
              }
              messageID = frame.headers["message-id"];
              frame.ack = function(headers) {
                if (headers == null) {
                  headers = {};
                }
                return _this.ack(messageID, subscription, headers);
              };
              frame.nack = function(headers) {
                if (headers == null) {
                  headers = {};
                }
                return _this.nack(messageID, subscription, headers);
              };
              return onreceive(frame);
            case "RECEIPT":
              return typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0;
            case "ERROR":
              return typeof errorCallback === "function" ? errorCallback(frame) : void 0;
            default:
              return typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0;
          }
        };
      })(this);

      this.ws.onmessage = (function(_this) {
        return function(evt) {
          var arr, chars, data, frames, _i, _len, _results;
          if (typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer) {
            // Binary payload: map every byte to one character.
            arr = new Uint8Array(evt.data);
            if (typeof _this.debug === "function") {
              _this.debug("--- got data length: " + arr.length);
            }
            chars = [];
            for (_i = 0, _len = arr.length; _i < _len; _i++) {
              chars.push(String.fromCharCode(arr[_i]));
            }
            data = chars.join('');
          } else {
            data = evt.data;
          }
          _this.serverActivity = now();
          // A lone LF is a heart-beat from the server.
          if (data === Byte.LF) {
            if (typeof _this.debug === "function") {
              _this.debug("<<< PONG");
            }
            return;
          }
          if (typeof _this.debug === "function") {
            _this.debug("<<< " + data);
          }
          frames = Frame.unmarshall(data);
          _results = [];
          for (_i = 0, _len = frames.length; _i < _len; _i++) {
            _results.push(handleFrame(frames[_i]));
          }
          return _results;
        };
      })(this);

      this.ws.onclose = (function(_this) {
        return function() {
          var msg;
          msg = "Whoops! Lost connection to " + _this.ws.url;
          if (typeof _this.debug === "function") {
            _this.debug(msg);
          }
          _this._cleanUp();
          return typeof errorCallback === "function" ? errorCallback(msg) : void 0;
        };
      })(this);

      return this.ws.onopen = (function(_this) {
        return function() {
          if (typeof _this.debug === "function") {
            _this.debug('Web Socket Opened...');
          }
          headers["accept-version"] = Stomp.VERSIONS.supportedVersions();
          headers["heart-beat"] = [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(',');
          return _this._transmit("CONNECT", headers);
        };
      })(this);
    };

    // Sends DISCONNECT, closes the socket without firing the onclose handler,
    // stops the heart-beat timers, then invokes disconnectCallback (if given).
    Client.prototype.disconnect = function(disconnectCallback, headers) {
      if (headers == null) {
        headers = {};
      }
      this._transmit("DISCONNECT", headers);
      this.ws.onclose = null;
      this.ws.close();
      this._cleanUp();
      return typeof disconnectCallback === "function" ? disconnectCallback() : void 0;
    };

    // Marks the client as disconnected and stops the heart-beat timers.
    Client.prototype._cleanUp = function() {
      this.connected = false;
      if (this.pinger) {
        Stomp.clearInterval(this.pinger);
      }
      if (this.ponger) {
        return Stomp.clearInterval(this.ponger);
      }
    };

    // Sends a SEND frame. NOTE: mutates the passed headers (sets destination).
    Client.prototype.send = function(destination, headers, body) {
      if (headers == null) {
        headers = {};
      }
      if (body == null) {
        body = '';
      }
      headers.destination = destination;
      return this._transmit("SEND", headers, body);
    };

    // Subscribes to a destination; `callback` receives each MESSAGE frame.
    // Returns { id, unsubscribe() }. A missing headers.id is generated ("sub-N").
    Client.prototype.subscribe = function(destination, callback, headers) {
      var client;
      if (headers == null) {
        headers = {};
      }
      if (!headers.id) {
        headers.id = "sub-" + this.counter++;
      }
      headers.destination = destination;
      this.subscriptions[headers.id] = callback;
      this._transmit("SUBSCRIBE", headers);
      client = this;
      return {
        id: headers.id,
        unsubscribe: function() {
          return client.unsubscribe(headers.id);
        }
      };
    };

    // Removes the local callback and sends UNSUBSCRIBE for the subscription id.
    Client.prototype.unsubscribe = function(id) {
      delete this.subscriptions[id];
      return this._transmit("UNSUBSCRIBE", {
        id: id
      });
    };

    // Begins a transaction; returns { id, commit(), abort() }.
    // A missing transaction id is generated ("tx-N").
    Client.prototype.begin = function(transaction) {
      var client, txid;
      txid = transaction || "tx-" + this.counter++;
      this._transmit("BEGIN", {
        transaction: txid
      });
      client = this;
      return {
        id: txid,
        commit: function() {
          return client.commit(txid);
        },
        abort: function() {
          return client.abort(txid);
        }
      };
    };

    Client.prototype.commit = function(transaction) {
      return this._transmit("COMMIT", {
        transaction: transaction
      });
    };

    Client.prototype.abort = function(transaction) {
      return this._transmit("ABORT", {
        transaction: transaction
      });
    };

    // Acknowledges a message. NOTE: mutates the passed headers.
    Client.prototype.ack = function(messageID, subscription, headers) {
      if (headers == null) {
        headers = {};
      }
      headers["message-id"] = messageID;
      headers.subscription = subscription;
      return this._transmit("ACK", headers);
    };

    // Negatively acknowledges a message. NOTE: mutates the passed headers.
    Client.prototype.nack = function(messageID, subscription, headers) {
      if (headers == null) {
        headers = {};
      }
      headers["message-id"] = messageID;
      headers.subscription = subscription;
      return this._transmit("NACK", headers);
    };

    return Client;

  })();

  /*
   * Public entry point.
   */
  Stomp = {
    VERSIONS: {
      V1_0: '1.0',
      V1_1: '1.1',
      V1_2: '1.2',
      supportedVersions: function() {
        return '1.1,1.0';
      }
    },
    // Creates a WebSocket for `url` and wraps it in a Client.
    // Set Stomp.WebSocketClass to use a different WebSocket implementation.
    client: function(url, protocols) {
      var klass, ws;
      if (protocols == null) {
        protocols = ['v10.stomp', 'v11.stomp'];
      }
      klass = Stomp.WebSocketClass || WebSocket;
      ws = new klass(url, protocols);
      return new Client(ws);
    },
    // Wraps an already-created WebSocket-like object in a Client.
    over: function(ws) {
      return new Client(ws);
    },
    Frame: Frame
  };

  // Publish Stomp: CommonJS `exports` and/or `window` when they exist; otherwise
  // the worker/global scope.
  if (typeof exports !== "undefined" && exports !== null) {
    exports.Stomp = Stomp;
  }

  if (typeof window !== "undefined" && window !== null) {
    Stomp.setInterval = function(interval, f) {
      return window.setInterval(f, interval);
    };
    Stomp.clearInterval = function(id) {
      return window.clearInterval(id);
    };
    window.Stomp = Stomp;
  } else if (typeof exports === "undefined" || exports === null) {
    // `typeof` guards: a bare `!exports` throws a ReferenceError wherever
    // `exports` is not declared (web workers, ES modules).
    (function() {
      var root;
      if (typeof self !== "undefined" && self !== null) {
        root = self;
      } else if (typeof globalThis !== "undefined") {
        root = globalThis;
      } else {
        root = null;
      }
      if (root) {
        root.Stomp = Stomp;
      }
    })();
  }

}).call(this);

index.html代码示例

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>消费者示例</title>
    <!-- stomp.js must be loaded before the inline script below -->
    <script src="./stomp.js"></script>
</head>
<body>
监听到的消息:<span id="dataId"></span><br>
<script>
    // Name of the queue to consume from (obtained from the server side).
    // NOTE(review): RabbitMQ's STOMP plugin normally expects destinations such as
    // "/queue/<name>" or "/amq/queue/<name>" - confirm the value used here.
    var queueName = "ceshi-1";

    // Create the WebSocket. RabbitMQ must have its WebSocket (Web STOMP) protocol
    // enabled and be listening on this port.
    var ws = new WebSocket('ws://124.222.229.252:15674/ws');

    // Wrap the socket in a STOMP client.
    var client = Stomp.over(ws);

    // Heart-beat settings (ms) to keep a long-lived connection stable.
    client.heartbeat.outgoing = 10000;
    client.heartbeat.incoming = 10000;

    // Called once the connection is established: subscribe to the queue and
    // acknowledge every message manually.
    var on_connect = function (x) {
        client.subscribe(queueName, function (data) {
            var msg = data.body;
            console.log("收到数据:" + msg);
            data.ack();
            document.getElementById("dataId").innerText = msg;
        }, {ack: 'client'}); // the client confirms receipt of each message
    };

    // Alternative connect callback: subscribe to an exchange with a routing key.
    /*var on_connect = function (x) {
        // Subscribes to messages of the built-in exchange amq.topic with a routing key;
        // wsId is a routing key generated dynamically, based on data returned by the server.
        client.subscribe("/exchange/amq.topic/wsId.12345", function (data) {
            var msg = data.body;
            console.log("收到数据:" + msg);
            data.ack();
            document.getElementById("dataId").innerText=msg;
        },{ack:'client'}); // the client confirms receipt of each message
    };*/

    // Called when an error occurs.
    var on_error = function () {
        console.log('error');
    };

    // Connect to RabbitMQ.
    client.connect('guest', 'guest', on_connect, on_error, '/');

    // Example: send a message to the queue named debugQueue, specifying a routingKey.
    /*function send(){
        // Put the send inside a transaction
        var tx=client.begin();
        client.send("/queque/debugQueue/debug",{content_Type:"application/json",transaction:tx.id},"hello!!!!!");
        // ... a series of asynchronous operations; tx.abort() -> abort the transaction
        tx.commit(); // commit the message
    }*/

    // Disconnect when the program exits:
    //client.disconnect();
</script>
</body>
</html>

打开index.html

在这里插入图片描述

标签: rabbitmq java

本文转载自: https://blog.csdn.net/qq_42223485/article/details/130052546
版权归原作者 林小花~ 所有, 如有侵权,请联系我们删除。

“rabbitMq动态创建和监听队列”的评论:

还没有评论