0


Android原生 MQTT开发

一、引包

1.1.模块的build.gradle

  1. //mqtt框架
  2. implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
  3. implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
  4. implementation 'org.bouncycastle:bcpkix-jdk15on:1.59'

1.2 旧版AndroidStudio开发工具在项目的guild.gradle引入

  1. maven {
  2. url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
  3. }

1.3 新版则在setting.gradle引入

  1. maven {
  2. url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
  3. }

二、工具类封装

  1. /**
  2. * mqtt
  3. * 服务
  4. */
  5. public class XyMqttService extends Service {
  6. public final static String TAG = "数据处理";
  7. public static MqttAndroidClient mqttAndroidClient;
  8. private static MqttConnectOptions mMqttConnectOptions;
  9. //wss://z66811d5.ala.cn-hangzhou.emqxsl.cn:8084/mqtt
  10. public static String HOST = "tcp://www.....:1883";//服务器地址(协议+地址+端口号)
  11. // public static String HOST = "tcp://.........n:8883";//服务器地址(协议+地址+端口号)
  12. public String USERNAME = "xiaoya";//用户名
  13. public String PASSWORD = "xiaoya";//密码
  14. public static String PUBLISH_TOPIC = "xiaoya/video/pull/1";//发布主题
  15. public static String RESPONSE_TOPIC = "xiaoya/video/1111";//响应主题
  16. public String CLIENTID = "safffadasaafqedq2";//设备唯一标识
  17. /**
  18. * 订阅主题:
  19. * 小雅视频拉取:xiaoya/video/pull/+
  20. * 小雅视频通话:xiaoya/video/chat/+
  21. */
  22. @Override
  23. public int onStartCommand(Intent intent, int flags, int startId) {
  24. init();
  25. return START_NOT_STICKY;//非粘性的 service强制杀死后,不会尝试重新启动service
  26. }
  27. @Nullable
  28. @Override
  29. public IBinder onBind(Intent intent) {
  30. return null;
  31. }
  32. /**
  33. * 开启服务
  34. */
  35. public static void startService(Context mContext) {
  36. mContext.startService(new Intent(mContext, XyMqttService.class));
  37. }
  38. /**
  39. * 发布 (模拟其他客户端发布消息)
  40. *
  41. * @param message 消息
  42. */
  43. public static void publish(String message) {
  44. String topic = PUBLISH_TOPIC;
  45. Integer qos = 1;
  46. Boolean retained = false;
  47. try {
  48. //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
  49. mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  50. } catch (MqttException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. /**
  55. * 响应 (收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等)
  56. *
  57. * @param message 消息
  58. */
  59. public static void response(String message) {
  60. String topic = RESPONSE_TOPIC;
  61. Integer qos = 1;
  62. Boolean retained = false;
  63. try {
  64. //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
  65. mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  66. } catch (MqttException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. /**
  71. * 初始化
  72. */
  73. private void init() {
  74. String serverURI = HOST; //服务器地址(协议+地址+端口号)
  75. Logger.d("==初始化MQ:" + serverURI);
  76. if (mqttAndroidClient == null) {
  77. mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
  78. mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
  79. }
  80. if (mMqttConnectOptions == null) {
  81. mMqttConnectOptions = new MqttConnectOptions();
  82. try {
  83. // InputStream caCrtFileI = getResources().openRawResource(R.raw.ca);
  84. // mMqttConnectOptions.setSocketFactory(getSingleSocketFactory(caCrtFileI));
  85. mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
  86. mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
  87. mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
  88. mMqttConnectOptions.setUserName(USERNAME); //设置用户名
  89. mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
  90. } catch (Exception e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. // last will message
  95. boolean doConnect = true;
  96. String message = "{\"status\":\"" + CLIENTID + "\"}";
  97. String topic = PUBLISH_TOPIC;
  98. Integer qos = 1;
  99. Boolean retained = true;
  100. if ((!message.equals("")) || (!topic.equals(""))) {
  101. // 最后的遗嘱
  102. try {
  103. mMqttConnectOptions.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
  104. } catch (Exception e) {
  105. Logger.i("==Exception Occured==");
  106. doConnect = false;
  107. iMqttActionListener.onFailure(null, e);
  108. }
  109. }
  110. if (doConnect) {
  111. doClientConnection();
  112. }
  113. }
  114. /**
  115. * 连接MQTT服务器
  116. */
  117. private static void doClientConnection() {
  118. try {
  119. if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
  120. Logger.d("====连接MQTT服务器===" + HOST);
  121. mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
  122. }
  123. } catch (Exception e) {
  124. e.printStackTrace();
  125. }
  126. }
  127. /**
  128. * 判断网络是否连接
  129. */
  130. private static boolean isConnectIsNomarl() {
  131. ConnectivityManager connectivityManager = (ConnectivityManager) XiaoYaApp.getContext().getSystemService(Context.CONNECTIVITY_SERVICE);
  132. NetworkInfo info = connectivityManager.getActiveNetworkInfo();
  133. if (info != null && info.isAvailable()) {
  134. String name = info.getTypeName();
  135. Logger.d("===当前网络名称:" + name);
  136. return true;
  137. } else {
  138. Logger.d("===没有可用网络===");
  139. /*没有可用网络的时候,延迟3秒再尝试重连*/
  140. new Handler().postDelayed(new Runnable() {
  141. @Override
  142. public void run() {
  143. Logger.d("===没有可用网络doClientConnection===");
  144. doClientConnection();
  145. }
  146. }, 3000);
  147. return false;
  148. }
  149. }
  150. //MQTT是否连接成功的监听
  151. private static IMqttActionListener iMqttActionListener = new IMqttActionListener() {
  152. @Override
  153. public void onSuccess(IMqttToken arg0) {
  154. Logger.d("==mqtt连接成功 " + HOST);
  155. try {
  156. if (mqttAndroidClient != null) {
  157. mqttAndroidClient.subscribe(PUBLISH_TOPIC, 1);//订阅主题,参数:主题、服务质量
  158. }
  159. } catch (Exception e) {
  160. e.printStackTrace();
  161. }
  162. }
  163. @Override
  164. public void onFailure(IMqttToken arg0, Throwable arg1) {
  165. arg1.printStackTrace();
  166. Logger.d("==mqtt连接失败 ==" + arg1);
  167. doClientConnection();//连接失败,重连(可关闭服务器进行模拟)
  168. }
  169. };
  170. //订阅主题的回调
  171. private MqttCallback mqttCallback = new MqttCallback() {
  172. @Override
  173. public void messageArrived(String topic, MqttMessage msgStr) throws Exception {
  174. try {
  175. String enCodeMsg = new String(msgStr.getPayload());
  176. Logger.d("==收到消息: " + enCodeMsg);
  177. if (enCodeMsg.contains("请求视频推流")) {
  178. initLiveCamera();
  179. } else if (enCodeMsg.contains("退出视频推流")) {
  180. deStroyLive();
  181. }
  182. //收到消息,这里弹出Toast表示。如果需要更新UI,可以使用广播或者EventBus进行发送
  183. //收到其他客户端的消息后,响应给对方告知消息已到达或者消息有问题等
  184. // response("message arrived");
  185. } catch (Exception e) {
  186. e.printStackTrace();
  187. }
  188. }
  189. @Override
  190. public void deliveryComplete(IMqttDeliveryToken arg0) {
  191. }
  192. @Override
  193. public void connectionLost(Throwable arg0) {
  194. Logger.d("==连接断开 ");
  195. // doClientConnection();//连接断开,重连
  196. }
  197. };
  198. public static void disconnect(Context context) {
  199. try {
  200. if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
  201. mqttAndroidClient.unsubscribe(PUBLISH_TOPIC);
  202. mqttAndroidClient.unregisterResources();
  203. mqttAndroidClient.disconnect(0); //断开连接
  204. mqttAndroidClient = null;
  205. context.stopService(new Intent(context, XyMqttService.class));
  206. ZegoExpressEngine.destroyEngine(new IZegoDestroyCompletionCallback() {
  207. @Override
  208. public void onDestroyCompletion() {
  209. //销毁成功
  210. }
  211. });
  212. }
  213. } catch (Exception e) {
  214. e.printStackTrace();
  215. }
  216. }
  217. /**
  218. * 单向
  219. * 认证
  220. */
  221. public static SSLSocketFactory getSingleSocketFactory(InputStream caCrtFileInputStream) throws Exception {
  222. Security.addProvider(new BouncyCastleProvider());
  223. X509Certificate caCert = null;
  224. BufferedInputStream bis = new BufferedInputStream(caCrtFileInputStream);
  225. CertificateFactory cf = CertificateFactory.getInstance("X.509");
  226. while (bis.available() > 0) {
  227. caCert = (X509Certificate) cf.generateCertificate(bis);
  228. }
  229. KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
  230. caKs.load(null, null);
  231. caKs.setCertificateEntry("cert-certificate", caCert);
  232. TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
  233. tmf.init(caKs);
  234. SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
  235. sslContext.init(null, tmf.getTrustManagers(), null);
  236. return sslContext.getSocketFactory();
  237. }
  238. }
标签: android

本文转载自: https://blog.csdn.net/shi450561200/article/details/134846646
版权归原作者 人民的石头 所有, 如有侵权,请联系我们删除。

“Android原生 MQTT开发”的评论:

还没有评论