瀏覽代碼

设备离线后重新订阅

yf_zn 1 年之前
父節點
當前提交
71090b3486

+ 5 - 5
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/checker/CmdResultCheckService.java

@@ -6,8 +6,8 @@ import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
-import com.yunfeiyun.agmp.iots.service.checker.EzvizAbstractCmdResultChecker;
-import com.yunfeiyun.agmp.iots.service.checker.YfCbdAbstractCmdResultChecker;
+import com.yunfeiyun.agmp.iots.service.checker.EzvizCmdResultChecker;
+import com.yunfeiyun.agmp.iots.service.checker.YfCbdCmdResultChecker;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
 import org.apache.http.util.TextUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,7 +54,7 @@ public class CmdResultCheckService {
     public void addChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
     public void addChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
 
 
         String deviceID = cmdExecModel.getCmdModel().getDeviceId();
         String deviceID = cmdExecModel.getCmdModel().getDeviceId();
-        log.info("任务id【{}】Checker : 创建, 设备id:{}, 指令:{} 类型:{}", cmdExecModel.getTaskUuid(), deviceID, cmdExecModel.getCmdModel().getCmdDistribution(),iotDevice.getDevtypeBid());
+        log.info("任务id【{}】Checker : 创建, 设备id:{}, 指令:{} 类型:{}", cmdExecModel.getTaskUuid(), deviceID, cmdExecModel.getCmdModel().getCmdDistribution(), iotDevice.getDevtypeBid());
         //这里需要该,根据新的判断逻辑,创建对应的checker
         //这里需要该,根据新的判断逻辑,创建对应的checker
         AbstractCmdResultChecker abstractCmdResultChecker = newChecker(iotDevice, cmdExecModel);
         AbstractCmdResultChecker abstractCmdResultChecker = newChecker(iotDevice, cmdExecModel);
         log.info("任务id【{}】,创建结果:{}", cmdExecModel.getTaskUuid(), abstractCmdResultChecker);
         log.info("任务id【{}】,创建结果:{}", cmdExecModel.getTaskUuid(), abstractCmdResultChecker);
@@ -197,9 +197,9 @@ public class CmdResultCheckService {
      */
      */
     private AbstractCmdResultChecker newChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
     private AbstractCmdResultChecker newChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
         if (IotDeviceDictConst.TYPE_YF_CBD.equals(iotDevice.getDevtypeBid()) || IotDeviceDictConst.TYPE_YF_GKCBD.equals(iotDevice.getDevtypeBid())) {
         if (IotDeviceDictConst.TYPE_YF_CBD.equals(iotDevice.getDevtypeBid()) || IotDeviceDictConst.TYPE_YF_GKCBD.equals(iotDevice.getDevtypeBid())) {
-            return new YfCbdAbstractCmdResultChecker(cmdExecModel);
+            return new YfCbdCmdResultChecker(cmdExecModel);
         } else if (IotDeviceDictConst.TYPE_EZVIZ_JKSB.equals(iotDevice.getDevtypeBid())) {
         } else if (IotDeviceDictConst.TYPE_EZVIZ_JKSB.equals(iotDevice.getDevtypeBid())) {
-            return new EzvizAbstractCmdResultChecker(cmdExecModel);
+            return new EzvizCmdResultChecker(cmdExecModel);
         }
         }
         return null;
         return null;
     }
     }

+ 35 - 15
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -10,11 +10,10 @@ import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
-import com.yunfeiyun.agmp.iots.common.modal.TosDevicetypeResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
-import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
 import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
+import com.yunfeiyun.agmp.iots.task.IotStatusService;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
 import org.apache.http.util.TextUtils;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -46,6 +45,8 @@ public class ConnectionManager {
     private DeviceTopicService deviceTopicService;
     private DeviceTopicService deviceTopicService;
     @Resource
     @Resource
     private DeviceconnCacheService deviceconnCacheService;
     private DeviceconnCacheService deviceconnCacheService;
+    @Autowired
+    private IotStatusService iotStatusService;
 
 
 
 
     /**
     /**
@@ -60,7 +61,7 @@ public class ConnectionManager {
         log.info("【初始化】设备型号 构建链接 协议: {}个", iotDeviceConnResVoList.size());
         log.info("【初始化】设备型号 构建链接 协议: {}个", iotDeviceConnResVoList.size());
         for (IotDeviceconnResVo iotDeviceconnResVo : iotDeviceConnResVoList) {
         for (IotDeviceconnResVo iotDeviceconnResVo : iotDeviceConnResVoList) {
             deviceconnCacheService.setCache(iotDeviceconnResVo);
             deviceconnCacheService.setCache(iotDeviceconnResVo);
-            log.info("【初始化】【开始】协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(),iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
+            log.info("【初始化】【开始】协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
             //将配置信息转换成jsonObject,这是个数组
             //将配置信息转换成jsonObject,这是个数组
             JSONArray jsonConfig = parseConfigJson(iotDeviceconnResVo.getDevconnConfig());
             JSONArray jsonConfig = parseConfigJson(iotDeviceconnResVo.getDevconnConfig());
 
 
@@ -133,10 +134,11 @@ public class ConnectionManager {
         }
         }
         return jsonConfig;
         return jsonConfig;
     }
     }
+
     /**
     /**
      * 设备创建时,初始化设备连接
      * 设备创建时,初始化设备连接
-     * */
-    public void createDeviceHandle(IotDevice iotDevice) throws Exception {
+     */
+    public void createDeviceHandle(IotDevice iotDevice) {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
         for (Object connItemObject : connItemArray) {
         for (Object connItemObject : connItemArray) {
@@ -169,9 +171,10 @@ public class ConnectionManager {
             }
             }
         }
         }
     }
     }
+
     /**
     /**
      * 设备删除时,删除设备连接
      * 设备删除时,删除设备连接
-     * */
+     */
     public void deleteDeviceHandle(IotDevice iotDevice) {
     public void deleteDeviceHandle(IotDevice iotDevice) {
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
         JSONArray connItemArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
@@ -205,17 +208,19 @@ public class ConnectionManager {
             }
             }
         }
         }
     }
     }
+
     /**
     /**
      * 设备更新时,先删除旧设备连接,再创建新设备连接
      * 设备更新时,先删除旧设备连接,再创建新设备连接
-     * */
+     */
     public void editDeviceHandle(IotDeviceEditMqModel iotDeviceEditMqModel) throws Exception {
     public void editDeviceHandle(IotDeviceEditMqModel iotDeviceEditMqModel) throws Exception {
         deleteDeviceHandle(iotDeviceEditMqModel.getOldIotDevice());
         deleteDeviceHandle(iotDeviceEditMqModel.getOldIotDevice());
         createDeviceHandle(iotDeviceEditMqModel.getNewIotDevice());
         createDeviceHandle(iotDeviceEditMqModel.getNewIotDevice());
     }
     }
+
     /**
     /**
      * 连接信息创建,初始化连接信息
      * 连接信息创建,初始化连接信息
-     * */
-    public void createIotDeviceconnHandle(IotDeviceconn iotDeviceconn){
+     */
+    public void createIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【初始化】【开始】协议加载,连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         log.info("【初始化】【开始】协议加载,连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         //将配置信息转换成jsonObject,这是个数组
         //将配置信息转换成jsonObject,这是个数组
         JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
@@ -225,16 +230,17 @@ public class ConnectionManager {
             return;
             return;
         }
         }
         IotDeviceconnResVo iotDeviceconnResVo = new IotDeviceconnResVo();
         IotDeviceconnResVo iotDeviceconnResVo = new IotDeviceconnResVo();
-        BeanUtils.copyProperties(iotDeviceconn,iotDeviceconnResVo);
+        BeanUtils.copyProperties(iotDeviceconn, iotDeviceconnResVo);
         // 遍历多个配置
         // 遍历多个配置
         for (int j = 0; j < jsonConfig.size(); j++) {
         for (int j = 0; j < jsonConfig.size(); j++) {
             buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
             buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
         }
         }
     }
     }
+
     /**
     /**
      * 连接信息删除,删除连接信息
      * 连接信息删除,删除连接信息
-     * */
-    public void deleteIotDeviceconnHandle(IotDeviceconn iotDeviceconn){
+     */
+    public void deleteIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【连接信息删除】连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         log.info("【连接信息删除】连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         deviceconnCacheService.deleteCache(iotDeviceconn);
         deviceconnCacheService.deleteCache(iotDeviceconn);
         //将配置信息转换成jsonObject,这是个数组
         //将配置信息转换成jsonObject,这是个数组
@@ -244,7 +250,7 @@ public class ConnectionManager {
             return;
             return;
         }
         }
         IotDeviceconnResVo iotDeviceconnResVo = new IotDeviceconnResVo();
         IotDeviceconnResVo iotDeviceconnResVo = new IotDeviceconnResVo();
-        BeanUtils.copyProperties(iotDeviceconn,iotDeviceconnResVo);
+        BeanUtils.copyProperties(iotDeviceconn, iotDeviceconnResVo);
         // 遍历多个配置
         // 遍历多个配置
         for (int j = 0; j < jsonConfig.size(); j++) {
         for (int j = 0; j < jsonConfig.size(); j++) {
             JSONObject config = jsonConfig.getJSONObject(j);
             JSONObject config = jsonConfig.getJSONObject(j);
@@ -278,14 +284,21 @@ public class ConnectionManager {
             }
             }
         }
         }
     }
     }
+
     /**
     /**
      * 连接信息编辑,先删除连接信息,再初始化连接信息
      * 连接信息编辑,先删除连接信息,再初始化连接信息
-     * */
-    public void editIotDeviceconnHandle(IotDeviceconn iotDeviceconn){
+     */
+    public void editIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         deleteIotDeviceconnHandle(iotDeviceconn);
         deleteIotDeviceconnHandle(iotDeviceconn);
         createIotDeviceconnHandle(iotDeviceconn);
         createIotDeviceconnHandle(iotDeviceconn);
     }
     }
 
 
+    /**
+     * 对该设备进行订阅
+     *
+     * @param iotDevice
+     * @throws Exception
+     */
     private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
     private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
         String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
         String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
@@ -305,6 +318,12 @@ public class ConnectionManager {
         httpManager.deviceCreateHandle(iotDevice);
         httpManager.deviceCreateHandle(iotDevice);
     }
     }
 
 
+    /**
+     * 删除该设备的订阅
+     *
+     * @param iotDevice
+     * @throws Exception
+     */
     private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
     private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
         String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
         String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
@@ -323,5 +342,6 @@ public class ConnectionManager {
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {
         httpManager.deviceCreateHandle(iotDevice);
         httpManager.deviceCreateHandle(iotDevice);
     }
     }
+
 }
 }
 
 

+ 0 - 75
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -486,81 +486,6 @@ public class MqttManager {
 
 
     //订阅相关结束############################################################################
     //订阅相关结束############################################################################
 
 
-
-    /**
-     * @deprecated 逻辑需要基于connectionId梳理
-     * <p>
-     * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
-     */
-    @Deprecated
-    public void reSubscribe() {
-        throwDeprecatedMethod("重新订阅逻辑不完善,需要梳理构建新的逻辑");
-//        IotFirmdev iotFirmdev = new IotFirmdev();
-//        List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
-//        for (IotFirmdevResVo iotDeviceconnResVo : iotFirmdevResVos) {
-//            try {
-//                JSONObject jsonConfig = JSONObject.parseObject(iotDeviceconnResVo.getFirmdevCfg());
-//                String type = jsonConfig.getString("type");
-//                if (!Objects.equals(type, "mqtt")) {
-//                    continue;
-//                }
-//                startSubscribe(iotDeviceconnResVo, jsonConfig);
-//            } catch (Exception e) {
-//                log.error("【设备重新订阅】【订阅】【重连】 解析配置文件错误: \n" + iotDeviceconnResVo.getFirmdevCfg() + "\n" + e);
-//            }
-//        }
-    }
-
-    /**
-     * @param iotDeviceconnResVo
-     * @param jsonConfig
-     * @deprecated 逻辑需要基于connectionId梳理
-     * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
-     */
-    @Deprecated
-    private void startSubscribe(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
-        log.info("【设备重新订阅】【订阅】【重连】:设备:{} {}", iotDeviceconnResVo, jsonConfig);
-        MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
-        String firmBizId = cfgYf.getFirmBizId();
-        String serviceName = cfgYf.getServiceName();
-        String deviceTypeId = cfgYf.getDeviceTypeBizId();
-        String connectionId = "获取链接id";
-        long nowTime = DateUtils.getNowDate().getTime() - (2 * 60 * 60 * 1000);
-        String devUpdateddate = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, new Date(nowTime));
-
-        IotDevice selectIotDevice = new IotDevice();
-        selectIotDevice.setFirmBid(firmBizId);
-        selectIotDevice.setDevtypeBid(deviceTypeId);
-        selectIotDevice.setDevUpdateddate(devUpdateddate);
-
-        List<IotDevice> iotDeviceList = iIotDeviceService.selectIotDeviceList(selectIotDevice);
-        if (iotDeviceList.isEmpty()) {
-            log.warn("【设备重新订阅】【订阅】【重连】:设备:{} 没有找到设备", serviceName);
-            return;
-        }
-
-        String[] deviceId = iotDeviceList.stream().map(IotDevice::getDevBid).toArray(String[]::new);
-        String[] topics = null;// deviceTopicService.getBatchTopic(serviceName, deviceId);
-        if (topics == null || topics.length == 0) {
-            log.warn("【设备重新订阅】【订阅】【重连】:设备:{} 没有找到topic", serviceName);
-            return;
-        }
-
-        MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
-        try {
-            MqttClient mqttClient = mqttCore.getClient();
-            if (!mqttClient.isConnected()) {
-                buildMqttConnection(iotDeviceconnResVo, jsonConfig);
-            } else {
-                mqttCore.unsubscribe(topics);
-                mqttCore.subscribe(topics);
-            }
-
-        } catch (Exception e) {
-            log.error("【设备重新订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
-        }
-    }
-
     /**
     /**
      * 根据topic 获取id
      * 根据topic 获取id
      *
      *

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/service/checker/EzvizAbstractCmdResultChecker.java

@@ -9,9 +9,9 @@ import lombok.extern.slf4j.Slf4j;
 
 
 
 
 @Slf4j
 @Slf4j
-public class EzvizAbstractCmdResultChecker extends AbstractCmdResultChecker {
+public class EzvizCmdResultChecker extends AbstractCmdResultChecker {
 
 
-    public EzvizAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
+    public EzvizCmdResultChecker(CmdExecModel cmdExecModel) {
         super(cmdExecModel);
         super(cmdExecModel);
     }
     }
 
 

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/service/checker/YfBzyAbstractCmdResultChecker.java

@@ -15,10 +15,10 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Set;
 
 
 @Slf4j
 @Slf4j
-public class YfBzyAbstractCmdResultChecker extends AbstractCmdResultChecker {
+public class YfBzyCmdResultChecker extends AbstractCmdResultChecker {
 
 
 
 
-    public YfBzyAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
+    public YfBzyCmdResultChecker(CmdExecModel cmdExecModel) {
         super(cmdExecModel);
         super(cmdExecModel);
     }
     }
 
 

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/service/checker/YfCbdAbstractCmdResultChecker.java

@@ -15,10 +15,10 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Set;
 
 
 @Slf4j
 @Slf4j
-public class YfCbdAbstractCmdResultChecker extends AbstractCmdResultChecker {
+public class YfCbdCmdResultChecker extends AbstractCmdResultChecker {
 
 
 
 
-    public YfCbdAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
+    public YfCbdCmdResultChecker(CmdExecModel cmdExecModel) {
         super(cmdExecModel);
         super(cmdExecModel);
     }
     }
 
 

+ 14 - 3
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -2,14 +2,15 @@ package com.yunfeiyun.agmp.iots.task;
 
 
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.device.IotDeviceStatusResVo;
 import com.yunfeiyun.agmp.iot.common.model.device.IotDeviceStatusResVo;
+import com.yunfeiyun.agmp.iots.core.manager.ConnectionManager;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceconfigService;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceconfigService;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
@@ -18,7 +19,6 @@ import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.*;
 
 
 /**
 /**
- * 【78环境测试中】
  * 负责iots 与iotm 之间的状态通讯
  * 负责iots 与iotm 之间的状态通讯
  * 1. 先处理设备数据状态异常问题
  * 1. 先处理设备数据状态异常问题
  */
  */
@@ -77,7 +77,7 @@ public class IotStatusService {
      * 定期根据类型查设备最新设备数据,是否长时间不上报。
      * 定期根据类型查设备最新设备数据,是否长时间不上报。
      */
      */
     @Scheduled(cron = "0 0 */1 * * ?")
     @Scheduled(cron = "0 0 */1 * * ?")
-    public void validateStatusByDevType(){
+    public void validateStatusByDevType() {
         printMqttStatus();
         printMqttStatus();
         Iterator<String> iterator = validateDeviceType.iterator();
         Iterator<String> iterator = validateDeviceType.iterator();
         while (iterator.hasNext()) {
         while (iterator.hasNext()) {
@@ -126,6 +126,7 @@ public class IotStatusService {
                         iotDevice.setDevModifieddate(DateUtils.dateTimeNow());
                         iotDevice.setDevModifieddate(DateUtils.dateTimeNow());
                         iIotDeviceService.updateIotDevice(iotDevice);
                         iIotDeviceService.updateIotDevice(iotDevice);
                         log.info("【设备检测】【设备状态异常更新-设备类型{} 】设备标识:{} 设备编号 {} 设备名称 {} 异常状态 {}", type, devBid, devCode, devName, iotDeviceStatusResVo.getDevStatus());
                         log.info("【设备检测】【设备状态异常更新-设备类型{} 】设备标识:{} 设备编号 {} 设备名称 {} 异常状态 {}", type, devBid, devCode, devName, iotDeviceStatusResVo.getDevStatus());
+                        reCreateDeviceTopic(iotDevice);
                     }
                     }
                 }
                 }
             } catch (Exception e) {
             } catch (Exception e) {
@@ -135,6 +136,16 @@ public class IotStatusService {
         log.info("【设备检测结束 类型:{}】【 设备:{} 个#########################", type, iotDeviceStatusResVos.size());
         log.info("【设备检测结束 类型:{}】【 设备:{} 个#########################", type, iotDeviceStatusResVos.size());
     }
     }
 
 
+    /**
+     * 重新订阅和构建topic
+     *
+     * @param iotDevice
+     */
+    void reCreateDeviceTopic(IotDevice iotDevice) {
+        ConnectionManager connectionManager = SpringUtils.getBean(ConnectionManager.class);
+        connectionManager.deleteDeviceHandle(iotDevice);
+        connectionManager.createDeviceHandle(iotDevice);
+    }
 
 
     /**
     /**
      * 超过10分钟返回true
      * 超过10分钟返回true