Parcourir la source

规整checker逻辑,打开checker

yf_zn il y a 1 an
Parent
commit
1bfd526a2e

+ 0 - 28
src/main/java/com/yunfeiyun/agmp/iots/AgmpIotsApplication.java

@@ -24,34 +24,6 @@ public class AgmpIotsApplication {
     public static void main(String[] args) {
         SpringApplication.run(AgmpIotsApplication.class, args);
         log.info("物联网服务子系统启动成功");
-//        for (int i = 0; i < 1; i++) {
-//            try {
-//                Thread.sleep(10000);
-//                MqttTopicValue mqttTopicValue=new MqttTopicValue();
-//                mqttTopicValue.setDevCode(TestConst.deviceId);
-//                mqttTopicValue.setDevId("25470430-0191-1000-e000-50297f000001");
-//                log.info("我在这里模拟接收到iotm的消息,下发指令第{}次", i);
-//                SpringUtils.getBean(TestController.class).orderTest();
-//                // 取消订阅
-//                log.info("我在这里模拟解绑topic,下发指令第{}次", i);
-//                SpringUtils.getBean(MqttManager.class).topicSingleUnSubscribeDevice(TestConst.connectionId, TestConst.serviceName, mqttTopicValue);
-//                log.info("我在这里模拟解绑topic,等10s收到消息在解绑{}", i);
-//
-//                Thread.sleep(10000);
-//                log.info("我在这里模拟解绑后发消息,");
-//                SpringUtils.getBean(TestController.class).orderTest();
-//                // 取消订阅
-//                log.info("我在这里模拟解绑后再订阅");
-//
-//                SpringUtils.getBean(MqttManager.class).topicSingleSubscribeDevice(TestConst.connectionId, TestConst.serviceName, mqttTopicValue);
-//                log.info("我在这里模拟解绑后再订阅发消息,");
-//                SpringUtils.getBean(TestController.class).orderTest();
-//
-//            } catch (Exception e) {
-//                e.printStackTrace();
-//            }
-//
-//        }
     }
 
 

+ 13 - 32
src/main/java/com/yunfeiyun/agmp/iots/service/checker/CmdResultChecker.java

@@ -1,25 +1,12 @@
-package com.yunfeiyun.agmp.iots.service.checker;
+package com.yunfeiyun.agmp.iots.core.cmd.checker;
 
 
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
-import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdgroupStatus;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
-import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
-import com.yunfeiyun.agmp.iots.core.cmd.CmdUtil;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
-import com.yunfeiyun.agmp.iots.core.cmd.core.task.CmdTaskModel;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
 
 /**
  * 单条指令执行结果检查器基类
@@ -36,37 +23,34 @@ import java.util.concurrent.ScheduledFuture;
  * 当单个“普通指令”执行出错、失败、超时时,认为整个任务执行失败;
  * 当最后一组指令(可能只有一个)全部执行成功时,认为整个任务执行成功。
  */
-@Data
 @Slf4j
-public class CmdResultChecker {
+public abstract class AbstractCmdResultChecker {
 
     /**
-     * 该条指令
+     * 执行的指令
      */
     private CmdExecModel cmdExecModel;
 
     /**
-     * 知道结果或等待结果超时后,变更此标记为true
+     * 是否完成:true / false
      */
     private boolean isDone;
 
-    public CmdResultChecker(CmdExecModel cmdExecModel) {
-        this.cmdExecModel = cmdExecModel;
+    public CmdExecModel getCmdExecModel() {
+        return cmdExecModel;
     }
 
-    public void configRetry(int retry) {
 
+    public boolean isDone() {
+        return isDone;
     }
 
-    public void configTimedout(int timedout) {
+    private AbstractCmdResultChecker() {
 
     }
 
-    /**
-     * 有的设备需要发送查询指令,以触发设备立即上报数据
-     */
-    public void reqData() {
-
+    public AbstractCmdResultChecker(CmdExecModel cmdExecModel) {
+        this.cmdExecModel = cmdExecModel;
     }
 
 
@@ -80,7 +64,7 @@ public class CmdResultChecker {
         cmdExecModel.setCmdStatus(cmdStatus.getCode());
         cmdExecModel.setErrmsg(errMsg);
         //告诉checkservie我知道结果了
-        SpringUtils.getBean(CmdResultCheckService.class).onChecked(cmdExecModel);
+        SpringUtils.getBean(CmdResultCheckService.class).finished(cmdExecModel);
     }
 
 
@@ -90,9 +74,6 @@ public class CmdResultChecker {
      *
      * @param jobjMsg
      */
-    public void check(IotDevice iotDevice, JSONObject jobjMsg) {
-
-
-    }
+    public abstract void check(IotDevice iotDevice, JSONObject jobjMsg);
 
 }

+ 5 - 6
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/CmdCheckScheduledFuture.java

@@ -1,16 +1,16 @@
-package com.yunfeiyun.agmp.iots.core.cmd.core;
+package com.yunfeiyun.agmp.iots.core.cmd.checker;
 
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 
 import java.util.concurrent.*;
 
-public class CmdCheckScheduledFuture{
-
+/**
+ * 超时线程对象
+ */
+public class CmdCheckScheduledFuture {
 
     private CmdExecModel cmdExecModel;
 
-
-
     private ScheduledFuture future;
 
 
@@ -20,7 +20,6 @@ public class CmdCheckScheduledFuture{
     }
 
 
-
     public CmdExecModel getCmdExecModel() {
         return cmdExecModel;
     }

+ 207 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/checker/CmdResultCheckService.java

@@ -0,0 +1,207 @@
+package com.yunfeiyun.agmp.iots.core.cmd.checker;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
+import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
+import com.yunfeiyun.agmp.iots.service.checker.EzvizAbstractCmdResultChecker;
+import com.yunfeiyun.agmp.iots.service.checker.YfCbdAbstractCmdResultChecker;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.util.TextUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * 核心checker服务类,用于检查和管理每个指令的checker 执行情况
+ */
+@Slf4j
+@Service
+public class CmdResultCheckService {
+
+    @Autowired
+    private CmdDispatcherService cmdDispatcherService;
+
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10);
+
+
+    /**
+     * 键:设备唯一标识 bid
+     * 值:对应的“执行结果检查器”实例  列表
+     * <p>
+     * 每个检查器实例关联的指令是唯一的
+     */
+    private ConcurrentHashMap<String, Map<String, AbstractCmdResultChecker>> hmChecker = new ConcurrentHashMap<>();
+
+    /**
+     * 超时处理线程暂存起来
+     * 键:同上
+     */
+    private ConcurrentHashMap<String, Map<String, CmdCheckScheduledFuture>> hmFuture = new ConcurrentHashMap<>();
+
+
+    /**
+     * 每条指令执行后,添加并启动一个对应的“执行结果检查器”
+     *
+     * @param cmdExecModel
+     */
+    public void addChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
+
+        String deviceID = cmdExecModel.getCmdModel().getDeviceId();
+        log.info("任务id【{}】Checker : 创建, 设备id:{}, 指令:{} 类型:{}", cmdExecModel.getTaskUuid(), deviceID, cmdExecModel.getCmdModel().getCmdDistribution(),iotDevice.getDevtypeBid());
+        //这里需要该,根据新的判断逻辑,创建对应的checker
+        AbstractCmdResultChecker abstractCmdResultChecker = newChecker(iotDevice, cmdExecModel);
+        log.info("任务id【{}】,创建结果:{}", cmdExecModel.getTaskUuid(), abstractCmdResultChecker);
+
+        if (abstractCmdResultChecker == null) {
+            log.error("任务id【{}】,【无可用Checker】跳过");
+            return;
+        }
+        Map<String, AbstractCmdResultChecker> checkerMap = hmChecker.get(deviceID);
+        if (checkerMap != null) {
+            checkerMap.put(cmdExecModel.getCeBid(), abstractCmdResultChecker);
+        } else {
+            checkerMap = new HashMap<>();
+            checkerMap.put(cmdExecModel.getCeBid(), abstractCmdResultChecker);
+            hmChecker.put(deviceID, checkerMap);
+        }
+
+        //超时处理
+        ScheduledFuture<Boolean> scheduledFuture = scheduledThreadPoolExecutor.schedule(
+                new CmdResultTimedout(abstractCmdResultChecker)
+                , cmdExecModel.getCmdModel().getTimeout()
+                , TimeUnit.SECONDS
+        );
+        CmdCheckScheduledFuture future = new CmdCheckScheduledFuture(cmdExecModel, scheduledFuture);
+        Map<String, CmdCheckScheduledFuture> checkScheduledFutureMap = hmFuture.get(deviceID);
+        if (checkScheduledFutureMap != null) {
+            checkScheduledFutureMap.put(cmdExecModel.getCeBid(), future);
+        } else {
+            checkScheduledFutureMap = new ConcurrentHashMap<>();
+            checkScheduledFutureMap.put(cmdExecModel.getCeBid(), future);
+            hmFuture.put(deviceID, checkScheduledFutureMap);
+        }
+        log.debug("任务id【{}】,指令执行结果检查器 已添加  指令:{}", cmdExecModel.getTaskUuid(), cmdExecModel);
+    }
+
+    /**
+     * 标记该指令任务彻底结束,进行处理工作
+     * 1. 删除超时线程
+     * 2. 删除checker内容
+     * 3. 标记数据库指令发送task,发送回执
+     *
+     * @param cmdExecModel
+     */
+    public void finished(CmdExecModel cmdExecModel) {
+        if (cmdExecModel == null) {
+            return;
+        }
+        if (TextUtils.isEmpty(cmdExecModel.getCeBid())) {
+            return;
+        }
+        if ((CmdStatus.CMD_STATUS_SUCC.getCode()).equals(cmdExecModel.getCmdStatus())) {
+            CmdCheckScheduledFuture future = getFutureByCeid(cmdExecModel.getCmdModel().getDeviceId(), cmdExecModel.getCeBid());
+            if (future != null) {
+                log.info("取消 超时处理线程");
+                future.getFuture().cancel(true);
+                removeFuture(cmdExecModel);
+            }
+        }
+        removeChecker(cmdExecModel);
+        //告诉 指令执行器 当前指令的执行结果已经知道了
+        cmdDispatcherService.onCmdResult(cmdExecModel);
+    }
+
+    /**
+     * 根据设备的“上报数据数据”,进行检查,是否成功
+     *
+     * @param iotDevice
+     * @param jobMsg
+     */
+    public void beginCheck(IotDevice iotDevice, JSONObject jobMsg) {
+        //找到该设备的checker,执行check
+        if (iotDevice == null) {
+            return;
+        }
+        //遍历该设备的所有checker 调该设备的“指令执行结果检查器”的check方法
+        Map<String, AbstractCmdResultChecker> checkerList = hmChecker.get(iotDevice.getDevBid());
+        if (checkerList == null) {
+            return;
+        }
+        for (AbstractCmdResultChecker checker : checkerList.values()) {
+            //检查是否需要检查
+            if (!checker.getCmdExecModel().getCmdModel().getNeedcheckStatus()) {
+                continue;
+            }
+
+            if (checker.isDone()) {
+                log.info("checker {} 已经done,不能再次check() ", checker);
+            } else {
+                log.info("cheker {}" + checker);
+                checker.check(iotDevice, jobMsg);
+            }
+        }
+    }
+
+
+    /**
+     * 从list中拿出对应的实例
+     *
+     * @param devBid 设备标识
+     * @param ceBid  “单条指令执行”的唯一标识 or
+     * @return
+     */
+    private CmdCheckScheduledFuture getFutureByCeid(String devBid, String ceBid) {
+        Map<String, CmdCheckScheduledFuture> list = hmFuture.get(devBid);
+        if (list != null) {
+            for (CmdCheckScheduledFuture item : list.values()) {
+                if (ceBid.equals(item.getCmdExecModel().getCeBid())) {
+                    return item;
+                }
+            }
+        }
+        log.info("找不到对应的 future   设备:{}  指令:{}", devBid, ceBid);
+        return null;
+    }
+
+    private void removeChecker(CmdExecModel cmdExecModel) {
+        String ceBid = cmdExecModel.getCeBid();
+        Map<String, AbstractCmdResultChecker> list = hmChecker.get(cmdExecModel.getCmdModel().getDeviceId());
+        if (list != null) {
+            log.info("成功移除 {}的checker", ceBid);
+            list.remove(ceBid);
+        }
+    }
+
+    private void removeFuture(CmdExecModel cmdExecModel) {
+        String ceBid = cmdExecModel.getCeBid();
+        Map<String, CmdCheckScheduledFuture> list = hmFuture.get(cmdExecModel.getCmdModel().getDeviceId());
+        if (list != null) {
+            log.info("成功移除 {}的checkerFuturer", ceBid);
+            list.remove(ceBid);
+        }
+    }
+
+
+    /**
+     * 根据设备类型等信息,new对应的子类实例
+     *
+     * @param cmdExecModel
+     * @return
+     */
+    private AbstractCmdResultChecker newChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
+        if (IotDeviceDictConst.TYPE_YF_CBD.equals(iotDevice.getDevtypeBid()) || IotDeviceDictConst.TYPE_YF_GKCBD.equals(iotDevice.getDevtypeBid())) {
+            return new YfCbdAbstractCmdResultChecker(cmdExecModel);
+        } else if (IotDeviceDictConst.TYPE_EZVIZ_JKSB.equals(iotDevice.getDevtypeBid())) {
+            return new EzvizAbstractCmdResultChecker(cmdExecModel);
+        }
+        return null;
+    }
+
+}

+ 32 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/checker/CmdResultTimedout.java

@@ -0,0 +1,32 @@
+package com.yunfeiyun.agmp.iots.core.cmd.checker;
+
+import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.Callable;
+
+/**
+ * 超时线程类
+ */
+@Slf4j
+public class CmdResultTimedout implements Callable<Boolean> {
+
+    private AbstractCmdResultChecker abstractCmdResultChecker;
+
+    public CmdResultTimedout(AbstractCmdResultChecker abstractCmdResultChecker) {
+        this.abstractCmdResultChecker = abstractCmdResultChecker;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+
+        log.debug("等待指令执行结果超时了!");
+
+        //执行到这里,说明超时了,要做的处理:
+        abstractCmdResultChecker.onResult(CmdStatus.CMD_STATUS_FAIL, "等待指令[" + abstractCmdResultChecker.getCmdExecModel().getCmdModel().getTag() + "]执行结果超时");
+
+
+        return true;
+    }
+
+}

+ 0 - 30
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/CmdResultTimedout.java

@@ -1,30 +0,0 @@
-package com.yunfeiyun.agmp.iots.core.cmd.core;
-
-import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
-import com.yunfeiyun.agmp.iots.service.checker.CmdResultChecker;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.Callable;
-
-@Slf4j
-public class CmdResultTimedout implements Callable<Boolean> {
-
-    private CmdResultChecker cmdResultChecker;
-
-    public CmdResultTimedout(CmdResultChecker cmdResultChecker) {
-        this.cmdResultChecker = cmdResultChecker;
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-
-        log.debug("等待指令执行结果超时了!");
-
-        //执行到这里,说明超时了,要做的处理:
-        cmdResultChecker.onResult(CmdStatus.CMD_STATUS_FAIL,"等待指令["+cmdResultChecker.getCmdExecModel().getCmdModel().getTag()+"]执行结果超时");
-
-
-        return true;
-    }
-
-}

+ 6 - 13
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/task/CmdTaskService.java

@@ -1,9 +1,7 @@
 package com.yunfeiyun.agmp.iots.core.cmd.core.task;
 
 import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.constant.ErrorCode;
 import com.yunfeiyun.agmp.common.enums.RedisCacheKey;
-import com.yunfeiyun.agmp.common.exception.BizException;
 import com.yunfeiyun.agmp.common.framework.manager.RedisCacheManager;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.JSONUtils;
@@ -12,11 +10,9 @@ import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdgroupStatus;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
 import com.yunfeiyun.agmp.iot.common.domain.IotCmdexec;
 import com.yunfeiyun.agmp.iot.common.domain.IotCmdtask;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
-import com.yunfeiyun.agmp.iot.common.domain.resvo.IotFirmdevResVo;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.task.TaskResult;
@@ -29,7 +25,7 @@ import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 
 import com.yunfeiyun.agmp.iots.service.*;
-import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
+import com.yunfeiyun.agmp.iots.core.cmd.checker.CmdResultCheckService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -101,7 +97,7 @@ public class CmdTaskService {
                 cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_FAIL.getCode());
                 cmdExecModel.setErrmsg("指令下发失败:设备:" + iotDevice.getDevCode() + "的对应厂家配置异常,请检查");
                 cmdExecModel.setCmdFinishTime(DateUtils.dateTimeNow());
-                cmdResultCheckService.onChecked(cmdExecModel);
+                cmdResultCheckService.finished(cmdExecModel);
                 throw new IotBizException(IotErrorCode.UNKNOWN_DEVICE_FIRM.getCode(), "设备:" + iotDevice.getDevCode() + "的对应厂家配置异常,请检查");
             }
             Device device = mqttManager.getDeviceHandler(serviceName);
@@ -109,7 +105,7 @@ public class CmdTaskService {
                 cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_FAIL.getCode());
                 cmdExecModel.setErrmsg("指令下发失败:设备" + iotDevice.getDevCode() + "未找到对应设备处理器,serviceName:" + serviceName);
                 cmdExecModel.setCmdFinishTime(DateUtils.dateTimeNow());
-                cmdResultCheckService.onChecked(cmdExecModel);
+                cmdResultCheckService.finished(cmdExecModel);
                 // 设备厂家不存在,该异常只会发生于项目初始化时,缺少初始化数据,所以可直接抛出异常,
                 throw new IotBizException(IotErrorCode.UNKNOWN_DEVICE_FIRM.getCode(), "设备:" + iotDevice.getDevCode() + "未找到对应设备处理器,serviceName:" + serviceName);
             }
@@ -123,7 +119,7 @@ public class CmdTaskService {
                 cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_FAIL.getCode());
                 cmdExecModel.setErrmsg("【调试专用】模拟该指令失败");
                 cmdExecModel.setCmdFinishTime(DateUtils.dateTimeNow());
-                //cmdResultCheckService.onChecked(cmdExecModel);
+                cmdResultCheckService.finished(cmdExecModel);
                 return;
             }
             try {
@@ -135,11 +131,10 @@ public class CmdTaskService {
                 iotCmdexecService.startCmdExecLog(cmdModel.getCeBid());
                 //对于不需要检查执行结果的指令,在指令发送成功后,直接回调检查结果成功回调
                 if (!cmdExecModel.getCmdModel().getNeedcheckStatus()) {
-                    //cmdResultChecker.onResult(CmdStatus.CMD_STATUS_SUCC,"");
                     cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_SUCC.getCode());
                     cmdExecModel.setCmdFinishTime(DateUtils.dateTimeNow());
                     log.info("不需要检查执行结果的指令,直接回调【该指令已成功】 {}", cmdExecModel);
-                    //cmdResultCheckService.onChecked(cmdExecModel);
+                    cmdResultCheckService.finished(cmdExecModel);
                 }
 
             } catch (Exception e) {
@@ -151,7 +146,7 @@ public class CmdTaskService {
                 cmdModel.setClogSendresult("执行失败:" + e.getMessage());
                 iIotCmdlogService.insertErrorCmdlog(cmdModel);
                 log.info("发送该条指令时程序报错,直接回调【指令执行失败】 {}", cmdExecModel);
-                //cmdResultCheckService.onChecked(cmdExecModel);
+                cmdResultCheckService.finished(cmdExecModel);
                 return;
             }
 
@@ -238,8 +233,6 @@ public class CmdTaskService {
             List<SerialTaskModel> serialTaskModels = getSerialTaskModels(cmdExecModel, iotCmdtask);
             if (serialTaskModels.isEmpty()) {
                 log.error("执行完了");
-                // 该流程的触发原因:
-                // 1.任务已完成,设备多次上报指令执行结果
                 return;
             }
 

+ 0 - 4
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -6,7 +6,6 @@ import com.yunfeiyun.agmp.common.constant.ErrorCode;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
-import com.yunfeiyun.agmp.iot.common.enums.DevType;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
@@ -16,13 +15,10 @@ import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttCore;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
-import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;

+ 0 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -1,9 +1,7 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
-import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.common.Device;
-import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -5,7 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.exception.IotException;
-import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
+import com.yunfeiyun.agmp.iots.core.cmd.checker.CmdResultCheckService;
 import org.eclipse.paho.client.mqttv3.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
             log.info("【上报数据:处理结果】{}", result);
             //如果是“设备属性”消息,进行执行结果检查
             if (device.isDeviceProps(obj)) {
-                SpringUtils.getBean(CmdResultCheckService.class).check(device.findIotDevice(topic, obj,mqttCore.getConnectionId()), obj);
+                SpringUtils.getBean(CmdResultCheckService.class).beginCheck(device.findIotDevice(topic, obj,mqttCore.getConnectionId()), obj);
             } else {
                 log.error("其它数据");
             }

+ 10 - 4
src/main/java/com/yunfeiyun/agmp/iots/device/common/Device.java

@@ -25,9 +25,14 @@ public interface Device {
      * @param orderCmdParam 该参数待定,还不知道需要哪些基础参数
      * @return
      */
-    //public Object receiveData(String topic, JSONObject cmdJson) throws Exception;
+    public Object receiveData(String topic, JSONObject cmdJson, String connectionId) throws Exception;
 
-    public Object receiveData(String topic, JSONObject cmdJson,String connectionId) throws Exception;
+    /**
+     * 根据上报的结果,判断这个是不是永远checker检查的数据,checker要检查数据下发的是否正确
+     *
+     * @param cmdJson
+     * @return
+     */
 
     public boolean isDeviceProps(JSONObject cmdJson);
 
@@ -38,9 +43,10 @@ public interface Device {
      * @param jobjMsg
      * @return
      */
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId);
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg, String connectionId);
+
     /**
      * 更新所有设备数据
-     * */
+     */
     public void sysAllDevice();
 }

+ 0 - 241
src/main/java/com/yunfeiyun/agmp/iots/service/checker/CmdResultCheckService.java

@@ -1,241 +0,0 @@
-package com.yunfeiyun.agmp.iots.service.checker;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
-import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
-import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
-import com.yunfeiyun.agmp.iots.config.TestConst;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
-import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdCheckScheduledFuture;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdResultTimedout;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.util.TextUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.concurrent.*;
-
-@Slf4j
-@Service
-public class CmdResultCheckService {
-
-    @Autowired
-    private CmdDispatcherService cmdDispatcherService;
-
-
-    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10);
-
-
-    /**
-     * 键:设备唯一标识 bid
-     * 值:对应的“执行结果检查器”实例  列表
-     * <p>
-     * 每个检查器实例关联的指令是唯一的
-     */
-    private ConcurrentHashMap<String, ConcurrentHashMap<String, CmdResultChecker>> hmChecker = new ConcurrentHashMap<>();
-
-    /**
-     * 超时处理线程暂存起来
-     * 键:同上
-     */
-    private ConcurrentHashMap<String, ConcurrentHashMap<String, CmdCheckScheduledFuture>> hmFuture = new ConcurrentHashMap<>();
-
-
-    /**
-     * 每条指令执行后,添加并启动一个对应的“执行结果检查器”
-     *
-     * @param cmdExecModel
-     */
-    public void addChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
-
-        String deviceID = cmdExecModel.getCmdModel().getDeviceId();
-        log.info("任务id【{}】Checker : 创建, 设备id:{}, 指令:{}", cmdExecModel.getTaskUuid(), deviceID, cmdExecModel.getCmdModel().getCmdDistribution());
-        //这里需要该,根据新的判断逻辑,创建对应的checker
-        CmdResultChecker cmdResultChecker = newChecker(iotDevice, cmdExecModel);
-        log.info("任务id【{}】,创建结果:{}", cmdExecModel.getTaskUuid(), cmdResultChecker);
-
-        if (cmdResultChecker == null) {
-            log.error("任务id【{}】,【指令结果检查器】创建异常!");
-            return;
-        }
-        ConcurrentHashMap<String, CmdResultChecker> checkerMap = hmChecker.get(deviceID);
-        if (checkerMap != null) {
-            checkerMap.put(cmdExecModel.getCeBid(), cmdResultChecker);
-            log.info("任务id【{}】,在已有list中添加item", cmdExecModel.getTaskUuid());
-        } else {
-            checkerMap = new ConcurrentHashMap<>();
-            checkerMap.put(cmdExecModel.getCeBid(), cmdResultChecker);
-            hmChecker.put(deviceID, checkerMap);
-            log.info("任务id【{}】,新建list,在里面添加一个item", cmdExecModel.getTaskUuid());
-        }
-
-        //超时处理
-        ScheduledFuture<Boolean> scheduledFuture = scheduledThreadPoolExecutor.schedule(
-                new CmdResultTimedout(cmdResultChecker)
-                , cmdExecModel.getCmdModel().getTimeout()
-                , TimeUnit.SECONDS
-        );
-        CmdCheckScheduledFuture future = new CmdCheckScheduledFuture(cmdExecModel, scheduledFuture);
-        ConcurrentHashMap<String, CmdCheckScheduledFuture> checkScheduledFutureMap = hmFuture.get(deviceID);
-        if (checkScheduledFutureMap != null) {
-            checkScheduledFutureMap.put(cmdExecModel.getCeBid(), future);
-        } else {
-            checkScheduledFutureMap = new ConcurrentHashMap<>();
-            checkScheduledFutureMap.put(cmdExecModel.getCeBid(), future);
-            hmFuture.put(deviceID, checkScheduledFutureMap);
-        }
-        log.debug("任务id【{}】,指令执行结果检查器 已添加  指令:{}", cmdExecModel.getTaskUuid(), cmdExecModel);
-    }
-
-    /**
-     * 检查完执行结果之后,下一步要做的事情
-     *
-     * @param cmdExecModel
-     */
-    public void onChecked(CmdExecModel cmdExecModel) {
-        if (cmdExecModel == null) {
-            log.error("cmdExceModel is null");
-            return;
-        }
-        if (TextUtils.isEmpty(cmdExecModel.getCeBid())) {
-            log.error("cmdExceModel 标识 empty");
-            return;
-        }
-        if ((CmdStatus.CMD_STATUS_SUCC.getCode()).equals(cmdExecModel.getCmdStatus())) {
-            CmdCheckScheduledFuture future = getFutureByCeid(cmdExecModel.getCmdModel().getDeviceId(), cmdExecModel.getCeBid());
-            if (future == null) {
-                log.error("找不到对应的 超时处理线程");
-            } else {
-                log.debug("取消 超时处理线程");
-                future.getFuture().cancel(true);
-                removeFuture(cmdExecModel);
-            }
-        }
-        removeChecker(cmdExecModel);
-        //告诉 指令执行器 当前指令的执行结果已经知道了
-        cmdDispatcherService.onCmdResult(cmdExecModel);
-    }
-
-    /**
-     * 根据设备的“最新数据”,进行check
-     *
-     * @param iotDevice
-     * @param jobjMSg
-     */
-    public void check(IotDevice iotDevice, JSONObject jobjMSg) {
-        //找到该设备的checker,执行check
-        if (iotDevice == null) {
-            log.error("参数 devcie: null");
-            return;
-        }
-        //遍历该设备的所有checker 调该设备的“指令执行结果检查器”的check方法
-        ConcurrentHashMap<String, CmdResultChecker> checkerList = hmChecker.get(iotDevice.getDevBid());
-        if (null != checkerList) {
-            for (CmdResultChecker checker : checkerList.values()) {
-
-                if (!checker.getCmdExecModel().getCmdModel().getNeedcheckStatus()) {
-                    continue;
-                }
-
-                if (checker.isDone()) {
-                    log.debug("checker {} 已经done,不能再次check() ", checker);
-                } else {
-                    log.debug("cheker {}" + checker);
-                    checker.check(iotDevice, jobjMSg);
-                }
-            }
-        }
-    }
-
-    /**
-     * 从list中拿出对应的实例
-     *
-     * @param devBid 设备标识
-     * @param ceBid  “单条指令执行”的唯一标识 or
-     * @return
-     */
-    private CmdResultChecker getCheckerByCeid(String devBid, String ceBid) {
-        ConcurrentHashMap<String, CmdResultChecker> list = hmChecker.get(devBid);
-        if (list != null) {
-            for (CmdResultChecker item : list.values()) {
-                if (ceBid.equals(item.getCmdExecModel().getCeBid())) {
-                    return item;
-                }
-            }
-        }
-        log.info("找不到对应的checker   设备:{}  指令:{}", devBid, ceBid);
-        return null;
-    }
-
-    /**
-     * 从list中拿出对应的实例
-     *
-     * @param devBid 设备标识
-     * @param ceBid  “单条指令执行”的唯一标识 or
-     * @return
-     */
-    private CmdCheckScheduledFuture getFutureByCeid(String devBid, String ceBid) {
-        ConcurrentHashMap<String, CmdCheckScheduledFuture> list = hmFuture.get(devBid);
-        if (list != null) {
-            for (CmdCheckScheduledFuture item : list.values()) {
-                if (ceBid.equals(item.getCmdExecModel().getCeBid())) {
-                    return item;
-                }
-            }
-        }
-        log.info("找不到对应的 future   设备:{}  指令:{}", devBid, ceBid);
-        return null;
-    }
-
-    private void removeChecker(CmdExecModel cmdExecModel) {
-        CmdResultChecker findItem = null;
-        ConcurrentHashMap<String, CmdResultChecker> list = hmChecker.get(cmdExecModel.getCmdModel().getDeviceId());
-        if (list != null) {
-            for (CmdResultChecker item : list.values()) {
-                log.info("设备标识{},当前指令执行bid{},当前检查器关联指令执行{}", cmdExecModel.getCmdModel().getDeviceId(), cmdExecModel.getCeBid(), item.getCmdExecModel());
-                if (cmdExecModel.getCeBid().equals(item.getCmdExecModel().getCeBid())) {
-                    findItem = item;
-                    break;
-                }
-            }
-            if (findItem != null) {
-                list.remove(findItem.getCmdExecModel().getCeBid());
-            }
-        }
-
-    }
-
-    private void removeFuture(CmdExecModel cmdExecModel) {
-        CmdCheckScheduledFuture findItem = null;
-        ConcurrentHashMap<String, CmdCheckScheduledFuture> list = hmFuture.get(cmdExecModel.getCmdModel().getDeviceId());
-        if (list != null) {
-            for (CmdCheckScheduledFuture item : list.values()) {
-                if (cmdExecModel.getCeBid().equals(item.getCmdExecModel().getCeBid())) {
-                    findItem = item;
-                    break;
-                }
-            }
-            if (findItem != null) {
-                list.remove(findItem.getCmdExecModel().getCeBid());
-            }
-        }
-    }
-
-
-    /**
-     * 根据设备类型等信息,new对应的子类实例
-     *
-     * @param cmdExecModel
-     * @return
-     */
-    private CmdResultChecker newChecker(IotDevice iotDevice, CmdExecModel cmdExecModel) {
-        if (IotDeviceDictConst.TYPE_YF_CBD.equals(iotDevice.getDevtypeBid()) || IotDeviceDictConst.TYPE_YF_GKCBD.equals(iotDevice.getDevtypeBid())) {
-            return new YfCbdCmdResultChecker(cmdExecModel);
-        } else if (IotDeviceDictConst.TYPE_EZVIZ_JKSB.equals(iotDevice.getDevtypeBid())) {
-            return new EzvizCmdResultChecker(cmdExecModel);
-        }
-        return null;
-    }
-
-}

+ 4 - 8
src/main/java/com/yunfeiyun/agmp/iots/service/checker/EzvizCmdResultChecker.java

@@ -3,20 +3,16 @@ package com.yunfeiyun.agmp.iots.service.checker;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iots.core.cmd.checker.AbstractCmdResultChecker;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
-public class EzvizCmdResultChecker extends CmdResultChecker {
-    public EzvizCmdResultChecker(CmdExecModel cmdExecModel) {
-        super(cmdExecModel);
-    }
-
-
-    @Override
-    public void reqData() {
+public class EzvizAbstractCmdResultChecker extends AbstractCmdResultChecker {
 
+    public EzvizAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
+        super(cmdExecModel);
     }
 
     /**

+ 3 - 3
src/main/java/com/yunfeiyun/agmp/iots/service/checker/YfBzyCmdResultChecker.java

@@ -6,7 +6,7 @@ import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.cmd.checker.AbstractCmdResultChecker;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import com.yunfeiyun.agmp.iots.service.IIotCmdlogService;
 import lombok.extern.slf4j.Slf4j;
@@ -15,10 +15,10 @@ import java.util.Iterator;
 import java.util.Set;
 
 @Slf4j
-public class YfBzyCmdResultChecker extends CmdResultChecker {
+public class YfBzyAbstractCmdResultChecker extends AbstractCmdResultChecker {
 
 
-    public YfBzyCmdResultChecker(CmdExecModel cmdExecModel) {
+    public YfBzyAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
         super(cmdExecModel);
     }
 

+ 3 - 3
src/main/java/com/yunfeiyun/agmp/iots/service/checker/YfCbdCmdResultChecker.java

@@ -6,7 +6,7 @@ import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.cmd.checker.AbstractCmdResultChecker;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import com.yunfeiyun.agmp.iots.service.IIotCmdlogService;
 import lombok.extern.slf4j.Slf4j;
@@ -15,10 +15,10 @@ import java.util.Iterator;
 import java.util.Set;
 
 @Slf4j
-public class YfCbdCmdResultChecker extends CmdResultChecker {
+public class YfCbdAbstractCmdResultChecker extends AbstractCmdResultChecker {
 
 
-    public YfCbdCmdResultChecker(CmdExecModel cmdExecModel) {
+    public YfCbdAbstractCmdResultChecker(CmdExecModel cmdExecModel) {
         super(cmdExecModel);
     }
 

+ 35 - 132
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -41,45 +41,43 @@ public class IotStatusService {
     private MqttManager mqttManager;
 
 
-    /**
-     * iots 设备链接通道状态同步
-     */
-    private Map<String, Long> stringLongMap = new HashMap<>();
-
     @PostConstruct
     void init() {
         // 目前基于已经实现将最新数据放到DeviceConfig的设备,没有的将进行实现
-        //validateDeviceType.add(IotDeviceDictConst.TYPE_YF_CBD);// 云飞测报灯
+        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_CBD);// 云飞测报灯
         validateDeviceType.add(IotDeviceDictConst.TYPE_YF_BZY);// 云飞孢子仪
-        validateDeviceType.add(IotDeviceDictConst.TYPE_ZHAO_HE_SFJ);// 赵赫水肥机
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_GSSQ);// 新浦会管式墒情
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_LDSW_JC);// 新浦会雷达水位监测
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_SZZX_JC);// 新浦会水质在线监测
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_WSHJ_JC);// 新浦会温室环境监测
-        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_XYCB_2);// 性诱测报2.0
-        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_GXZW);// 根系作物
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_TRSH_CL);// 新普惠土壤水势
-        validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_WSKZ);// 新浦会温室环境控制
-        validateDeviceType.add(IotDeviceDictConst.TYPE_HPF_WSMJ_ZNKG);// 海普发智能温室控制
+        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_SCD);// 云飞杀虫灯
+        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_SQZ);// 云飞墒情站
+        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_QXZ);// 云飞环境监测
+
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_ZHAO_HE_SFJ);// 赵赫水肥机
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_GSSQ);// 新浦会管式墒情
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_LDSW_JC);// 新浦会雷达水位监测
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_SZZX_JC);// 新浦会水质在线监测
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_WSHJ_JC);// 新浦会温室环境监测
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_YF_XYCB_2);// 性诱测报2.0
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_YF_GXZW);// 根系作物
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_TRSH_CL);// 新普惠土壤水势
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_XPH_WSKZ);// 新浦会温室环境控制
+        //validateDeviceType.add(IotDeviceDictConst.TYPE_HPF_WSMJ_ZNKG);// 海普发智能温室控制
 
         //validateDeviceType.add(IotDeviceDictConst.TYPE_BY_SFJ);// 云飞水肥机
         //validateDeviceType.add(IotDeviceDictConst.TYPE_HT_SFJ);// 宏泰水肥机
 
         //validateDeviceType.add(IotDeviceDictConst.TYPE_YF_QXZ);//云飞气象站
-        validateDeviceType.add(IotDeviceDictConst.TYPE_YF_SCD);// 云飞杀虫灯
-        try{
-            //selectSfStatusByAll();
-        }catch (Exception e){
+        try {
+            validateStatusByDevType();
+        } catch (Exception e) {
             log.error("【设备检测】异常", e);
         }
 
     }
 
     /**
-     * 定期根据类型查设备最新设备数据,是否长时间不上报。,并进行重新订阅
+     * 定期根据类型查设备最新设备数据,是否长时间不上报。
      */
-    //@Scheduled(cron = "0 0 */1 * * ?")
-    public void selectSfStatusByAll() throws MqttException {
+    @Scheduled(cron = "0 0 */1 * * ?")
+    public void validateStatusByDevType(){
         printMqttStatus();
         Iterator<String> iterator = validateDeviceType.iterator();
         while (iterator.hasNext()) {
@@ -90,9 +88,7 @@ public class IotStatusService {
 
     /***
      * 一个一个类型检查
-     * 原因:统计每个类型正常和错误的差值,用于判断是否整体重启
-     * 1. 同一个mqtt有些数据正常上报,有些不正常,则考虑订阅问题,排除mqtt链接问题
-     * 2. 同一个mqtt 都不正常,考虑重启mqtt
+     * 根据时间标记离线
      * @param type
      */
     void validateDeviceStatus(String type) {
@@ -102,8 +98,6 @@ public class IotStatusService {
         // 查出来待检查的设备。根据类型
         List<IotDeviceStatusResVo> iotDeviceStatusResVos = iIotDeviceService.selectAllDeviceConfigStatus(param);
         log.info("【设备检测】【设备类型{} 】设备:{} 个", type, iotDeviceStatusResVos.size());
-        int totalNum = iotDeviceStatusResVos.size();
-        int errorNum = 0;
         for (IotDeviceStatusResVo iotDeviceStatusResVo : iotDeviceStatusResVos) {
             try {
                 // 获取设备类型id,存在的在进行检查,因为有些设备不是基于device config 上报
@@ -114,126 +108,35 @@ public class IotStatusService {
                     String createdTime = iotDeviceStatusResVo.getDevCreateddate();
                     String vTime = StringUtils.isEmpty(updateTime) ? createdTime : updateTime;
                     String devCode = iotDeviceStatusResVo.getDevCode();
-                    String firmBizId = iotDeviceStatusResVo.getFirmBid();
-                    String devtypeBid = iotDeviceStatusResVo.getDevtypeBid();
                     String devName = iotDeviceStatusResVo.getDevName();
                     String devBid = iotDeviceStatusResVo.getDevBid();
                     String devStatus = iotDeviceStatusResVo.getDevStatus();
                     // 检查时间上报间隔是否异常,30分钟没有,则异常
-                    if (validateTime(vTime, 60)) {
-                        // 1. 当前在线的进行检查;
-                        if(devStatus.equals("1")){
-                            // 更新状态
-                            IotDevice iotDevice = new IotDevice();
-                            iotDevice.setDevBid(devBid);
-                            iotDevice.setDevStatus("0");
-                            iotDevice.setDevOfflinedate(DateUtils.dateTimeNow());
-                            iotDevice.setDevModifieddate(DateUtils.dateTimeNow());
-
-                            iIotDeviceService.updateIotDevice(iotDevice);
-                            log.info("【设备检测】【设备状态异常更新-设备类型{} 】设备标识:{} 设备编号 {} 设备名称 {} 异常状态 {}", type, devBid, devCode, devName, iotDeviceStatusResVo.getDevStatus());
-                        }
-
-                        // 2. 如果离线的:订阅的主题还在那还能收到上报数据,变为在线,如果是没有了,那就一直离线了:
-                        // so:不管之前在线,还是一直离线先都尝试重新链接订阅
-                        //if ("1".equals(iotDeviceStatusResVo.getDevStatus())) {
-                        // 获取该类型用的mqttcore 问:会不会没有呢,不会,系统初始化时候必须先建立
-                       /* MqttCore mqttCore = mqttManager.getMqttCoreByFirmAndDevType(firmBizId, devtypeBid);
-                        if (mqttCore != null) {
-                            log.info("【设备检测】【尝试重连】:设备:{} {} 上次时间:{},30分钟没有更新数据,厂家:{},厂家名称:{},设备类型:{}", devName, devCode, vTime, firmBizId, mqttCore.getFirmName(), devtypeBid);
-                            reConnection(mqttCore.getServiceName(), devCode, devName, devtypeBid);
-                            errorNum += 1;
-                        } else {
-                            log.error("【设备检测】【重连异常】【重视!!!】未找到服务 设备:{} code: {} 厂家id:{},厂家名称:{},设备类型:{}", devName, devCode, firmBizId, devtypeBid);
-                        }*/
-                    } else {
-                        cleanStatCount(devCode, iotDeviceStatusResVo.getDevName());
+                    if (!validateTime(vTime, 60)) {
                         log.info("【设备检测】【正常】:设备:{} {} 上次时间:{}", iotDeviceStatusResVo.getDevName(), iotDeviceStatusResVo.getDevCode(), vTime);
+                        return;
+                    }
+                    // 1. 当前在线的进行检查;
+                    if (devStatus.equals("1")) {
+                        // 更新状态
+                        IotDevice iotDevice = new IotDevice();
+                        iotDevice.setDevBid(devBid);
+                        iotDevice.setDevStatus("0");
+                        iotDevice.setDevOfflinedate(DateUtils.dateTimeNow());
+                        iotDevice.setDevModifieddate(DateUtils.dateTimeNow());
+                        iIotDeviceService.updateIotDevice(iotDevice);
+                        log.info("【设备检测】【设备状态异常更新-设备类型{} 】设备标识:{} 设备编号 {} 设备名称 {} 异常状态 {}", type, devBid, devCode, devName, iotDeviceStatusResVo.getDevStatus());
                     }
                 }
             } catch (Exception e) {
                 log.info("【设备检测】【程序异常】:设备:{} {} {} {}", iotDeviceStatusResVo.getDevName(), iotDeviceStatusResVo.getDevCode(), iotDeviceStatusResVo, e);
             }
         }
-        // 检查是否整体重置
-        log.info("【设备检测】【结果:设备类型{} 】 一共{}个,失败:{} ", type, totalNum, errorNum);
-        if (totalNum == errorNum) {
-            log.error("【设备检测】【重视!!!】【结果:设备类型{} 】全部失败{}:考虑重启 ", type, errorNum);
-            //reStartMqtt();
-        }
         log.info("【设备检测结束 类型:{}】【 设备:{} 个#########################", type, iotDeviceStatusResVos.size());
-
-    }
-
-    /**
-     * 重新链接
-     *
-     * @param serviceName
-     * @param devCode
-     * @param devName
-     * @param devType
-     * @throws MqttException
-     */
-    @Deprecated
-    void reConnection(String serviceName, String devCode, String devName, String devType) throws MqttException {
-        // 用于复用前人写的手动订阅和取消,虽然我们这里只有一个id
-        Map<String, List<String>> serviceNameMap = new HashMap<>();
-        List<String> ids = new ArrayList<>();
-        ids.add(devCode);
-        serviceNameMap.put(serviceName, ids);
-        // 最多重连20次,策略待定
-        if (stringLongMap.containsKey(devCode)) {
-            if (stringLongMap.get(devCode) < 20) {
-                //mqttManager.unsubscribeByBatch(serviceNameMap);
-                //mqttManager.subscribeByBatch(serviceNameMap);
-                statCount(devCode, devName);
-                log.info("【设备检测】【异常】【重连】:设备:devName: {} ,devCode: {}, devType:{}", devName, devCode, devType);
-            }
-        } else {
-            //mqttManager.unsubscribeByBatch(serviceNameMap);
-            //mqttManager.subscribeByBatch(serviceNameMap);
-            statCount(devCode, devName);
-            log.info("【设备检测】【异常】【重连】:设备:devName: {} ,devCode: {}, devType:{}", devName, devCode, devType);
-        }
     }
 
 
     /**
-     * 最保守方法进行重启mqtt,重新加载配置
-     *
-     * @throws MqttException
-     */
-    void reStartMqtt() throws MqttException {
-       // mqttManager.init();
-    }
-
-    /**
-     * 临时统计重新订阅次数
-     * 注意:后续成功后需要清除
-     *
-     * @param devCode
-     * @param name
-     */
-    void statCount(String devCode, String name) {
-        if (stringLongMap.containsKey(devCode)) {
-            stringLongMap.replace(devCode, stringLongMap.get(devCode) + 1);
-        } else {
-            stringLongMap.put(devCode, 1L);
-        }
-        log.info("【设备检测】【重连】统计次数:设备:{} {} 次数:{} {}", name, devCode, stringLongMap.get(devCode));
-    }
-
-    /**
-     * 临时统计重新订阅次数
-     */
-    void cleanStatCount(String devCode, String name) {
-        if (stringLongMap.containsKey(devCode)) {
-            log.info("【设备检测】【重连】统计次数:设备:{} {} 次数:{} {}", name, devCode, stringLongMap.get(devCode));
-            stringLongMap.remove(devCode);
-        }
-    }
-
-    /**
      * 超过10分钟返回true
      * true:一直没收到
      *