JAVA开发过程中使用mqtt消息订阅和发布时会出现客户机未连接的报错,只要在订阅和发布的方法里添加创建链接就可以
/**
- 连接 MQTT
*/
public void connect() {
try {
System.err.println("clientid----------"+clientId);
client = new MqttClient(hostUrl, clientId, new MemoryPersistence());
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置超时时间,单位秒
connOpts.setConnectionTimeout(timeout);
// 设置心跳时间,单位秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
connOpts.setKeepAliveInterval(keepAlive);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
client.connect(connOpts);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
/**
订阅
@param topic 主题
*/
public void subscribe(String topic) {
try {
client.subscribe(topic, qos);
} catch (MqttException me) {
me.printStackTrace();
connect();
}
}
/**
消息发布
@param topic 主题
@param data 消息
*/
public void publish(String topic, String data) {
try {
MqttMessage message = new MqttMessage(data.getBytes());
message.setQos(1); // 消息服务质量等级
message.setRetained(true); // 保留消息
client.publish(topic, message);
} catch (MqttException me) {
me.printStackTrace();
connect();
}
}
/**
- 断开连接
*/
public void disconnect() {
try {
client.disconnect();
client.close();
} catch (MqttException me) {
me.printStackTrace();
}
}
版权归原作者 小白白691934404 所有, 如有侵权,请联系我们删除。