فهرست منبع

优化设备订阅逻辑

liuyaowen 11 ماه پیش
والد
کامیت
5a00cacdaa

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/http/HttpClient.java

@@ -5,6 +5,8 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.reqvo.IotXmznReqVo;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 
+import java.util.List;
+
 /**
  * 对接一个厂商http接口的客户端
  *
@@ -65,6 +67,10 @@ public class HttpClient {
     public void deviceCreateHandle(IotDevice iotDevice) {
 
     }
+    //设备批量创建初始化逻辑,根据需求在具体的httpClient实现中进行重新
+    public void deviceCreateByBatchHandle(List<IotDevice> iotDeviceList) {
+    }
+
 
     public JSONObject getClientConfig() {
         return clientConfig;

+ 66 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -9,6 +9,7 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iot.common.service.IotMqttTopicCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -40,6 +41,8 @@ public class ConnectionManager {
     private DeviceTopicService deviceTopicService;
     @Resource
     private DeviceconnCacheService deviceconnCacheService;
+    @Resource
+    private IotMqttTopicCacheService iotMqttTopicCacheService;
 
     private boolean INIT_SUCCESS = false;
 
@@ -69,6 +72,7 @@ public class ConnectionManager {
             }
         }
         this.INIT_SUCCESS = true;
+        log.info("【初始化】所有协议加载完成");
     }
 
     public boolean initCompleted(){
@@ -170,6 +174,65 @@ public class ConnectionManager {
         }
     }
 
+    public void createDeviceByBatchHandle(IotDeviceconn iotDeviceconn,List<IotDevice> iotDeviceList){
+        iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDeviceconn.getDevconnBid());
+        JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for (Object connItemObject : connItemArray) {
+            JSONObject connItem = JSONObject.from(connItemObject);
+            String type = connItem.getString("type");
+            if (TextUtils.isEmpty(type)) {
+                log.info("【设备批量创建初始化连接】协议加载连接信息中的type 为空:跳过");
+                return;
+            }
+            try {
+                switch (type) {
+                    case "mqtt":
+                        mqttDeviceCreateByBatchHandle(iotDeviceconn,iotDeviceList);
+                        break;
+                    case "modbus-tcp": {
+                        //先不处理,对接到了再梳理
+                        break;
+                    }
+                    case "http": {
+                        httpDeviceCreateByBatchHandle(iotDeviceconn,iotDeviceList);
+                        break;
+                    }
+                    default: {
+                        log.info("【设备批量创建初始化连接】其它类型:{},跳过", type);
+                        return;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("【设备批量创建初始化连接】失败 异常信息:",  e);
+            }
+        }
+    }
+
+    private void httpDeviceCreateByBatchHandle(IotDeviceconn iotDeviceconn, List<IotDevice> iotDeviceList) {
+        httpManager.deviceCreateByBatchHandle(iotDeviceconn,iotDeviceList);
+    }
+
+    private void mqttDeviceCreateByBatchHandle(IotDeviceconn iotDeviceconn,List<IotDevice> iotDeviceList) throws MqttException {
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDeviceconn.getDevtypeBid());
+        String serviceName = iotDeviceDictEnum.getServiceName();
+        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
+        for(IotDevice iotDevice : iotDeviceList){
+            String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+            //改设备的所有topics
+            for (String s : topics) {
+                MqttTopicValue mqttTopicValue = new MqttTopicValue();
+                mqttTopicValue.setDevCode(iotDevice.getDevCode());
+                mqttTopicValue.setDevId(iotDevice.getDevBid());
+                mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDevice.getDevtypeBid()));
+                mqttTopicValue.setTopic(s);
+                iotMqttTopicCacheService.addTopicCache(iotDevice.getDevCode(),topics);
+                mqttTopicValues.add(mqttTopicValue);
+            }
+        }
+        String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
+        mqttManager.topicSingleSubscribeDevice(connId, serviceName, mqttTopicValues);
+    }
+
     /**
      * 设备删除时,删除设备连接
      */
@@ -301,6 +364,7 @@ public class ConnectionManager {
         IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDevice.getDevtypeBid());
         String serviceName = iotDeviceDictEnum.getServiceName();
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+
         List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
         for (String s : topics) {
@@ -314,6 +378,7 @@ public class ConnectionManager {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
         mqttManager.topicSingleSubscribeDevice(connId, serviceName, mqttTopicValues);
+        iotMqttTopicCacheService.addTopicCache(iotDevice.getDevCode(),topics);
     }
 
     private void httpDeviceCreateHandle(IotDevice iotDevice) {
@@ -343,6 +408,7 @@ public class ConnectionManager {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByIotDevice(iotDevice);
         String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
         mqttManager.topicBatchUnSubscribeDevices(connId, serviceName, mqttTopicValues);
+        iotMqttTopicCacheService.removeTopicCache(iotDevice.getDevCode());
     }
 
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {

+ 15 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/manager/HttpManager.java

@@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -103,9 +104,17 @@ public class HttpManager {
     public HttpClient getHttpClientByDevice(IotDevice iotDevice){
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         if(IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())){
-            return commonHttpClientByTypeBid.get(iotDevice.getDevtypeBid());
+            return commonHttpClientByTypeBid.get(iotDeviceconn.getDevtypeBid());
         }else {
-            return privateHttpClientByConnBid.get(iotDevice.getDevconnBid());
+            return privateHttpClientByConnBid.get(iotDeviceconn.getDevconnBid());
+        }
+    }
+    public HttpClient getHttpClientByDevice(String devConnBid){
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(devConnBid);
+        if(IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())){
+            return commonHttpClientByTypeBid.get(iotDeviceconn.getDevtypeBid());
+        }else {
+            return privateHttpClientByConnBid.get(iotDeviceconn.getDevconnBid());
         }
     }
 
@@ -128,4 +137,8 @@ public class HttpManager {
     }
 
 
+    public void deviceCreateByBatchHandle(IotDeviceconn iotDeviceconn, List<IotDevice> iotDeviceList) {
+        HttpClient httpClient = getHttpClientByDevice(iotDeviceconn.getDevconnBid());
+        httpClient.deviceCreateByBatchHandle(iotDeviceList);
+    }
 }

+ 3 - 31
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -102,13 +102,15 @@ public class MqttManager {
                     for (String s : topics) {
                         MqttTopicValue mqttTopicValue = new MqttTopicValue();
                         mqttTopicValue.setDevCode(iotDevice.getDevCode());
-                        mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDevice.getDevtypeBid()));
+                        mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDeviceconnResVo.getDevtypeBid()));
                         mqttTopicValue.setDevId(iotDevice.getDevBid());
                         mqttTopicValue.setTopic(s);
                         mqttTopicValues.add(mqttTopicValue);
                     }
                 }
+                iotMqttTopicCacheService.addTopicCache(iotDevice.getDevCode(),topics);
             }
+            mqttTopicValues.toArray();
             String[] topics = new String[mqttTopicValues.size()];
             if (!mqttTopicValues.isEmpty()) {
                 for (int i = 0; i < mqttTopicValues.size(); i++) {
@@ -130,9 +132,6 @@ public class MqttManager {
                 mqttCore.buildMqttCore(cfgYf);
                 addConnectionMap(connectionId, mqttCore);
             }
-
-            refreshTopicCache();
-
             log.info("【开始构建MQTT连接】构建完成 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
         } catch (Exception e) {
             log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
@@ -178,7 +177,6 @@ public class MqttManager {
             } finally {
                 // 从map中移除该MqttCore
                 mqttCoreMap.remove(connectionId);
-                refreshTopicCache();
                 log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
             }
         } else {
@@ -436,9 +434,6 @@ public class MqttManager {
         log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
         // 调用批量订阅方法
         topicBatchSubscribeDevices(connectionId, serviceName, mqttTopicValues);
-
-        refreshTopicCache();
-
         log.info("【完成单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
     }
 
@@ -453,7 +448,6 @@ public class MqttManager {
      */
     public void topicBatchUnSubscribeDevices(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
         log.info("【开始批量取消订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
-
         // 获取批量取消订阅的主题
         String[] topics = deviceTopicService.getBatchTopic(serviceName, mqttTopicValues);
         log.info("【获取批量取消订阅主题】 topics: {}", Arrays.toString(topics));
@@ -470,9 +464,6 @@ public class MqttManager {
             log.error("【批量取消订阅失败】 connectionId: {}, serviceName: {}, topics: {}, 异常信息: {}", connectionId, serviceName, Arrays.toString(topics), e.getMessage(), e);
             throw e;
         }
-
-        refreshTopicCache();
-
         log.info("【完成批量取消订阅】 connectionId: {}, serviceName: {}, topics: {}", connectionId, serviceName, Arrays.toString(topics));
     }
 
@@ -564,23 +555,4 @@ public class MqttManager {
         return null;
     }
 
-
-    public void updateCommonConnection(TosDevicetype tosDevicetype, JSONArray jsonConfig) {
-
-
-    }
-
-    private void refreshTopicCache() {
-        if (true) {
-            //数据量太大,先关闭校验,待优化时候打开
-            return;
-        }
-        Set<String> topicSet = new HashSet<>();
-        for (Map.Entry<String, MqttCore> entry : mqttCoreMap.entrySet()) {
-            MqttCore mqttCore = entry.getValue();
-            Map<String, String> topicToDevId = mqttCore.getTopicToDevId();
-            topicSet.addAll(topicToDevId.keySet());
-        }
-        iotMqttTopicCacheService.refreshTopicCache(topicSet);
-    }
 }

+ 10 - 10
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -237,12 +237,16 @@ public class MqttCore {
     public void subscribe(String[] topics, MqttConfig cfgYf) throws MqttException {
         // 否则向mqttConfig中追加订阅
         mqttConfig.setSubTopic(Stream.concat(Stream.of(topics), Stream.of(mqttConfig.getSubTopic())).toArray(String[]::new));
-        if (getClient().isConnected()) {
+        if(!getClient().isConnected()){
+            log.info("[MQTT] {} 连接还为连接,主题会在连接成功后订阅",cfgYf.getDeviceType());
+            return;
+        }
+        if (topics.length>0) {
             // 如果此mqtt已经建立了连接,则正常订阅
-            for (String topic : topics) {
-                getClient().subscribe(topic);
-                log.info("[MQTT] {} 连接已建立 追加订阅主题 {}", cfgYf.getDeviceType(), topic);
-            }
+            getClient().subscribe(topics);
+            log.info("[MQTT] {} 连接已建立 追加订阅主题完成", cfgYf.getDeviceType());
+        }else {
+            log.info("[MQTT] {} 连接已建立 无主题需要订阅",cfgYf.getDeviceType());
         }
     }
 
@@ -271,11 +275,7 @@ public class MqttCore {
         }
 
     }
-//    public void unBindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
-//        for(MqttTopicValue mqttTopicValue:mqttTopicValues){
-//            topicToDevId.remove(mqttTopicValue.getTopic());
-//        }
-//    }
+
 
     /**
      * 根据topic 获取设备id

+ 2 - 1
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -65,7 +65,8 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                 switch (iotActionEnums) {
                     //设备创建
                     case IOT_DEVICE_CREATE:
-                        connectionManager.createDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
+                        JSONObject param = synGlobalTenantInfoDto.getData();
+                        connectionManager.createDeviceByBatchHandle(param.getObject("iotDeviceconn",IotDeviceconn.class),param.getList("bodyItem",IotDevice.class));
                         break;
                     //设备更新
                     case IOT_DEVICE_UPDATE:

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -16,6 +16,7 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 import java.util.*;
 
 /**
@@ -36,6 +37,8 @@ public class IotStatusService {
 
     @Autowired
     IIotDeviceconfigService iIotDeviceconfigService;
+    @Resource
+    private ConnectionManager connectionManager;
 
     @PostConstruct
     void init() {
@@ -74,6 +77,9 @@ public class IotStatusService {
     //@Scheduled(cron = "0 0 */1 * * ?")
     @Scheduled(cron = "0 0/20 * * * ? ")
     public void validateStatusByDevType() {
+        if(!connectionManager.initCompleted()){
+            return;
+        }
         Iterator<String> iterator = validateDeviceType.iterator();
         while (iterator.hasNext()) {
             String type = iterator.next();