Просмотр исходного кода

调整mqtt链接,合并同地址,同端口,同账号的连接信息

liuyaowen 1 год назад
Родитель
Сommit
b9b937c71d

+ 29 - 23
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -2,8 +2,8 @@ package com.yunfeiyun.agmp.iots.core.manager;
 
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
@@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.RegEx;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +37,7 @@ public class ConnectionManager {
     private HttpManager httpManager;
 
     @Autowired
-    BusinessCoreService businessCoreService;
+    private BusinessCoreService businessCoreService;
     @Resource
     private TypeCacheService typeCacheService;
     @Resource
@@ -56,35 +55,36 @@ public class ConnectionManager {
      */
     @PostConstruct
     public void init() throws MqttException {
-        //先把所有型号查出来
-        List<IotDeviceconnResVo> iotDeviceConnResVoList = businessCoreService.selectTosDevicetypeResVoList();
-        log.info("【初始化】设备型号 构建链接 协议: {}个", iotDeviceConnResVoList.size());
+        deviceconnCacheService.cleanCache();
+        //把所有私有连接信息查询出来
+        List<IotDeviceconnResVo> iotDeviceConnResVoList = businessCoreService.selectDevConnResVoList();
+        log.info("【初始化】链接协议加载: {}个", iotDeviceConnResVoList.size());
         for (IotDeviceconnResVo iotDeviceconnResVo : iotDeviceConnResVoList) {
             deviceconnCacheService.setCache(iotDeviceconnResVo);
-            log.info("【初始化】【开始】协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
+            log.info("【初始化】【开始】链接协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(),iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
             //将配置信息转换成jsonObject,这是个数组
             JSONArray jsonConfig = parseConfigJson(iotDeviceconnResVo.getDevconnConfig());
-
             if (jsonConfig == null) {
-                log.info("【初始化】协议加载,厂家:{},失败:配置为空", iotDeviceconnResVo.getFirmName());
+                log.info("【初始化】链接协议加载,厂家:{},失败:配置为空", iotDeviceconnResVo.getFirmName());
                 continue;
             }
             // 遍历多个配置
             for (int j = 0; j < jsonConfig.size(); j++) {
-                buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
+                buildConnCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
             }
-
         }
 
     }
 
+
+
     /**
      * 基于单个配置json创建连接,原子类方法,不可拆分
      *
      * @param iotDeviceconnResVo
      * @param jsonConfig
      */
-    void buildSingleMqttCoreByConfig(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
+    void buildConnCoreByConfig(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
         String type = jsonConfig.getString("type");
         if (TextUtils.isEmpty(type)) {
             log.info("【初始化】协议加载,厂家:{} type 为空:跳过", iotDeviceconnResVo.getFirmName());
@@ -182,7 +182,7 @@ public class ConnectionManager {
             JSONObject connItem = JSONObject.from(connItemObject);
             String type = connItem.getString("type");
             if (TextUtils.isEmpty(type)) {
-                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
+                log.info("【设备:{} 删除】协议加载 连接信息中的type 为空:跳过", iotDevice.getDevCode());
                 return;
             }
             try {
@@ -199,12 +199,12 @@ public class ConnectionManager {
                         break;
                     }
                     default: {
-                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(), type);
+                        log.info("【设备:{} 删除】其它类型:{},跳过", iotDevice.getDevCode(), type);
                         return;
                     }
                 }
             } catch (Exception e) {
-                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
+                log.error("【设备:{} 删除】失败 异常信息:", iotDevice.getDevCode(), e);
             }
         }
     }
@@ -223,8 +223,8 @@ public class ConnectionManager {
     public void createIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【初始化】【开始】协议加载,连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         //将配置信息转换成jsonObject,这是个数组
-        JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         deviceconnCacheService.setCache(iotDeviceconn);
+        JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         if (jsonConfig == null) {
             log.error("【初始化】协议加载失败,连接名称:{},配置为空", iotDeviceconn.getDevconnName());
             return;
@@ -233,7 +233,7 @@ public class ConnectionManager {
         BeanUtils.copyProperties(iotDeviceconn, iotDeviceconnResVo);
         // 遍历多个配置
         for (int j = 0; j < jsonConfig.size(); j++) {
-            buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
+            buildConnCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
         }
     }
 
@@ -242,7 +242,6 @@ public class ConnectionManager {
      */
     public void deleteIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【连接信息删除】连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
-        deviceconnCacheService.deleteCache(iotDeviceconn);
         //将配置信息转换成jsonObject,这是个数组
         JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         if (jsonConfig == null) {
@@ -263,7 +262,7 @@ public class ConnectionManager {
             try {
                 switch (type) {
                     case "mqtt":
-                        mqttManager.deleteMqttConnection(iotDeviceconnResVo.getDevconnBid());
+                        mqttManager.deleteMqttConnection(iotDeviceconnResVo);
                         break;
                     case "modbus-tcp": {
                         //先不处理,对接到了再梳理
@@ -283,6 +282,7 @@ public class ConnectionManager {
                 log.error("【连接信息{}删除失败】", iotDeviceconnResVo.getDevconnName(), e);
             }
         }
+        deviceconnCacheService.deleteCache(iotDeviceconn);
     }
 
     /**
@@ -300,7 +300,8 @@ public class ConnectionManager {
      * @throws Exception
      */
     private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
-        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDevice.getDevtypeBid());
+        String serviceName = iotDeviceDictEnum.getServiceName();
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
@@ -311,7 +312,9 @@ public class ConnectionManager {
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+        String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
+        mqttManager.topicSingleSubscribeDevice(connId, serviceName, mqttTopicValues);
     }
 
     private void httpDeviceCreateHandle(IotDevice iotDevice) {
@@ -325,7 +328,8 @@ public class ConnectionManager {
      * @throws Exception
      */
     private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
-        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDevice.getDevtypeBid());
+        String serviceName = iotDeviceDictEnum.getServiceName();
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
@@ -336,7 +340,9 @@ public class ConnectionManager {
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicBatchUnSubscribeDevices(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByIotDevice(iotDevice);
+        String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
+        mqttManager.topicBatchUnSubscribeDevices(connId, serviceName, mqttTopicValues);
     }
 
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {

+ 112 - 64
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -1,12 +1,16 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
 import cn.hutool.core.util.ArrayUtil;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.constant.ErrorCode;
-import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -16,13 +20,13 @@ import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -41,12 +45,14 @@ public class MqttManager {
 
     @Autowired
     IIotDeviceService iIotDeviceService;
+    @Autowired
+    private DeviceconnCacheService deviceconnCacheService;
 
 
     /**
      * 实现类名称-->mqtt
      */
-    private Map<String, MqttCore> serviceMqttMap = new HashMap<>();
+    private ConcurrentHashMap<String, MqttCore> mqttCoreMap = new ConcurrentHashMap<>();
 
     /**
      * spring 自动注入
@@ -76,30 +82,17 @@ public class MqttManager {
      */
     public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig){
         try {
-            log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
-
             // 构建配置
             MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
-            //服务类Bean名称
-            String serviceName = cfgYf.getServiceName();
-            // 之前干嘛的待定,
-            String type = cfgYf.getType();
-            //厂家名称
-            String firmName = cfgYf.getFirmName();
-            log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
-            // 创建MqttCore实例
-            MqttCore mqttCore = new MqttCore();
-
-            // 查询topics【需要实现:重新更改获取该型号下的设备】
-            String connectionId = iotDeviceconnResVo.getDevconnBid();
-            mqttCore.setConnectionId(connectionId);
+            // 生成Mqtt连接标识并写入缓存
+            String connectionId = generateMqttConnectionId(iotDeviceconnResVo,jsonConfig);
+            log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
             // 构建topic
-            List<IotDevice> devices = deviceTopicService.getDevicesByConectionId(connectionId);
-            log.info("【创建MqttCore实例】 mqttCore: {} connectionId:{} devSize:{}", mqttCore, connectionId, devices.size());
+            List<IotDevice> devices = deviceTopicService.getDevicesByDevConnBid(iotDeviceconnResVo.getDevconnBid());
             List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
             for (IotDevice iotDevice : devices) {
                 // 根据设备code获取topics
-                String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+                String[] topics = deviceTopicService.getTopic(cfgYf.getServiceName(), iotDevice.getDevCode());
                 if (topics != null) {
                     for (String s : topics) {
                         MqttTopicValue mqttTopicValue = new MqttTopicValue();
@@ -109,34 +102,35 @@ public class MqttManager {
                         mqttTopicValues.add(mqttTopicValue);
                     }
                 }
-
             }
-
             String[] topics = new String[mqttTopicValues.size()];
-            if (mqttTopicValues != null && !mqttTopicValues.isEmpty()) {
+            if (!mqttTopicValues.isEmpty()) {
                 for (int i = 0; i < mqttTopicValues.size(); i++) {
                     topics[i] = mqttTopicValues.get(i).getTopic();
                 }
+            }
+            MqttCore mqttCore = mqttCoreMap.get(connectionId);
+            if(null != mqttCore){
+                // MqttCore已经存在
+               mqttCore.subscribe(topics);
+               mqttCore.bindTopicToDeviceId(mqttTopicValues);
+            }else {
+                // 创建新的mqttCore
+                mqttCore = new MqttCore();
+                mqttCore.setConnectionId(connectionId);
+                mqttCore.bindTopicToDeviceId(mqttTopicValues);
                 cfgYf.setSubTopic(topics);
+                // 构建MqttCore
+                mqttCore.buildMqttCore(cfgYf);
+                addConnectionMap(connectionId, mqttCore);
             }
-            mqttCore.bindTopicToDeviceId(mqttTopicValues);
-            cfgYf.setServiceName(serviceName);
-            log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
-
-            // 处理连接ID的逻辑 IP+port+name
-            log.info("【添加公共连接】 connectionId: {}", connectionId);
-            addConnectionMap(connectionId, mqttCore);
-
-            // 构建MqttCore
-            mqttCore.buildMqttCore(cfgYf);
-            log.info("【成功构建MqttCore】 mqttCore: {}", mqttCore);
-            log.info("【完成构建MQTT连接】");
         } catch (Exception e) {
             log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
         }
     }
 
 
+
     /**
      * 【链接管理-删除】
      * 删除mqtt链接,底层统一调这个
@@ -144,11 +138,20 @@ public class MqttManager {
      * @param connectionId
      * @throws MqttException
      */
-    public void deleteMqttConnection(String connectionId) throws MqttException {
-        log.info("【开始删除MQTT连接】 connectionId: {}", connectionId);
-
+    public void deleteMqttConnection(IotDeviceconnResVo iotDeviceconnResVo) throws MqttException {
+        // 首先获取连接标识
+        String connectionId = deviceconnCacheService.getMqttConnectIdByDeviceConnBid(iotDeviceconnResVo.getDevconnBid());
+        // 删除相关缓存
+        deviceconnCacheService.deleteMqttConnectionId(iotDeviceconnResVo.getDevconnBid());
+        log.info("【删除MQTT连接信息】 获取连接标识 connectionId: {}", connectionId);
+        // 运行到这一步的时候,设备的连接信息都已解绑,相关订阅都已取消
+        if(deviceconnCacheService.mqttConnectionIdHasLink(connectionId)){
+            // 连接标识下仍有其他连接再使用该MqttCore
+            log.info("【删除MQTT连接信息】 连接仍在使用,停止释放连接");
+            return;
+        }
         // 从map中获取MqttCore
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore != null) {
             try {
                 // 尝试关闭MqttCore
@@ -165,7 +168,7 @@ public class MqttManager {
                 throw e;
             } finally {
                 // 从map中移除该MqttCore
-                serviceMqttMap.remove(connectionId);
+                mqttCoreMap.remove(connectionId);
                 log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
             }
         } else {
@@ -173,7 +176,6 @@ public class MqttManager {
             log.warn("【尝试关闭不存在的MQTT连接】 connectionId: {}", connectionId);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), "关闭失败:" + connectionId + "暂无对应对象");
         }
-
         log.info("【完成删除MQTT连接】 connectionId: {}", connectionId);
     }
 
@@ -228,16 +230,16 @@ public class MqttManager {
     private void putConnection(String connectionId, MqttCore mqttCore) {
         log.info("【开始维护连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
 
-        if (!serviceMqttMap.containsKey(connectionId)) {
+        if (!mqttCoreMap.containsKey(connectionId)) {
             // 如果 connectionId 不存在于 map 中,则添加新的连接
-            serviceMqttMap.put(connectionId, mqttCore);
+            mqttCoreMap.put(connectionId, mqttCore);
             log.info("【新增连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         } else {
             // 如果 connectionId 已存在于 map 中,则记录重复配置的日志
             log.info("【重复配置】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         }
 
-        log.info("【完成维护连接】 connectionId: {}, 当前连接数: {}", connectionId, serviceMqttMap.size());
+        log.info("【完成维护连接】 connectionId: {}, 当前连接数: {}", connectionId, mqttCoreMap.size());
     }
 
 
@@ -249,9 +251,9 @@ public class MqttManager {
      * @return
      */
     @Deprecated
-    public MqttPublisher getPublisherByService(String connectionId) {
+    public MqttPublisher getPublisherByDevConnBid(String connectionId) {
         throwDeprecatedMethod("getPublisherByService废弃,发布消息直接通过提供connection 调用 publishMsg");
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore == null) {
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
         }
@@ -262,20 +264,19 @@ public class MqttManager {
      * 【操作-发布消息】
      * 发布消息
      *
-     * @param connectionId 链接id
+     * @param devConnBid 设备链接id
      * @param topic
      * @param message
      * @throws MqttException
      */
-    public void publishMsg(String connectionId, String topic, String message) throws MqttException {
+    public void publishMsg(String devConnBid, String topic, String message) throws MqttException {
+        String connectionId = deviceconnCacheService.getMqttConnectIdByDeviceConnBid(devConnBid);
         log.info("【开始发布消息】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
-
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore == null) {
             log.error("【发布消息失败】 connectionId: {} 暂无对应处理器", connectionId);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
         }
-
         try {
             mqttCore.publish(topic, message);
             log.info("【消息发布成功】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
@@ -348,7 +349,7 @@ public class MqttManager {
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), "【获取MQTT对象失败】:" + connectionId + "对应不存在");
         }
 
-        log.info("【获取MQTT核心类成功】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
+        log.info("【获取MQTT核心类成功】 connectionId: {}, mqttClientId: {}", connectionId, mqttCore.getClient().getClientId());
         return mqttCore;
     }
 
@@ -360,14 +361,14 @@ public class MqttManager {
      * @return
      */
     private MqttCore getMqttCore(String connectionId) {
-        return serviceMqttMap.get(connectionId);
+        return mqttCoreMap.get(connectionId);
     }
 
     /**
      * 打印已经加载的配置信息
      */
     public void showConfig() {
-        for (MqttCore mqttCore : serviceMqttMap.values()) {
+        for (MqttCore mqttCore : mqttCoreMap.values()) {
             log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), mqttCore.getServiceName(), mqttCore.getServiceType(), mqttCore.getSubTopic());
         }
     }
@@ -390,19 +391,17 @@ public class MqttManager {
      */
     public void topicBatchSubscribeDevices(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
         log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
-
-        // 获取批量订阅的主题
-        String[] topics = new String[mqttTopicValues.size()];
-        for (int i = 0; i < topics.length; i++) {
-            topics[i] = mqttTopicValues.get(i).getTopic();
-        }
-        log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
         // 获取MqttCore实例
         MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
-        log.info("【获取MqttCore实例】 mqttCore: {}", mqttCore);
+        log.info("【获取MqttCore实例】 mqttClientId: {}", mqttCore.getClient().getClientId());
+        // 获取批量订阅的主题
+        String[] topics = new String[mqttTopicValues.size()];
         // 执行订阅
         try {
-            mqttCore.subscribe(topics);
+            for (int i = 0; i < topics.length; i++) {
+                topics[i] = mqttTopicValues.get(i).getTopic();
+                mqttCore.getClient().subscribe(topics[i],0);
+            }
             mqttCore.bindTopicToDeviceId(mqttTopicValues);
             log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
         } catch (MqttException e) {
@@ -496,4 +495,53 @@ public class MqttManager {
     public String getDevIdByTopic(String connectionId, String topic) {
         return getMqttCore(connectionId).getDevIdByTopic(topic);
     }
+
+    /**
+     * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     * @param iotDeviceconn 连接信息
+     * @param jsonConfig 连接信息配置
+     **/
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
+        if(null == iotDeviceconn){
+            throw new IotBizException(ErrorCode.INVALID_PARAMETER.getCode(),"连接信息为空");
+        }
+        return deviceconnCacheService.getMqttConnectIdByDeviceConnBid(iotDeviceconn.getDevconnBid());
+    }
+    /**
+     *
+     * */
+    public String generateMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
+        // 使用mqtt连接的ip+port+user
+        String ip = jsonConfig.getString("ip");
+        if(null != ip){
+            String[] ipItem = ip.split("\\.");
+            StringBuilder ipFormat = new StringBuilder();
+            for(String str : ipItem){
+                ipFormat.append(String.format("%03d",Long.parseLong(str)));
+            }
+            ip = ipFormat.toString();
+        }
+        String connectionId = ip+jsonConfig.getString("port")+jsonConfig.getString("username");
+        if(!IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())){
+            // 非通用连接
+            // 非通用连接信息会拼装租户标识
+            connectionId = connectionId + iotDeviceconn.getTid();
+        }
+        deviceconnCacheService.setMqttConnectionIdByConnBid(iotDeviceconn.getDevconnBid(),connectionId);
+        return connectionId;
+    }
+    /**
+     * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     * @param iotDeviceconn 连接信息
+     **/
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn){
+        JSONArray jsonArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for(int i = 0;i<jsonArray.size();i++){
+            JSONObject configObject = jsonArray.getJSONObject(i);
+            if("mqtt".equals(configObject.getString("type"))){
+                return getMqttConnectionId(iotDeviceconn, configObject);
+            }
+        }
+        return null;
+    }
 }

+ 4 - 4
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java

@@ -265,15 +265,15 @@ public class DeviceTopicService {
      * @param connectionId
      * @return
      */
-    public String[] getDeviceCodesByConectionId(String connectionId) {
-        List<String> ids = iotDeviceService.getDeviceCodesByConectionId(connectionId);
+    public String[] getDeviceCodesByDevConnBid(String devconnBid) {
+        List<String> ids = iotDeviceService.getDeviceCodesByDevconnBid(devconnBid);
         if (ids == null || ids.isEmpty()) {
             return new String[]{};
         }
         return ids.toArray(new String[]{});
     }
 
-    public List<IotDevice> getDevicesByConectionId(String connectionId) {
-        return iotDeviceService.getDevicesByConectionId(connectionId);
+    public List<IotDevice> getDevicesByDevConnBid(String devConnBid) {
+        return iotDeviceService.getDevicesByDevconnBid(devConnBid);
     }
 }

+ 15 - 9
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -5,14 +5,13 @@ import com.yunfeiyun.agmp.iots.device.common.Device;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Stream;
 
 /**
  * mqtt 核心类
@@ -133,10 +132,10 @@ public class MqttCore {
         log.info("【初始化】MQTT CORE {},{},{},{},{},", mqttConfig.getDeviceType(), mqttConfig.getIp(), mqttConfig.getPort(), mqttConfig.getUsername(), mqttConfig.getPassword());
         this.mqttConfig = mqttConfig;
         buildClient();
-        connection();
+
         buildPublisher();
         buildSubscriber();
-
+        connection();
     }
 
     /**
@@ -147,8 +146,7 @@ public class MqttCore {
     private void buildClient() throws MqttException {
         //return new MqttClient("tcp://47.96.123.180:1883", UUID.randomUUID().toString().replace("-",""));
         String url = "tcp://" + mqttConfig.getIp() + ":" + mqttConfig.getPort();
-        String clientID = UUID.randomUUID().toString().replace("-", "");
-        this.mqttClient = new MqttClient(url, clientID, new MemoryPersistence());
+        this.mqttClient = new MqttClient(url, connectionId, new MemoryPersistence());
         log.info("【初始化】构建 MQTT client {}", this.mqttClient);
     }
 
@@ -225,7 +223,15 @@ public class MqttCore {
      * @throws MqttException
      */
     public void subscribe(String[] topics) throws MqttException {
-        getClient().subscribe(topics);
+        // 否则向mqttConfig中追加订阅
+        mqttConfig.setSubTopic(Stream.concat(Stream.of(topics),Stream.of(mqttConfig.getSubTopic())).toArray(String[]::new));
+        if(getClient().isConnected()){
+            // 如果此mqtt已经建立了连接,则正常订阅
+            for(String topic :topics){
+                getClient().subscribe(topic);
+                log.info("[MQTT] {} 连接已建立 追加订阅主题 {}", mqttConfig.getDeviceType(), topic);
+            }
+        }
     }
 
     /**

+ 1 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttPublisher.java

@@ -3,6 +3,7 @@ package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 
 @Slf4j

+ 9 - 18
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -24,26 +24,14 @@ public class MqttSubscriber implements MqttCallbackExtended {
     public void init(MqttCore mqttCore) throws MqttException {
         this.mqttCore = mqttCore;
         this.mqttClient = mqttCore.getClient();
-        String[] topics = mqttCore.getSubTopic();
         mqttClient.setCallback(this);
-        if(topics!=null&&topics.length!=0){
-            for (String topic : topics) {
-                //TODO 测试设备乱发离线指令,暂时不订阅它
-                if ("/yfkj/cbd/offline/861551058867599".equals(topic)) {
-                    continue;
-                }
-                mqttClient.subscribe(topic);
-                log.debug("mqtt 主题已订阅 {},{}", mqttCore.getServiceType(), topic);
-            }
-        }
-
     }
 
     @Override
     public void connectionLost(Throwable throwable) {
         // 连接丢失时的处理逻辑
-        log.info("[MQTT] 连接断开,正在尝试重连...");
-        log.info(mqttClient.getServerURI());
+        log.info("[MQTT] 链接标识:{} URL:{} 连接断开,正在尝试重连...",mqttClient.getClientId(),mqttClient.getServerURI());
+
     }
 
     /**
@@ -96,12 +84,15 @@ public class MqttSubscriber implements MqttCallbackExtended {
      */
     @Override
     public void connectComplete(boolean reconnect, String serverURI) {
-        log.info("[MQTT] 连接断开,重连成功..." + serverURI);
-
+        String loggerMsg = "初始化连接成功";
+        if(reconnect){
+            loggerMsg = "连接断开,重连成功";
+        }
+        log.info("[MQTT] {}...{}" ,loggerMsg, serverURI);
         String[] topics = mqttCore.getSubTopic();
         if(topics==null||topics.length==0){
             mqttClient.setCallback(this);
-            log.info("[MQTT] 连接断开,重连成功 无topic ");
+            log.info("[MQTT] {} 无topic ",loggerMsg);
             return;
         }
         for (String topic : topics) {
@@ -111,7 +102,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
             }
             try {
                 mqttClient.subscribe(topic);
-                log.info("[MQTT] 连接断开,重连成功 重新订阅主题 {},{}", mqttCore.getServiceType(), topic);
+                log.info("[MQTT]{} {} 订阅主题 {}",mqttCore.getServiceType(), loggerMsg, topic);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/BusinessCoreMapper.java

@@ -1,7 +1,6 @@
 package com.yunfeiyun.agmp.iots.device.mapper;
 
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
-import com.yunfeiyun.agmp.iots.common.modal.TosDevicetypeResVo;
 
 import java.util.List;
 
@@ -11,5 +10,6 @@ public interface BusinessCoreMapper {
      *
      * @return
      */
-    List<IotDeviceconnResVo> selectTosDevicetypeResVoList();
+    List<IotDeviceconnResVo> selectDevConnResVoList();
+
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/IotDeviceMapper.java

@@ -96,9 +96,9 @@ public interface IotDeviceMapper {
 
     int updateIotDeviceStatusByType(@Param("devStatus") String devStatus,@Param("firmBid") String firmBid,@Param("devtypeBid")String devtypeBid);
 
-    List<String> getDeviceCodesByConectionId(@Param("connectionId") String connectionId);
+    List<String> getDeviceCodesByDevconnBid(@Param("devconnBid") String devconnBid);
 
-    List<IotDevice> getDevicesByConectionId(String connectionId);
+    List<IotDevice> getDevicesByDevconnBid(String devconnBid);
 
     List<IotDevice> selectIotDeviceByDevtypeBidList(List<String> devTypeBidList);
 }

+ 4 - 10
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -8,16 +8,13 @@ import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iot.common.constant.mq.IotActionEnums;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
-import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
-import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
-import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.manager.ConnectionManager;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
-import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.device.serviceImp.IotCbdImgService;
 import lombok.extern.slf4j.Slf4j;
@@ -29,9 +26,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * 负责处理AGMP 子系统的消息
@@ -82,15 +76,15 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                         connectionManager.deleteDeviceHandle(synGlobalTenantInfoDto.getData().to(IotDevice.class));
                         break;
                     //设备连接信息创建
-                    case DEVICE_COON_CREATE:
+                    case DEVICE_CONN_CREATE:
                         connectionManager.createIotDeviceconnHandle(synGlobalTenantInfoDto.getData().to(IotDeviceconn.class));
                         break;
                     //设备连接信息更新
-                    case DEVICE_COON_UPDATE:
+                    case DEVICE_CONN_UPDATE:
                         connectionManager.editIotDeviceconnHandle(synGlobalTenantInfoDto.getData().to(IotDeviceconn.class));
                         break;
                     //设备连接信息删除
-                    case DEVICE_COON_DELETE:
+                    case DEVICE_CONN_DELETE:
                         connectionManager.deleteIotDeviceconnHandle(synGlobalTenantInfoDto.getData().to(IotDeviceconn.class));
                         break;
                     //更新所有设备信息

+ 2 - 4
src/main/java/com/yunfeiyun/agmp/iots/service/BusinessCoreService.java

@@ -1,7 +1,6 @@
 package com.yunfeiyun.agmp.iots.service;
 
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
-import com.yunfeiyun.agmp.iots.common.modal.TosDevicetypeResVo;
 import com.yunfeiyun.agmp.iots.device.mapper.BusinessCoreMapper;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -22,9 +21,8 @@ public class BusinessCoreService {
      *
      * @return
      */
-    public List<IotDeviceconnResVo> selectTosDevicetypeResVoList() {
-
-        return businessCoreMapper.selectTosDevicetypeResVoList();
+    public List<IotDeviceconnResVo> selectDevConnResVoList() {
+        return businessCoreMapper.selectDevConnResVoList();
     }
 
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/service/IIotDeviceService.java

@@ -130,9 +130,9 @@ public interface IIotDeviceService {
 
     public void updateIotDeviceBatch(List<IotDevice> iotDeviceList);
 
-    List<String> getDeviceCodesByConectionId(String connectionId);
+    List<String> getDeviceCodesByDevconnBid(String connectionId);
 
-    List<IotDevice> getDevicesByConectionId(String connectionId);
+    List<IotDevice> getDevicesByDevconnBid(String connectionId);
 
     List<IotDevice> selectIotDeviceByDevtypeBidList(List<String> devTypeBidList);
 }

+ 4 - 4
src/main/java/com/yunfeiyun/agmp/iots/service/impl/IotDeviceServiceImpl.java

@@ -317,14 +317,14 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
     }
 
     @Override
-    public List<String> getDeviceCodesByConectionId(String connectionId) {
-        return iotDeviceMapper.getDeviceCodesByConectionId(connectionId);
+    public List<String> getDeviceCodesByDevconnBid(String devConnBid) {
+        return iotDeviceMapper.getDeviceCodesByDevconnBid(devConnBid);
     }
 
     @Override
-    public List<IotDevice> getDevicesByConectionId(String connectionId) {
+    public List<IotDevice> getDevicesByDevconnBid(String devConnBid) {
 
-        return iotDeviceMapper.getDevicesByConectionId(connectionId);
+        return iotDeviceMapper.getDevicesByDevconnBid(devConnBid);
     }
 
     @Override

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -76,7 +76,7 @@ public class IotStatusService {
     /**
      * 定期根据类型查设备最新设备数据,是否长时间不上报。
      */
-    @Scheduled(cron = "0 0 */1 * * ?")
+    //  @Scheduled(cron = "0 0 */1 * * ?")
     public void validateStatusByDevType() {
         printMqttStatus();
         Iterator<String> iterator = validateDeviceType.iterator();

+ 1 - 2
src/main/resources/mapper/BusinessCoreMapper.xml

@@ -6,7 +6,7 @@
 
 
     <!-- * 这个方法的定位就是:获取设备型号的信息,用于构建链接 *-->
-    <select id="selectTosDevicetypeResVoList"
+    <select id="selectDevConnResVoList"
             resultType="com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo">
             SELECT
                 ic.*,
@@ -14,7 +14,6 @@
         it.devtypeCode ,
         ifd.firmName,
         ifd.firmBid
-
                 from
                 IotDeviceconn ic
         LEFT JOIN TosDevicetype it on ic.devtypeBid = it.devtypeBid

+ 4 - 4
src/main/resources/mapper/IotDeviceMapper.xml

@@ -511,11 +511,11 @@
         and d.devDelstatus='0'
         group by d.devBid
     </select>
-    <select id="getDeviceCodesByConectionId" resultType="java.lang.String">
-        select  devCode from IotDevice where devconnBid=#{connectionId} and devDelstatus='0'
+    <select id="getDeviceCodesByDevconnBid" resultType="java.lang.String">
+        select  devCode from IotDevice where devconnBid=#{devconnBid} and devDelstatus='0'
     </select>
-    <select id="getDevicesByConectionId" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDevice">
-                select  * from IotDevice where devconnBid=#{connectionId} and devDelstatus='0'
+    <select id="getDevicesByDevconnBid" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDevice">
+        select * from IotDevice where devconnBid=#{devconnBid} and devDelstatus='0'
     </select>
     <select id="selectIotDeviceByDevtypeBidList" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDevice">
         <include refid="selectIotDeviceVo"/>