0


RabbitMQ开启MQTT协议支持

1)RabbitMQ启用MQTT插件

root@mq:/# rabbitmq-plugins enable rabbitmq_mqtt
Enabling plugins on node rabbit@mq:
rabbitmq_mqtt
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_mqtt
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
  rabbitmq_mqtt

started 1 plugins.
root@mq:/# rabbitmq-plugins enable rabbitmq_web_mqtt
Enabling plugins on node rabbit@mq:
rabbitmq_web_mqtt
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_mqtt
  rabbitmq_web_dispatch
  rabbitmq_web_mqtt
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
  rabbitmq_web_mqtt

started 1 plugins.
root@mq:/# 

2)RabbitMQ管理控制台查看
如果插件启动成功,rabbitmq会打开1883和15675端口:
在这里插入图片描述

3)用MQTTX工具测试
在这里插入图片描述
4)用eclipse paho客户端测试
添加依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

收发消息测试

@RestControllerpublicclassDemoController{@GetMapping("/publish")publicStringpublish()throwsMqttException{MqttClientPersistence persistence =newMemoryPersistence();;//内存持久化MqttClient client =newMqttClient("tcp://192.168.137.138:1883","abc", persistence);//连接选项中定义用户名密码和其它配置MqttConnectOptions options =newMqttConnectOptions();
        options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
        options.setAutomaticReconnect(true);//是否自动重连
        options.setConnectionTimeout(30);//连接超时时间  秒
        options.setKeepAliveInterval(10);//连接保持检查周期  秒
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//版本
        options.setUserName("xjs1919");
        options.setPassword("123321".toCharArray());// client.setManualAcks(true);
        client.connect(options);//连接//订阅topic
        client.subscribe("demoTopic",2);// 设置回调,将来收到消息的时候会被回调
        client.setCallback(newMqttCallbackExtended(){@OverridepublicvoidconnectComplete(boolean reconnect,String serverURI){System.out.println("连接完成");}@OverridepublicvoidconnectionLost(Throwable cause){System.out.println("连接丢失");}@OverridepublicvoidmessageArrived(String topic,MqttMessage message)throwsException{System.out.println("收到消息,topic:"+topic +", msg:"+newString(message.getPayload()));//client.messageArrivedComplete(message.getId(),message.getQos());}@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token){// Qos0: 当消息发送出去就回调// Qos1: 当发送者收到了puback的时候的回调// Qos2: 当发送者收到了pubcomp的时候的回调System.out.println("消息发送完成");}});

        client.publish("demoTopic","hello,这是一个测试消息!".getBytes(),2,false);return"ok";}}

参考文章:
https://www.cnblogs.com/motion/p/14974024.html
https://blog.csdn.net/u013615903/article/details/131395264

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/goldenfish1919/article/details/136238247
版权归原作者 若鱼1919 所有, 如有侵权,请联系我们删除。

“RabbitMQ开启MQTT协议支持”的评论:

还没有评论