瀏覽代碼

阶段性提交:构建mqttmanager相关管理方法,废弃代码

yf_zn 1 年之前
父節點
當前提交
fb77dd7333

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -54,14 +54,14 @@ public class ConnectionManager {
             }
             switch (type) {
                 case "mqtt":
-                    mqttManager.initConfig(iotFirmdevResVo, jsonConfig);
+                    mqttManager.buildMqttConnection(iotFirmdevResVo, jsonConfig);
                     break;
                 case "modbus-tcp": {
                    //先不处理,对接到了再梳理
                     break;
                 }
                 case "http": {
-                    httpManager.init(iotFirmdevResVo, jsonConfig);
+                    httpManager.buildHttpConnection(iotFirmdevResVo, jsonConfig);
                     break;
                 }
                 default: {

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/manager/HttpManager.java

@@ -37,7 +37,7 @@ public class HttpManager {
     /**
      * 初始化
      */
-    public void init(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
+    public void buildHttpConnection(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
 
         log.info("【http初始化】加载配置{}",jsonConfig);
 

+ 255 - 134
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -32,12 +32,21 @@ import java.util.*;
  * 须知:
  * 1. 只要获取与mqtt通道有关的,硬通货就是connectionId
  * 2. 只要和数据代码解析,复用代码类的硬通货就是serviceName
- *
+ * 该类旨在提供设备的链接建立、删除、topic的订阅和删除,何时何地调用外层业务单独一个类统一调度
  */
 @Component
 @Slf4j
 public class MqttManager {
 
+    @Autowired
+    DeviceTopicService deviceTopicService;
+
+    @Autowired
+    IIotDeviceService iIotDeviceService;
+
+    @Autowired
+    IIotFirmdevService iotFirmdevService;
+
     /**
      * 实现类名称-->mqtt
      */
@@ -53,66 +62,113 @@ public class MqttManager {
     private CmdResultCheckService cmdResultCheckService;
 
 
-    @Autowired
-    DeviceTopicService deviceTopicService;
-
-    @Autowired
-    IIotDeviceService iIotDeviceService;
-
-    @Autowired
-    IIotFirmdevService iotFirmdevService;
 
     /**
      * 强制对于已经废弃/逻辑需要重新梳理抛出异常
      */
-    void throwDeprecatedMethod(){
-        throw new IotBizException(ErrorCode.FAILURE.getCode(),"方法已经废弃/逻辑需要重新梳理,重新更换");
+    void throwDeprecatedMethod(String msg){
+        throw new IotBizException(ErrorCode.FAILURE.getCode(),msg == null ? "方法已经废弃/逻辑需要重新梳理,重新更换" : msg);
     }
 
 
+    // mqtt链接相关开始############################################################################
+
     /**
-     * 根据厂家配置构建MqttCore
-     *
+     * 【链接管理-创建】
+     * 根据配置构建 MqttCoreConnection
+     * 创建连接,底层统一调这个
      * @param iotFirmdevResVo
      * @param jsonConfig
      * @throws MqttException
      */
-    public void initConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) throws MqttException {
-        MqttConfig cfgYf = getMqttConfig(iotFirmdevResVo, jsonConfig);
+    public void buildMqttConnection(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) throws MqttException {
+        log.info("【开始构建MQTT连接】 iotFirmdevResVo: {}, jsonConfig: {}", iotFirmdevResVo, jsonConfig);
+
+        // 构建配置
+        MqttConfig cfgYf = buildMqttConfig(iotFirmdevResVo, jsonConfig);
         String firmBizId = cfgYf.getFirmBizId();
         String serviceName = cfgYf.getServiceName();
         String type = cfgYf.getType();
         String firmName = cfgYf.getFirmName();
         String deviceTypeId = cfgYf.getDeviceTypeBizId();
-        log.info("【初始化厂商 配置 】 {} {} 开始 ##########################", firmName, type);
+        log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
+
+        // 创建MqttCore实例
         MqttCore mqttCore = new MqttCore();
-        //查询topics
+        log.info("【创建MqttCore实例】 mqttCore: {}", mqttCore);
+
+        // 查询topics
         String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
         cfgYf.setSubTopic(topics);
         cfgYf.setServiceName(serviceName);
-        log.info("【mqtt:{} 】  {} ", cfgYf.getDeviceType(), firmBizId);
-        log.info("【初始化厂商 加载 配置】 {} {} {} ", firmName, type, topics);
-        //这里从配置获取到一个链接的唯一标识,内部处理复用问题,目前来看,链接id只要相同就实现复用链接
-        //【这个id的生成用一个公共的方法】
-        if("私有".equals("查询字段")){
+        log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
+
+        // 处理连接ID的逻辑
+        if ("私有".equals("查询字段")) {
+            log.info("【添加私有连接】 connectionId: connectionId");
             addPrivateConnectionMap("connectionId", mqttCore);
-        }else{
+        } else {
+            log.info("【添加公共连接】 connectionId: connectionId");
             addCommonConnectionMap("connectionId", mqttCore);
         }
-        // 这个checker待梳理
-        mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
-        log.info("【初始化厂商 配置 】 {} {} 完成 ##########################", firmName, type);
+
+        // 构建MqttCore
+        try {
+            mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
+            log.info("【成功构建MqttCore】 mqttCore: {}", mqttCore);
+        } catch (MqttException e) {
+            log.error("【构建MqttCore失败】 异常信息: {}", e.getMessage(), e);
+            throw e; // 重新抛出异常,以便上层处理
+        }
+
+        log.info("【初始化厂商配置】 {} {} 完成", firmName, type);
+        log.info("【完成构建MQTT连接】");
     }
 
 
     /**
-     * 构建配置
+     * 【链接管理-删除】
+     * 删除mqtt链接,底层统一调这个
+     * @param connectionId
+     * @throws MqttException
+     */
+    public void deleteMqttConnection(String connectionId) throws MqttException {
+        log.info("【开始删除MQTT连接】 connectionId: {}", connectionId);
+
+        // 从map中获取MqttCore
+        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        if (mqttCore != null) {
+            try {
+                // 尝试关闭MqttCore
+                log.info("【尝试关闭MQTT连接】 connectionId: {}", connectionId);
+                mqttCore.close();
+                log.info("【成功关闭MQTT连接】 connectionId: {}", connectionId);
+            } catch (Exception e) {
+                // 记录关闭失败的日志
+                log.error("【链接关闭失败】 connectionId: {}, 异常信息: {}", connectionId, e.getMessage(), e);
+            }
+
+            // 从map中移除该MqttCore
+            serviceMqttMap.remove(connectionId);
+            log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
+        } else {
+            // 如果找不到对应的MqttCore,则抛出异常
+            log.warn("【尝试关闭不存在的MQTT连接】 connectionId: {}", connectionId);
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), "关闭失败:" + connectionId + "暂无对应对象");
+        }
+
+        log.info("【完成删除MQTT连接】 connectionId: {}", connectionId);
+    }
+
+
+    /**
+     * 将json配置文件解析,构建配置
      *
      * @param iotFirmdevResVo
      * @param jsonConfig
      * @return
      */
-    MqttConfig getMqttConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
+    MqttConfig buildMqttConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
         MqttConfig cfgYf = new MqttConfig();
         String firmBizId = iotFirmdevResVo.getFirmBid();
         String serviceName = jsonConfig.getString("service");
@@ -135,29 +191,13 @@ public class MqttManager {
         return cfgYf;
     }
 
-    //链接相关
-
-
-    /**
-     * 获取Publisher 根据服务名称
-     *
-     * @param connectionId
-     * @return
-     */
-    public MqttPublisher getPublisherByService(String connectionId) {
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
-        if (mqttCore == null) {
-            throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
-        }
-        return mqttCore.getMqttPublisher();
-    }
 
     /**
      * 添加私有的链接:自行修改入参,链接的id生成
      * @param connectionId
      * @param mqttCore
      */
-    public void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
+    private void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
         putConnection(connectionId,mqttCore);
 
     }
@@ -167,70 +207,141 @@ public class MqttManager {
      * @param connectionId
      * @param mqttCore
      */
-    public void addCommonConnectionMap(String connectionId, MqttCore mqttCore) {
+    private void addCommonConnectionMap(String connectionId, MqttCore mqttCore) {
         putConnection(connectionId,mqttCore);
 
     }
 
     /**
-     * 放链接
+     * 维护链接,connectionId => mqttCore
      * @param connectionId
      * @param mqttCore
      */
-    void putConnection(String connectionId, MqttCore mqttCore){
+    private void putConnection(String connectionId, MqttCore mqttCore) {
+        log.info("【开始维护连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
+
         if (!serviceMqttMap.containsKey(connectionId)) {
-            // 维护 服务名字--> mqtt
+            // 如果 connectionId 不存在于 map 中,则添加新的连接
             serviceMqttMap.put(connectionId, mqttCore);
+            log.info("【新增连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         } else {
-            log.info("【MQTT】重复配置 connectionId:{} ", connectionId);
+            // 如果 connectionId 已存在于 map 中,则记录重复配置的日志
+            log.info("【重复配置】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
+        }
+
+        log.info("【完成维护连接】 connectionId: {}, 当前连接数: {}", connectionId, serviceMqttMap.size());
+    }
+
+
+    /**
+     * 获取Publisher 根据服务名称
+     * 之前对外暴漏publish,现在将其包装,直接提供发布方法
+     * @param connectionId
+     * @return
+     */
+    @Deprecated
+    public MqttPublisher getPublisherByService(String connectionId) {
+        throwDeprecatedMethod("getPublisherByService废弃,发布消息直接通过提供connection 调用 publishMsg");
+        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        if (mqttCore == null) {
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
+        }
+        return mqttCore.getMqttPublisher();
+    }
+
+    /**
+     * 【操作-发布消息】
+     * 发布消息
+     * @param connectionId 链接id
+     * @param topic
+     * @param message
+     * @throws MqttException
+     */
+    public void publishMsg(String connectionId, String topic, String message) throws MqttException {
+        log.info("【开始发布消息】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
+
+        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        if (mqttCore == null) {
+            log.error("【发布消息失败】 connectionId: {} 暂无对应处理器", connectionId);
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
+        }
+
+        try {
+            mqttCore.publish(topic, message);
+            log.info("【消息发布成功】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
+        } catch (MqttException e) {
+            log.error("【消息发布失败】 connectionId: {}, topic: {}, message: {}, 异常信息: {}", connectionId, topic, message, e.getMessage(), e);
+            throw e;
         }
     }
 
     /**
      * 获取设备实现类
      *
-     * @return
+     * @param serviceName 服务名
+     * @return 设备处理器
      */
     public Device getDeviceHandler(String serviceName) {
-        log.info("【初始化】获取设备处理器 {}-{}", serviceName, deviceHandlerMap);
+        log.info("【开始获取设备处理器】 serviceName: {}", serviceName);
+
         Device device = deviceHandlerMap.get(serviceName);
         if (device == null) {
+            log.error("【获取设备处理器失败】 serviceName: {} 暂无对应处理器", serviceName);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), serviceName + "暂无对应处理器");
         }
+        log.info("【获取设备处理器成功】 serviceName: {}, device: {}", serviceName, device);
         return device;
     }
 
     /**
      * 根据connectionId获取对应的设备类型id
      *
-     * @param connectionId
-     * @return
+     * @param connectionId 连接ID
+     * @return 设备类型ID
      */
     public String getDeviceTypeBizId(String connectionId) {
-        return getMqttCoreByService(connectionId).getServiceBizId();
+        log.info("【开始获取设备类型ID】 connectionId: {}", connectionId);
+
+        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
+        String deviceTypeBizId = mqttCore.getServiceBizId();
+
+        log.info("【获取设备类型ID成功】 connectionId: {}, deviceTypeBizId: {}", connectionId, deviceTypeBizId);
+        return deviceTypeBizId;
     }
 
     /**
      * 根据connectionId获取对应的厂家id
      *
-     * @param connectionId
-     * @return
+     * @param connectionId 连接ID
+     * @return 厂家ID
      */
     public String getFirmBizId(String connectionId) {
-        return getMqttCoreByService(connectionId).getFirmBizId();
+        log.info("【开始获取厂家ID】 connectionId: {}", connectionId);
+
+        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
+        String firmBizId = mqttCore.getFirmBizId();
+
+        log.info("【获取厂家ID成功】 connectionId: {}, firmBizId: {}", connectionId, firmBizId);
+        return firmBizId;
     }
 
+
     /**
      * 根据connectionId获取对应mqtt核心类
      *
-     * @param connectionId
-     * @return
+     * @param connectionId 连接ID
+     * @return MqttCore实例
      */
-    private MqttCore getMqttCoreByService(String connectionId) {
+    private MqttCore getMqttCoreByConnectionId(String connectionId) {
+        log.info("【开始获取MQTT核心类】 connectionId: {}", connectionId);
+
         MqttCore mqttCore = getMqttCore(connectionId);
         if (mqttCore == null) {
+            log.error("【获取MQTT对象失败】 connectionId: {} 对应不存在", connectionId);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), "【获取MQTT对象失败】:" + connectionId + "对应不存在");
         }
+
+        log.info("【获取MQTT核心类成功】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         return mqttCore;
     }
 
@@ -242,7 +353,7 @@ public class MqttManager {
      * @param connectionId
      * @return
      */
-    public MqttCore getMqttCore(String connectionId) {
+    private MqttCore getMqttCore(String connectionId) {
         return serviceMqttMap.get(connectionId);
     }
 
@@ -255,119 +366,129 @@ public class MqttManager {
         }
     }
 
+
     //订阅相关开始############################################################################
 
     /**
-     * 批量订阅,根据一种链接的同一类serviceName的多个设备
+     * 【订阅-批量订阅】
+     * 原子类方法,不拆不合
+     * 批量订阅,根据一种链接的同一类serviceName的多个设备,
      * 如此分类的原因
      * 1. 相同connectionId下serviceName不一定相同
      * 2. 相同的serviceName的connectionId不一定相同
+     * @param connectionId
      * @param serviceName
      * @param deviceIds
      * @throws MqttException
      */
-    public void topicSubscribeBatchByType(String connectionId,String serviceName,List<String>  deviceIds) throws MqttException {
+    public void topicBatchSubscribeDevices(String connectionId, String serviceName, List<String> deviceIds) throws MqttException {
+        log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, deviceIds);
+
+        // 获取批量订阅的主题
         String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
-        MqttCore mqttCore = getMqttCoreByService(connectionId);
-        MqttClient mqttClient = mqttCore.getClient();
-        mqttClient.subscribe(topics);
-        log.info("【设备订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
+        log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
+
+        // 获取MqttCore实例
+        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
+        log.info("【获取MqttCore实例】 mqttCore: {}", mqttCore);
+
+        // 执行订阅
+        try {
+            mqttCore.subscribe(topics);
+            log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
+        } catch (MqttException e) {
+            log.error("【批量订阅失败】 connectionId: {}, serviceName: {}, topics: {}, 异常信息: {}", connectionId, serviceName, Arrays.toString(topics), e.getMessage(), e);
+            throw e;
+        }
+
+        log.info("【完成批量订阅】 connectionId: {}, serviceName: {}, topics: {}", connectionId, serviceName, Arrays.toString(topics));
     }
 
     /**
+     * 【订阅-单个订阅】
      * 单个设备订阅
      * @param connectionId
      * @param serviceName
      * @param deviceId
      * @throws MqttException
      */
-    public void subscribe(String connectionId,String serviceName,String deviceId) throws MqttException {
-        List<String>  deviceIds =new ArrayList<>();
+    public void topicSingleSubscribeDevice(String connectionId, String serviceName, String deviceId) throws MqttException {
+        log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
+
+        // 转换为批量订阅
+        List<String> deviceIds = new ArrayList<>();
         deviceIds.add(deviceId);
-        topicSubscribeBatchByType(connectionId,serviceName,deviceIds);
 
+        // 调用批量订阅方法
+        topicBatchSubscribeDevices(connectionId, serviceName, deviceIds);
+
+        log.info("【完成单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
     }
 
 
     /**
-     * 批量取消订阅
-     *
-     * @param
-     * @param
+     * 【订阅-批量取消订阅
+     * @param connectionId
+     * @param serviceName
+     * @param deviceIds
      * @throws MqttException
      */
-    public void topicSubscribeCancelByBatch(String connectionId,String serviceName,List<String>  deviceIds) throws MqttException {
+    public void topicBatchUnSubscribeDevices(String connectionId, String serviceName, List<String> deviceIds) throws MqttException {
+        log.info("【开始批量取消订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, deviceIds);
+
+        // 获取批量取消订阅的主题
         String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
-        MqttCore mqttCore = getMqttCoreByService(connectionId);
-        MqttClient mqttClient = mqttCore.getClient();
-        mqttClient.unsubscribe(topics);
+        log.info("【获取批量取消订阅主题】 topics: {}", Arrays.toString(topics));
+
+        // 获取MqttCore实例
+        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
+        log.info("【获取MqttCore实例】 mqttCore: {}", mqttCore);
+
+        // 执行取消订阅
+        try {
+            mqttCore.unsubscribe(topics);
+            log.info("【批量取消订阅成功】 connectionId: {}, serviceName: {}, topics: {}", connectionId, serviceName, Arrays.toString(topics));
+        } catch (MqttException e) {
+            log.error("【批量取消订阅失败】 connectionId: {}, serviceName: {}, topics: {}, 异常信息: {}", connectionId, serviceName, Arrays.toString(topics), e.getMessage(), e);
+            throw e;
+        }
+
+        log.info("【完成批量取消订阅】 connectionId: {}, serviceName: {}, topics: {}", connectionId, serviceName, Arrays.toString(topics));
     }
 
     /**
-     * 单个设备取消订阅
+     * 【订阅-单个设备取消订阅
      * @param connectionId
      * @param serviceName
      * @param deviceId
      * @throws MqttException
      */
-    public void subtopicSubscribeCancel(String connectionId,String serviceName,String deviceId) throws MqttException {
-        List<String>  deviceIds =new ArrayList<>();
-        deviceIds.add(deviceId);
-        topicSubscribeCancelByBatch(connectionId,serviceName,deviceIds);
-    }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+    public void topicSingleUnSubscribeDevice(String connectionId, String serviceName, String deviceId) throws MqttException {
+        log.info("【开始单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
 
+        // 转换为批量取消订阅
+        List<String> deviceIds = new ArrayList<>();
+        deviceIds.add(deviceId);
 
-    //¥##########################废弃代码,有替换的替换,没替换的重新梳理
+        // 调用批量取消订阅方法
+        topicBatchUnSubscribeDevices(connectionId, serviceName, deviceIds);
 
-    /**
-     * mqtt 手动动态订阅
-     * 废弃,换topicSubscribeBatchByType
-     * @throws MqttException
-     *
-     */
-    @Deprecated
-    public void subscribeByBatch(Map<String, List<String>> serviceNameMap) throws MqttException {
-        throwDeprecatedMethod();
-        for (Map.Entry<String, List<String>> entry : serviceNameMap.entrySet()) {
-            String serviceName = entry.getKey();
-            String[] topics = deviceTopicService.getBatchTopic(serviceName, entry.getValue().toArray(new String[0]));
-            MqttCore mqttCore = getMqttCoreByService(serviceName);
-            MqttClient mqttClient = mqttCore.getClient();
-            mqttClient.subscribe(topics);
-            log.info("【设备订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
-        }
-    }
-    @Deprecated
-    public void subscribeByBatch(String[] topics, MqttCore mqttCore) throws MqttException {
-        throwDeprecatedMethod();
-        MqttClient mqttClient = mqttCore.getClient();
-        mqttClient.subscribe(topics);
-        log.info("【设备订阅】【多个相同服务共用一个客户端 需要手动订阅】:设备:{} {} {}", topics, mqttCore.getServiceType(), mqttCore);
+        log.info("【完成单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
     }
 
 
+    //订阅相关结束############################################################################
 
 
     /**
+     * @deprecated
+     * 逻辑需要基于connectionId梳理
+     *
      * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
      */
     @Deprecated
     public void reSubscribe() {
-        throwDeprecatedMethod();
+        throwDeprecatedMethod("重新订阅逻辑不完善,需要梳理构建新的逻辑");
         IotFirmdev iotFirmdev = new IotFirmdev();
         List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
         for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVos) {
@@ -385,6 +506,8 @@ public class MqttManager {
     }
 
     /**
+     * @deprecated
+     * 逻辑需要基于connectionId梳理
      * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
      *
      * @param iotFirmdevResVo
@@ -393,11 +516,11 @@ public class MqttManager {
     @Deprecated
     private void startSubscribe(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
         log.info("【设备重新订阅】【订阅】【重连】:设备:{} {}", iotFirmdevResVo, jsonConfig);
-        MqttConfig cfgYf = getMqttConfig(iotFirmdevResVo, jsonConfig);
+        MqttConfig cfgYf = buildMqttConfig(iotFirmdevResVo, jsonConfig);
         String firmBizId = cfgYf.getFirmBizId();
         String serviceName = cfgYf.getServiceName();
         String deviceTypeId = cfgYf.getDeviceTypeBizId();
-
+        String connectionId="获取链接id";
         long nowTime = DateUtils.getNowDate().getTime() - (2 * 60 * 60 * 1000);
         String devUpdateddate = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, new Date(nowTime));
 
@@ -407,7 +530,7 @@ public class MqttManager {
         selectIotDevice.setDevUpdateddate(devUpdateddate);
 
         List<IotDevice> iotDeviceList = iIotDeviceService.selectIotDeviceList(selectIotDevice);
-        if (iotDeviceList == null || iotDeviceList.size() == 0) {
+        if (iotDeviceList.isEmpty()) {
             log.warn("【设备重新订阅】【订阅】【重连】:设备:{} 没有找到设备", serviceName);
             return;
         }
@@ -419,16 +542,14 @@ public class MqttManager {
             return;
         }
 
-        MqttCore mqttCore = getMqttCoreByService(serviceName);
+        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
         try {
             MqttClient mqttClient = mqttCore.getClient();
             if (!mqttClient.isConnected()) {
-                mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
+                buildMqttConnection(iotFirmdevResVo,jsonConfig);
             } else {
-                for (String topic : topics) {
-                    mqttClient.unsubscribe(topic);
-                    mqttClient.subscribe(topic);
-                }
+                mqttCore.unsubscribe(topics);
+                mqttCore.subscribe(topics);
             }
 
         } catch (Exception e) {

+ 26 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -131,7 +131,6 @@ public class MqttCore {
         String url = "tcp://" + mqttConfig.getIp() + ":" + mqttConfig.getPort();
         String clientID = UUID.randomUUID().toString().replace("-", "");
         this.mqttClient = new MqttClient(url, clientID, new MemoryPersistence());
-        ;
         log.info("【初始化】构建 MQTT client {}", this.mqttClient);
     }
 
@@ -186,4 +185,30 @@ public class MqttCore {
         log.debug("发mqtt:topic:" + topic + "  msg:" + message);
         mqttPublisher.publish(topic, message);
     }
+
+    /**
+     * 关闭链接
+     * @throws MqttException
+     */
+    public void close() throws MqttException {
+        this.mqttClient.close();
+    }
+
+    /**
+     * 订阅。原始类不对外暴漏
+     * @param topics
+     * @throws MqttException
+     */
+    public void subscribe(String[] topics) throws MqttException {
+        getClient().subscribe(topics);
+    }
+
+    /**
+     * 取消订阅,原始类不对外暴漏
+     * @param topics
+     * @throws MqttException
+     */
+    public void unsubscribe(String[] topics) throws MqttException {
+        getClient().unsubscribe(topics);
+    }
 }

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/BySfDeviceImpl.java

@@ -238,7 +238,7 @@ public class BySfDeviceImpl implements IBySfDevice {
 
 
         // 更新设备数据信息到数据库 mongodb
-        iotDevice.setCId(iotDeviceOld.getCId());
+        //iotDevice.setCId(iotDeviceOld.getCId());
         iIotBoyunsfdataService.insertBoyunsfdata(iotDevice, sfData);
 
         // 保存 设备最新数据 到redis

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotDeviceServiceImpl.java

@@ -110,7 +110,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
         iotDevice.setDevtypeBid(devtypeBid);
         iotDevice.setFirmBid(firmBid);
         iotDevice.setDevCode(devCode);
-        iotDevice.setExtInfo(extinfo);
+        //iotDevice.setExtInfo(extinfo);
         return selectIotDeviceByTypeFirmCodeExt(iotDevice);
     }
 
@@ -326,7 +326,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
      * @return
      */
     private boolean needFakeStatus(String devBid){
-        String ignorStatusDeviceList = sysConfigService.selectConfigValueByKey("iot_ignor_status_devicelist");
+        String ignorStatusDeviceList = sysConfigService.selectConfigValueByCommonKey("iot_ignor_status_devicelist");
         String[] ignorStatusDevBids = ignorStatusDeviceList.split(",");
         //判断devbid是否在忽略状态设备列表中
         for(String ignorDevBid : ignorStatusDevBids){

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/mq/IotsMqReceiver.java

@@ -107,7 +107,7 @@ public class IotsMqReceiver {
             MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
             log.info("【IOTS】收到iotm的 设备创建消息 messageBody ={} | {} ", mqMsg.getCmd(), mqMsg.getData());
             Map<String, List<String>> serviceDeviceMap = getServiceDeviceMap(mqMsg);
-            mqttManager.subscribeByBatch(serviceDeviceMap);
+            //mqttManager.topicBatchUnSubscribeDevices(serviceDeviceMap);
 
         } catch (Exception e) {
             log.error("【IOTS】【接收消息:设备创建】【异常】", e);
@@ -123,7 +123,7 @@ public class IotsMqReceiver {
             MqMsg mqMsg = JSONObject.parseObject(s, MqMsg.class);
             log.info("【IOTS】收到iotm的 设备删除消息 messageBody ={} | {} ", mqMsg.getCmd(), mqMsg.getData());
             Map<String, List<String>> serviceDeviceMap = getServiceDeviceMap(mqMsg);
-            mqttManager.unsubscribeByBatch(serviceDeviceMap);
+            //mqttManager.topicBatchUnSubscribeDevices(serviceDeviceMap);
 
         } catch (Exception e) {
             log.error("【IOTS】【接收消息:设备删除】【异常】", e);

+ 5 - 4
src/main/java/com/yunfeiyun/agmp/iots/service/IotStatusService.java

@@ -175,6 +175,7 @@ public class IotStatusService {
      * @param devType
      * @throws MqttException
      */
+    @Deprecated
     void reConnection(String serviceName, String devCode, String devName, String devType) throws MqttException {
         // 用于复用前人写的手动订阅和取消,虽然我们这里只有一个id
         Map<String, List<String>> serviceNameMap = new HashMap<>();
@@ -184,14 +185,14 @@ public class IotStatusService {
         // 最多重连20次,策略待定
         if (stringLongMap.containsKey(devCode)) {
             if (stringLongMap.get(devCode) < 20) {
-                mqttManager.unsubscribeByBatch(serviceNameMap);
-                mqttManager.subscribeByBatch(serviceNameMap);
+                //mqttManager.unsubscribeByBatch(serviceNameMap);
+                //mqttManager.subscribeByBatch(serviceNameMap);
                 statCount(devCode, devName);
                 log.info("【设备检测】【异常】【重连】:设备:devName: {} ,devCode: {}, devType:{}", devName, devCode, devType);
             }
         } else {
-            mqttManager.unsubscribeByBatch(serviceNameMap);
-            mqttManager.subscribeByBatch(serviceNameMap);
+            //mqttManager.unsubscribeByBatch(serviceNameMap);
+            //mqttManager.subscribeByBatch(serviceNameMap);
             statCount(devCode, devName);
             log.info("【设备检测】【异常】【重连】:设备:devName: {} ,devCode: {}, devType:{}", devName, devCode, devType);
         }