Prechádzať zdrojové kódy

调整设备创建,编辑,删除重新订阅mqtt逻辑

liuyaowen 1 rok pred
rodič
commit
67f9e83143

+ 8 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/http/HttpClient.java

@@ -1,6 +1,7 @@
 package com.yunfeiyun.agmp.iots.core.http;
 
 import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.reqvo.IotXmznReqVo;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 
@@ -12,6 +13,8 @@ import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
  */
 public class HttpClient {
 
+
+
     /**
      * 响应结果回调
      */
@@ -56,7 +59,12 @@ public class HttpClient {
         this.iotDeviceconnResVo = iotDeviceconnResVo;
         this.clientConfig = configJson;
     }
+    /**
+     * 设备创建初始化逻辑,根据需求在具体的httpClient实现中进行重新
+     * */
+    public void deviceCreateHandle(IotDevice iotDevice) {
 
+    }
     /**
      * 弃用
      */

+ 171 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -4,8 +4,15 @@ import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.common.modal.TosDevicetypeResVo;
+import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
+import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
@@ -14,6 +21,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.RegEx;
+import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 
 @Component
@@ -28,7 +38,12 @@ public class ConnectionManager {
 
     @Autowired
     BusinessCoreService businessCoreService;
-
+    @Resource
+    private TypeCacheService typeCacheService;
+    @Resource
+    private DeviceTopicService deviceTopicService;
+    @Resource
+    private DeviceconnCacheService deviceconnCacheService;
     /**
      * 加载数据库设备配置
      *
@@ -114,5 +129,160 @@ public class ConnectionManager {
         return jsonConfig;
     }
 
+    public void createDeviceHandle(IotDevice iotDevice) throws Exception{
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+        JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for(Object connItemObject : connItemArray){
+            JSONObject connItem = JSONObject.from(connItemObject);
+            String type = connItem.getString("type");
+            if (TextUtils.isEmpty(type)) {
+                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
+                return;
+            }
+            try {
+                switch (type) {
+                    case "mqtt":
+                        mqttDeviceCreateHandle(iotDevice);
+                        break;
+                    case "modbus-tcp": {
+                        //先不处理,对接到了再梳理
+                        break;
+                    }
+                    case "http": {
+                        httpDeviceCreateHandle(iotDevice);
+                        break;
+                    }
+                    default: {
+                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
+                        return;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
+            }
+        }
+    }
+
+    private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
+        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
+        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+        //改设备的所有topics
+        for(String s:topics){
+            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+            mqttTopicValue.setDevCode(iotDevice.getDevCode());
+            mqttTopicValue.setDevId(iotDevice.getDevBid());
+            mqttTopicValue.setTopic(s);
+            mqttTopicValues.add(mqttTopicValue);
+        }
+        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+    }
+
+    private void httpDeviceCreateHandle(IotDevice iotDevice) {
+        httpManager.deviceCreateHandle(iotDevice);
+    }
+    public void editDeviceHandle(IotDevice iotDevice){
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+        JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for(Object connItemObject : connItemArray){
+            JSONObject connItem = JSONObject.from(connItemObject);
+            String type = connItem.getString("type");
+            if (TextUtils.isEmpty(type)) {
+                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
+                return;
+            }
+            try {
+                switch (type) {
+                    case "mqtt":
+                        mqttEditDeviceHandle(iotDevice);
+                        break;
+                    case "modbus-tcp": {
+                        //先不处理,对接到了再梳理
+                        break;
+                    }
+                    case "http": {
+                        httpEditDeviceHandle(iotDevice);
+                        break;
+                    }
+                    default: {
+                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
+                        return;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
+            }
+        }
+    }
+    private void mqttEditDeviceHandle(IotDevice iotDevice) throws Exception {
+        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
+        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+        //改设备的所有topics
+        for(String s:topics){
+            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+            mqttTopicValue.setDevCode(iotDevice.getDevCode());
+            mqttTopicValue.setDevId(iotDevice.getDevBid());
+            mqttTopicValue.setTopic(s);
+            mqttTopicValues.add(mqttTopicValue);
+        }
+        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+    }
+
+    private void httpEditDeviceHandle(IotDevice iotDevice) {
+        httpManager.deviceCreateHandle(iotDevice);
+    }
+
+    public void deleteDeviceHandle(IotDevice iotDevice){
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+        JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for(Object connItemObject : connItemArray){
+            JSONObject connItem = JSONObject.from(connItemObject);
+            String type = connItem.getString("type");
+            if (TextUtils.isEmpty(type)) {
+                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
+                return;
+            }
+            try {
+                switch (type) {
+                    case "mqtt":
+                        mqttDeleteDeviceHandle(iotDevice);
+                        break;
+                    case "modbus-tcp": {
+                        //先不处理,对接到了再梳理
+                        break;
+                    }
+                    case "http": {
+                        httpDeleteDeviceHandle(iotDevice);
+                        break;
+                    }
+                    default: {
+                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(),type);
+                        return;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
+            }
+        }
+    }
+    private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
+        String serviceName=typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
+        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+        //改设备的所有topics
+        for(String s:topics){
+            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+            mqttTopicValue.setDevCode(iotDevice.getDevCode());
+            mqttTopicValue.setDevId(iotDevice.getDevBid());
+            mqttTopicValue.setTopic(s);
+            mqttTopicValues.add(mqttTopicValue);
+        }
+        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+    }
+
+    private void httpDeleteDeviceHandle(IotDevice iotDevice) {
+        httpManager.deviceCreateHandle(iotDevice);
+    }
 }
 

+ 5 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/HttpManager.java

@@ -103,4 +103,9 @@ public class HttpManager {
             return privateHttpClientByConnBid.get(iotDevice.getDevconnBid());
         }
     }
+
+    public void deviceCreateHandle(IotDevice iotDevice) {
+        HttpClient httpClient = getHttpClientByDevice(iotDevice);
+        httpClient.deviceCreateHandle(iotDevice);
+    }
 }

+ 7 - 22
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -10,6 +10,7 @@ import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.manager.ConnectionManager;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -21,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -44,6 +46,8 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
     private TypeCacheService cacheService;
     @Autowired
     private DeviceTopicService deviceTopicService;
+    @Resource
+    private ConnectionManager connectionManager;
 
     @Override
     public void onMessage(Message message, Channel channel) throws Exception {
@@ -58,36 +62,17 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
             IotActionEnums iotActionEnums = IotActionEnums.getAction(action);
             if (iotActionEnums != null) {
                 switch (iotActionEnums) {
-                    //控制指令
-                    case CMD_TASK:
-                        CmdGroupModel cmdGroupModel = synGlobalTenantInfoDto.getData().to(CmdGroupModel.class);
-                        cmdDispatcherService.handleCmd(cmdGroupModel);
-                        break;
-                    //控制指令结果
-                    case CMD_TASK_RESULT:
-                        break;
                     //设备创建
                     case IOT_DEVICE_CREATE:
-                        //临时写到这里了,先激活设备,等到负责写链接重新订阅的人统一调整
-                        IotDevice iotDevice = synGlobalTenantInfoDto.getData().to(IotDevice.class);
-                        String serviceName=cacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
-                        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
-                        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
-                        //改设备的所有topics
-                        for(String s:topics){
-                            MqttTopicValue mqttTopicValue=new MqttTopicValue();
-                            mqttTopicValue.setDevCode(iotDevice.getDevCode());
-                            mqttTopicValue.setDevId(iotDevice.getDevBid());
-                            mqttTopicValue.setTopic(s);
-                            mqttTopicValues.add(mqttTopicValue);
-                        }
-                        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
+                        connectionManager.createDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
                         break;
                     //设备更新
                     case IOT_DEVICE_UPDATE:
+                        connectionManager.editDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
                         break;
                     //设备删除
                     case IOT_DEVICE_DELETE:
+                        connectionManager.deleteDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
                         break;
                     //设备连接信息创建
                     case DEVICE_COON_CREATE: