Przeglądaj źródła

调整设备新增,编辑,删除重新初始化连接逻辑

liuyaowen 1 rok temu
rodzic
commit
881d855b40

+ 32 - 75
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -6,6 +6,7 @@ import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
@@ -44,6 +45,7 @@ public class ConnectionManager {
     private DeviceTopicService deviceTopicService;
     @Resource
     private DeviceconnCacheService deviceconnCacheService;
+
     /**
      * 加载数据库设备配置
      *
@@ -53,7 +55,7 @@ public class ConnectionManager {
     public void init() throws MqttException {
         //先把所有型号查出来
         List<IotDeviceconnResVo> iotDeviceConnResVoList = businessCoreService.selectTosDevicetypeResVoList();
-        log.info("【初始化】设备型号 构建链接 协议: {}个",iotDeviceConnResVoList.size());
+        log.info("【初始化】设备型号 构建链接 协议: {}个", iotDeviceConnResVoList.size());
         for (IotDeviceconnResVo iotDeviceconnResVo : iotDeviceConnResVoList) {
             log.info("【初始化】【开始】协议加载,厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
             //将配置信息转换成jsonObject,这是个数组
@@ -122,17 +124,17 @@ public class ConnectionManager {
         }
         JSONArray jsonConfig = null;
         try {
-            jsonConfig =JSONArray.parseArray(config);
+            jsonConfig = JSONArray.parseArray(config);
         } catch (Exception e) {
             log.error("【初始化】解析配置文件错误: \n" + config + "\n" + e);
         }
         return jsonConfig;
     }
 
-    public void createDeviceHandle(IotDevice iotDevice) throws Exception{
+    public void createDeviceHandle(IotDevice iotDevice) throws Exception {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
-        for(Object connItemObject : connItemArray){
+        for (Object connItemObject : connItemArray) {
             JSONObject connItem = JSONObject.from(connItemObject);
             String type = connItem.getString("type");
             if (TextUtils.isEmpty(type)) {
@@ -153,7 +155,7 @@ public class ConnectionManager {
                         break;
                     }
                     default: {
-                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
+                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(), type);
                         return;
                     }
                 }
@@ -163,28 +165,10 @@ public class ConnectionManager {
         }
     }
 
-    private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
-        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
-        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
-        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
-        //改设备的所有topics
-        for(String s:topics){
-            MqttTopicValue mqttTopicValue=new MqttTopicValue();
-            mqttTopicValue.setDevCode(iotDevice.getDevCode());
-            mqttTopicValue.setDevId(iotDevice.getDevBid());
-            mqttTopicValue.setTopic(s);
-            mqttTopicValues.add(mqttTopicValue);
-        }
-        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
-    }
-
-    private void httpDeviceCreateHandle(IotDevice iotDevice) {
-        httpManager.deviceCreateHandle(iotDevice);
-    }
-    public void editDeviceHandle(IotDevice iotDevice){
+    public void deleteDeviceHandle(IotDevice iotDevice) {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
-        for(Object connItemObject : connItemArray){
+        for (Object connItemObject : connItemArray) {
             JSONObject connItem = JSONObject.from(connItemObject);
             String type = connItem.getString("type");
             if (TextUtils.isEmpty(type)) {
@@ -194,18 +178,18 @@ public class ConnectionManager {
             try {
                 switch (type) {
                     case "mqtt":
-                        mqttEditDeviceHandle(iotDevice);
+                        mqttDeleteDeviceHandle(iotDevice);
                         break;
                     case "modbus-tcp": {
                         //先不处理,对接到了再梳理
                         break;
                     }
                     case "http": {
-                        httpEditDeviceHandle(iotDevice);
+                        httpDeleteDeviceHandle(iotDevice);
                         break;
                     }
                     default: {
-                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
+                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(), type);
                         return;
                     }
                 }
@@ -214,71 +198,44 @@ public class ConnectionManager {
             }
         }
     }
-    private void mqttEditDeviceHandle(IotDevice iotDevice) throws Exception {
-        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
-        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
-        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+
+    public void editDeviceHandle(IotDeviceEditMqModel iotDeviceEditMqModel) throws Exception {
+        deleteDeviceHandle(iotDeviceEditMqModel.getOldIotDevice());
+        createDeviceHandle(iotDeviceEditMqModel.getNewIotDevice());
+    }
+
+    private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
+        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
-        for(String s:topics){
-            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+        for (String s : topics) {
+            MqttTopicValue mqttTopicValue = new MqttTopicValue();
             mqttTopicValue.setDevCode(iotDevice.getDevCode());
             mqttTopicValue.setDevId(iotDevice.getDevBid());
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
     }
 
-    private void httpEditDeviceHandle(IotDevice iotDevice) {
+    private void httpDeviceCreateHandle(IotDevice iotDevice) {
         httpManager.deviceCreateHandle(iotDevice);
     }
 
-    public void deleteDeviceHandle(IotDevice iotDevice){
-        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
-        JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
-        for(Object connItemObject : connItemArray){
-            JSONObject connItem = JSONObject.from(connItemObject);
-            String type = connItem.getString("type");
-            if (TextUtils.isEmpty(type)) {
-                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
-                return;
-            }
-            try {
-                switch (type) {
-                    case "mqtt":
-                        mqttDeleteDeviceHandle(iotDevice);
-                        break;
-                    case "modbus-tcp": {
-                        //先不处理,对接到了再梳理
-                        break;
-                    }
-                    case "http": {
-                        httpDeleteDeviceHandle(iotDevice);
-                        break;
-                    }
-                    default: {
-                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
-                        return;
-                    }
-                }
-            } catch (Exception e) {
-                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
-            }
-        }
-    }
     private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
-        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
-        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
-        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
-        for(String s:topics){
-            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+        for (String s : topics) {
+            MqttTopicValue mqttTopicValue = new MqttTopicValue();
             mqttTopicValue.setDevCode(iotDevice.getDevCode());
             mqttTopicValue.setDevId(iotDevice.getDevBid());
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+        mqttManager.topicBatchUnSubscribeDevices(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
     }
 
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {

+ 2 - 1
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -8,6 +8,7 @@ import com.yunfeiyun.agmp.iot.common.constant.mq.IotActionEnums;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
+import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.manager.ConnectionManager;
@@ -68,7 +69,7 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                         break;
                     //设备更新
                     case IOT_DEVICE_UPDATE:
-                        connectionManager.editDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
+                        connectionManager.editDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDeviceEditMqModel.class));
                         break;
                     //设备删除
                     case IOT_DEVICE_DELETE: