Kaynağa Gözat

调整mqtt重连机制

liuyaowen 1 yıl önce
ebeveyn
işleme
26dbb677a5

+ 22 - 18
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -90,24 +90,28 @@ public class MqttSubscriber implements MqttCallbackExtended {
      */
     @Override
     public void connectComplete(boolean reconnect, String serverURI) {
-        String loggerMsg = "初始化连接成功";
-        if (reconnect) {
-            loggerMsg = "连接断开,重连成功";
-        }
-        log.info("[MQTT] clientId :{} {}...{}", this.mqttClient.getClientId(), loggerMsg, serverURI);
-        String[] topics = mqttCore.getSubTopic();
-        mqttClient.setCallback(this);
-        if (topics == null || topics.length == 0) {
-            log.info("[MQTT] clientId :{} {} 无topic ", this.mqttClient.getClientId(), loggerMsg);
-            return;
-        }
-        for (String topic : topics) {
-            try {
-                mqttClient.subscribe(topic);
-                log.info("[MQTT] clientId :{} {} 订阅主题 {}", this.mqttClient.getClientId(),  loggerMsg, topic);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+        MqttSubscriber mqttSubscriber = this;
+        MqttClient thisMqttClient = this.mqttClient;
+        CompletableFuture.runAsync(() -> {
+            String loggerMsg = "初始化连接成功";
+            if (reconnect) {
+                loggerMsg = "连接断开,重连成功";
+            }
+            log.info("[MQTT] clientId :{} {}...{}", thisMqttClient.getClientId(), loggerMsg, serverURI);
+            String[] topics = mqttCore.getSubTopic();
+            thisMqttClient.setCallback(mqttSubscriber);
+            if (topics == null || topics.length == 0) {
+                log.info("[MQTT] clientId :{} {} 无topic ", thisMqttClient.getClientId(), loggerMsg);
+                return;
+            }
+            for (String topic : topics) {
+                try {
+                    thisMqttClient.subscribe(topic);
+                    log.info("[MQTT] clientId :{} {} 订阅主题 {}", thisMqttClient.getClientId(),  loggerMsg, topic);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
             }
-        }
+        });
     }
 }