Просмотр исходного кода

先去掉回执发送,修改日志

yf_zn 1 год назад
Родитель
Сommit
5fe5e18140

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/task/CmdTaskService.java

@@ -429,7 +429,7 @@ public class CmdTaskService {
         taskResult.setCompleteDate(DateUtils.dateTimeNow());
         //TODO
         // iotsMqService.sendTaskResultMsg(taskResult);
-        TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
+        //TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
 
     }
 
@@ -473,7 +473,7 @@ public class CmdTaskService {
         taskResult.setExtraBody(ext);
         // TODO
         // iotsMqService.sendTaskResultMsg(taskResult);
-        TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
+        //TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
     }
 
     public static long getTimeDifference(String datetime1, String datetime2) {

+ 27 - 16
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -1,5 +1,7 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
+import com.yunfeiyun.agmp.common.constant.ErrorCode;
+import com.yunfeiyun.agmp.common.exception.BizException;
 import com.yunfeiyun.agmp.common.utils.ip.IpUtils;
 import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -34,7 +36,6 @@ public class MqttCore {
     private String connectionId;
 
 
-
     /**
      * 内部维护的发布者
      */
@@ -48,8 +49,8 @@ public class MqttCore {
     /**
      * topic -> device
      */
-    Map<String,String> topicToDevId=new HashMap<>();
-    Map<String,MqttTopicValue> topicValueMap = new HashMap<>();
+    Map<String, String> topicToDevId = new HashMap<>();
+    Map<String, MqttTopicValue> topicValueMap = new HashMap<>();
 
     public String getConnectionId() {
         return connectionId;
@@ -74,7 +75,12 @@ public class MqttCore {
      * @return
      */
     public Device getDevice(String topic) {
-       return SpringUtils.getBean(topicValueMap.get(topic).getServiceName());
+        MqttTopicValue topicValue = topicValueMap.get(topic);
+        if (topicValue == null) {
+            log.info("【解析数据:根据topic获取Device: topic:{}】", topicValue);
+            throw new BizException(ErrorCode.FAILURE.getCode(), topic + "未找到Device");
+        }
+        return SpringUtils.getBean(topicValue.getServiceName());
     }
 
     public String getServiceType() {
@@ -148,7 +154,7 @@ public class MqttCore {
     private void buildClient() throws MqttException {
         //return new MqttClient("tcp://47.96.123.180:1883", UUID.randomUUID().toString().replace("-",""));
         String url = "tcp://" + mqttConfig.getIp() + ":" + mqttConfig.getPort();
-        this.mqttClient = new MqttClient(url, connectionId+ IpUtils.getHostIp(), new MemoryPersistence());
+        this.mqttClient = new MqttClient(url, IpUtils.getHostIp() + ":SAAS:" + connectionId, new MemoryPersistence());
         log.info("【初始化】构建 MQTT clientId {}", this.mqttClient.getClientId());
     }
 
@@ -206,6 +212,7 @@ public class MqttCore {
 
     /**
      * 关闭链接
+     *
      * @throws MqttException
      */
     public void close() throws MqttException {
@@ -213,7 +220,7 @@ public class MqttCore {
             mqttClient.disconnect();
             this.mqttClient.close();
             log.info("【关闭MQTT连接成功】 connectionId: {}", connectionId);
-        }else{
+        } else {
             log.info("【关闭MQTT连接失败:已关闭】 connectionId: {}", connectionId);
         }
 
@@ -221,15 +228,16 @@ public class MqttCore {
 
     /**
      * 订阅。原始类不对外暴漏
+     *
      * @param topics
      * @throws MqttException
      */
-    public void subscribe(String[] topics,MqttConfig cfgYf) throws MqttException {
+    public void subscribe(String[] topics, MqttConfig cfgYf) throws MqttException {
         // 否则向mqttConfig中追加订阅
-        mqttConfig.setSubTopic(Stream.concat(Stream.of(topics),Stream.of(mqttConfig.getSubTopic())).toArray(String[]::new));
-        if(getClient().isConnected()){
+        mqttConfig.setSubTopic(Stream.concat(Stream.of(topics), Stream.of(mqttConfig.getSubTopic())).toArray(String[]::new));
+        if (getClient().isConnected()) {
             // 如果此mqtt已经建立了连接,则正常订阅
-            for(String topic :topics){
+            for (String topic : topics) {
                 getClient().subscribe(topic);
                 log.info("[MQTT] {} 连接已建立 追加订阅主题 {}", cfgYf.getDeviceType(), topic);
             }
@@ -238,24 +246,26 @@ public class MqttCore {
 
     /**
      * 取消订阅,原始类不对外暴漏
+     *
      * @param topics
      * @throws MqttException
      */
     public void unsubscribe(String[] topics) throws MqttException {
         getClient().unsubscribe(topics);
-        for(String topic:topics){
+        for (String topic : topics) {
             topicToDevId.remove(topic);
         }
     }
 
     /**
      * 将topic映射到设备id
+     *
      * @param mqttTopicValues
      */
-    public void bindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
-        for(MqttTopicValue mqttTopicValue:mqttTopicValues){
-            topicToDevId.put(mqttTopicValue.getTopic(),mqttTopicValue.getDevId());
-            topicValueMap.put(mqttTopicValue.getTopic(),mqttTopicValue);
+    public void bindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues) {
+        for (MqttTopicValue mqttTopicValue : mqttTopicValues) {
+            topicToDevId.put(mqttTopicValue.getTopic(), mqttTopicValue.getDevId());
+            topicValueMap.put(mqttTopicValue.getTopic(), mqttTopicValue);
         }
 
     }
@@ -267,10 +277,11 @@ public class MqttCore {
 
     /**
      * 根据topic 获取设备id
+     *
      * @param topic
      * @return
      */
-    public String getDevIdByTopic(String topic){
+    public String getDevIdByTopic(String topic) {
         return topicToDevId.get(topic);
     }
 }

+ 12 - 16
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -30,7 +30,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
     @Override
     public void connectionLost(Throwable throwable) {
         // 连接丢失时的处理逻辑
-        log.info("[MQTT] 链接标识:{} URL:{} 连接断开,正在尝试重连...",mqttClient.getClientId(),mqttClient.getServerURI());
+        log.info("[MQTT] 链接标识:{} URL:{} 连接断开,正在尝试重连...", mqttClient.getClientId(), mqttClient.getServerURI());
 
     }
 
@@ -44,9 +44,9 @@ public class MqttSubscriber implements MqttCallbackExtended {
             String msgContent = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
             log.info("【上报数据:收到】收到mqtt消息:" + topic + ", " + msgContent);
             JSONObject obj = null;
-            try{
+            try {
                 obj = JSON.parseObject(msgContent);
-            }catch (Exception e){
+            } catch (Exception e) {
                 log.error("【上报数据:解析异常】收到mqtt消息:" + topic + ", " + msgContent);
                 return;
             }
@@ -56,16 +56,16 @@ public class MqttSubscriber implements MqttCallbackExtended {
                 // 当收到mqtt消息无对应的设备类型时,则丢弃消息。
                 IotException.UNKNOWN_DEVICE.throwException();
             }
-            Object result = device.receiveData(topic, obj,mqttCore.getConnectionId());
+            Object result = device.receiveData(topic, obj, mqttCore.getConnectionId());
             log.info("【上报数据:处理结果】{}", result);
             //如果是“设备属性”消息,进行执行结果检查
             if (device.isDeviceProps(obj)) {
-                SpringUtils.getBean(CmdResultCheckService.class).beginCheck(device.findIotDevice(topic, obj,mqttCore.getConnectionId()), obj);
+                SpringUtils.getBean(CmdResultCheckService.class).beginCheck(device.findIotDevice(topic, obj, mqttCore.getConnectionId()), obj);
             } else {
                 log.error("其它数据");
             }
         } catch (Exception e) {
-            log.error("【接收上报】异常{}",e);
+            log.error("【接收上报】异常{}", e);
         }
     }
 
@@ -85,24 +85,20 @@ public class MqttSubscriber implements MqttCallbackExtended {
     @Override
     public void connectComplete(boolean reconnect, String serverURI) {
         String loggerMsg = "初始化连接成功";
-        if(reconnect){
+        if (reconnect) {
             loggerMsg = "连接断开,重连成功";
         }
-        log.info("[MQTT] clientId :{} {}...{}",this.mqttClient.getClientId() ,loggerMsg, serverURI);
+        log.info("[MQTT] clientId :{} {}...{}", this.mqttClient.getClientId(), loggerMsg, serverURI);
         String[] topics = mqttCore.getSubTopic();
-        if(topics==null||topics.length==0){
-            mqttClient.setCallback(this);
-            log.info("[MQTT] clientId :{} {} 无topic ",this.mqttClient.getClientId(),loggerMsg);
+        mqttClient.setCallback(this);
+        if (topics == null || topics.length == 0) {
+            log.info("[MQTT] clientId :{} {} 无topic ", this.mqttClient.getClientId(), loggerMsg);
             return;
         }
         for (String topic : topics) {
-            //TODO 测试设备乱发离线指令,暂时不订阅它
-            if ("/yfkj/cbd/offline/861551058867599".equals(topic)) {
-                continue;
-            }
             try {
                 mqttClient.subscribe(topic);
-                log.info("[MQTT] clientId :{} {} {} 订阅主题 {}",this.mqttClient.getClientId(),mqttCore.getServiceType(), loggerMsg, topic);
+                log.info("[MQTT] clientId :{} {} 订阅主题 {}", this.mqttClient.getClientId(),  loggerMsg, topic);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }