Explorar el Código

阶段提交:mqtt指令下发和接受实现,里面的链接id是写死的,需要结合业务替换,主要打印的日志

yf_zn hace 1 año
padre
commit
d2faf13faf

+ 28 - 0
src/main/java/com/yunfeiyun/agmp/iots/AgmpIotsApplication.java

@@ -1,5 +1,9 @@
 package com.yunfeiyun.agmp.iots;
 
+import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
+import com.yunfeiyun.agmp.iots.config.TestConst;
+import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import com.yunfeiyun.agmp.iots.device.controller.TestController;
 import org.mybatis.spring.annotation.MapperScan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,6 +28,30 @@ public class AgmpIotsApplication {
     public static void main(String[] args) {
         SpringApplication.run(AgmpIotsApplication.class, args);
         log.info("物联网服务子系统启动成功");
+        for (int i = 0; i < 1; i++) {
+            try {
+                Thread.sleep(10000);
+                log.info("我在这里模拟接收到iotm的消息,下发指令第{}次", i);
+                SpringUtils.getBean(TestController.class).orderTest();
+                // 取消订阅
+                log.info("我在这里模拟解绑topic,下发指令第{}次", i);
+                SpringUtils.getBean(MqttManager.class).topicSingleUnSubscribeDevice(TestConst.connectionId, TestConst.serviceName, TestConst.deviceId);
+                log.info("我在这里模拟解绑topic,等10s收到消息在解绑{}", i);
+
+                Thread.sleep(10000);
+                log.info("我在这里模拟解绑后发消息,");
+                SpringUtils.getBean(TestController.class).orderTest();
+                // 取消订阅
+                log.info("我在这里模拟解绑后再订阅");
+                SpringUtils.getBean(MqttManager.class).topicSingleSubscribeDevice(TestConst.connectionId, TestConst.serviceName, TestConst.deviceId);
+                log.info("我在这里模拟解绑后再订阅发消息,");
+                SpringUtils.getBean(TestController.class).orderTest();
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
     }
 
 

+ 10 - 0
src/main/java/com/yunfeiyun/agmp/iots/config/TestConst.java

@@ -0,0 +1,10 @@
+package com.yunfeiyun.agmp.iots.config;
+
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+
+public class TestConst {
+    // 数据库测报灯的
+    public static final String connectionId="0b006e8d-0193-1000-e000-01aac0a83801";
+    public static final String serviceName= ServiceNameConst.SERVICE_YF_CBD;
+    public static final String deviceId= "864865062128484";
+}

+ 6 - 19
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/task/CmdTaskService.java

@@ -10,6 +10,7 @@ import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdStatus;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdgroupStatus;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
 import com.yunfeiyun.agmp.iot.common.domain.IotCmdexec;
 import com.yunfeiyun.agmp.iot.common.domain.IotCmdtask;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
@@ -17,6 +18,7 @@ import com.yunfeiyun.agmp.iot.common.domain.resvo.IotFirmdevResVo;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.task.TaskResult;
+import com.yunfeiyun.agmp.iots.config.TestConst;
 import com.yunfeiyun.agmp.iots.core.cmd.core.serial.SerialTaskModel;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import com.yunfeiyun.agmp.iots.device.common.Device;
@@ -86,36 +88,21 @@ public class CmdTaskService {
         // 获取完整设备信息
         Map<String, IotDevice> iotDeviceMap = iIotDeviceService.selectIotDeviceByDevBidMap(devBidList);
         // 获取设备厂家信息
-        List<IotFirmdevResVo> iotFirmdevResVoList = iIotFirmdevService.selectIotFirmdevListByDevBidList(devBidList);
-        Map<String, JSONObject> firmdevConfigMap = new HashMap<>();
-        // 生成设备厂家信息map
-        for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVoList) {
-            log.info("任务标识:【{}】,装载设备厂商配置信息:【{}】", taskId, iotFirmdevResVo);
-            String k = iotFirmdevResVo.getFirmBid() + iotFirmdevResVo.getDevtypeBid();
-            JSONObject jsonConfig;
-            try {
-                jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
-            } catch (Exception e) {
-                log.error("请求标识:【{}】,设备厂商配置信息不是标准的JSON格式:{}", e);
-                continue;
-            }
-            firmdevConfigMap.put(k, jsonConfig);
-        }
 
         for (CmdExecModel cmdExecModel : cmdExecModels) {
             CmdModel cmdModel = cmdExecModel.getCmdModel();
             log.info("任务标识:【{}】,开始执行最小任务单元:{}", taskId, cmdModel);
             IotDevice iotDevice = iotDeviceMap.get(cmdModel.getDeviceId());
-            String k = iotDevice.getFirmBid() + iotDevice.getDevtypeBid();
-            JSONObject serviceMap = firmdevConfigMap.get(k);
-            if (serviceMap == null || StringUtils.isEmpty(serviceMap.getString("service"))) {
+            //【需要补充】需要换出来ServiceName
+            log.info("!!!!!!!!!!【看这里:临时测试】这里的serviceName写死了SERVICE_YF_CBD,对接业务时候换掉,取出来");
+            String serviceName= TestConst.serviceName;
+            if (StringUtils.isEmpty(serviceName)) {
                 cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_FAIL.getCode());
                 cmdExecModel.setErrmsg("指令下发失败:设备:" + iotDevice.getDevCode() + "的对应厂家配置异常,请检查");
                 cmdExecModel.setCmdFinishTime(DateUtils.dateTimeNow());
                 cmdResultCheckService.onChecked(cmdExecModel);
                 throw new IotBizException(IotErrorCode.UNKNOWN_DEVICE_FIRM.getCode(), "设备:" + iotDevice.getDevCode() + "的对应厂家配置异常,请检查");
             }
-            String serviceName = serviceMap.getString("service");
             Device device = mqttManager.getDeviceHandler(serviceName);
             if (null == device) {
                 cmdExecModel.setCmdStatus(CmdStatus.CMD_STATUS_FAIL.getCode());

+ 3 - 3
src/main/java/com/yunfeiyun/agmp/iots/core/init/AfterRunner.java

@@ -23,18 +23,18 @@ public class AfterRunner implements ApplicationRunner {
     public void run(ApplicationArguments args) throws Exception {
         log.info("【数据初始化】【执行设备同步】");
         try {
-            deviceScheduler.synMqDevice();
+            //deviceScheduler.synMqDevice();
         } catch (Exception e) {
             log.error("【数据初始化】【执行物候设备同步】异常", e);
         }
         try {
-            deviceScheduler.autoInsertSfDevice();
+            //deviceScheduler.autoInsertSfDevice();
         } catch (Exception e) {
             log.error("【数据初始化】【执行宏泰设备同步】异常", e);
         }
 
         try {
-            deviceScheduler.autoInsertBySfDevice();
+            //deviceScheduler.autoInsertBySfDevice();
         } catch (Exception e) {
             log.error("【数据初始化】【执行博云设备同步】异常", e);
         }

+ 8 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/test/MqttTest.java

@@ -0,0 +1,8 @@
+package com.yunfeiyun.agmp.iots.core.manager.test;
+
+import org.springframework.stereotype.Service;
+
+@Service
+public class MqttTest {
+
+}

+ 88 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/controller/TestController.java

@@ -0,0 +1,88 @@
+package com.yunfeiyun.agmp.iots.device.controller;
+
+import com.yunfeiyun.agmp.common.core.domain.AjaxResult;
+import com.yunfeiyun.agmp.common.utils.JSONUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
+import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+
+/**
+ * 设备相关接口
+ */
+@RestController
+@RequestMapping("test")
+@Slf4j
+public class TestController {
+
+
+    @Autowired
+    private MqttManager mqttManager;
+
+
+    @Autowired
+    private CmdDispatcherService cmdDispatcherService;
+
+    /**
+     * 该方法模拟接收到mq的消息解析发送
+     *
+     * @param
+     * @return
+     * @throws Exception
+     */
+    @GetMapping("/cbd/test/cmd")
+    public AjaxResult orderTest() {
+        log.info("测试发送指令,测报灯");
+        String msg = "{\n" +
+                "\t\"taskUuid\": \"c164133a-0192-1000-e001-6ae6c0a801e6\",\n" +
+                "\t\"taskDesc\": null,\n" +
+                "\t\"cmdModels\": [\n" +
+                "\t\t[{\n" +
+                "\t\t\t\"ctBid\": null,\n" +
+                "\t\t\t\"ceBid\": null,\n" +
+                "\t\t\t\"deviceId\": \"25470430-0191-1000-e000-50297f000001\",\n" +
+                "\t\t\t\"iotDevice\": null,\n" +
+                "\t\t\t\"clogSendresult\": null,\n" +
+                "\t\t\t\"clogSendresultContent\": null,\n" +
+                "\t\t\t\"clogValidresult\": null,\n" +
+                "\t\t\t\"clogDesc\": null,\n" +
+                "\t\t\t\"tag\": null,\n" +
+                "\t\t\t\"delay\": null,\n" +
+                "\t\t\t\"durationTime\": null,\n" +
+                "\t\t\t\"finalStatus\": false,\n" +
+                "\t\t\t\"needcheckStatus\": false,\n" +
+                "\t\t\t\"timeout\": 10,\n" +
+                "\t\t\t\"dockingMode\": null,\n" +
+                "\t\t\t\"cmdFeedback\": null,\n" +
+                "\t\t\t\"cmdDistribution\": {\n" +
+                "\t\t\t\t\"func\": \"refresh\",\n" +
+                "\t\t\t\t\"retry\": 5,\n" +
+                "\t\t\t\t\"jsons\": {\n" +
+                "\t\t\t\t\t\"cmd\": \"read\",\n" +
+                "\t\t\t\t\t\"ext\": \"data\"\n" +
+                "\t\t\t\t}\n" +
+                "\t\t\t}\n" +
+                "\t\t}]\n" +
+                "\t],\n" +
+                "\t\"resetCmdModels\": null,\n" +
+                "\t\"requestId\": \"9828935b-ee4e-41ff-98b4-e60ba99336e2\",\n" +
+                "\t\"receiptAddr\": null,\n" +
+                "\t\"devCode\": \"864865062128484\",\n" +
+                "\t\"ctBiztype\": \"3\",\n" +
+                "\t\"ctBiztitle\": \"测报灯:864865062128484\",\n" +
+                "\t\"ctParam\": \"【测报灯控制】设备ID:25470430-0191-1000-e000-50297f000001,刷新数据;\",\n" +
+                "\t\"ctDevtype\": \"3\",\n" +
+                "\t\"ctDelayTime\": null,\n" +
+                "\t\"cid\": \"1\"\n" +
+                "}";
+        CmdGroupModel cmdGroupModel = JSONUtils.toObject(msg, CmdGroupModel.class);
+        cmdDispatcherService.handleCmd(cmdGroupModel);
+        // 模拟设备指令接入
+        //mqttManager.getDeviceHandler(ServiceNameConst.SERVICE_YF_CBD).sendCmd(null);
+        return AjaxResult.success();
+    }
+}

+ 17 - 15
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/CqCbdDeviceImpl.java

@@ -15,6 +15,7 @@ import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
 import com.yunfeiyun.agmp.iot.common.service.MongoService;
 import com.yunfeiyun.agmp.iot.common.service.IotAddressService;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+import com.yunfeiyun.agmp.iots.config.TestConst;
 import com.yunfeiyun.agmp.iots.device.domain.cbd.YfCbdMsgUtil;
 import com.yunfeiyun.agmp.iots.device.domain.cbd.YfCbdTakephotoMsg;
 import com.yunfeiyun.agmp.iots.device.service.*;
@@ -76,18 +77,19 @@ public class CqCbdDeviceImpl implements ICbdDevice {
     @Autowired
     private IotDeviceAddressService iotDeviceAddressService;
 
+    String connectionId = TestConst.connectionId;
     @Override
     public Object sendCmd(CmdModel cmdModel) throws Exception {
-        log.info("【CBD】收到指令 任务 cmdModel={}", cmdModel);
+        log.info("【测报灯】发送指令 任务 cmdModel={}", cmdModel);
 
         // 获取执行的指令
         CmdModel.Cmd cmdDistribution = cmdModel.getCmdDistribution();
         // 获取执行的方法 ,方法可以通过反射获取执行,也可以临时case 匹配
         String methodName = cmdModel.getCmdDistribution().getFunc();
 
+
         String mqttMsgContent = "";
         String clogSendresult = "发送指令成功";
-        ;
 
         switch (methodName) {
             case CmdDef.YfCbdCmdDef.CMD_REQ_DATA:
@@ -125,11 +127,11 @@ public class CqCbdDeviceImpl implements ICbdDevice {
                 break;
             }
         }
+        String topic = IotMqttConstant.YFCbdTopic.TOPIC_CBD_CMD_PREFIX + cmdModel.getIotDevice().getDevCode();
+        log.info("!!!!!!!!!!【看这里:临时测试】这里的connectionId写死了,对接业务时候换掉,取出来真实的");
+        mqttManager.publishMsg(connectionId, topic, mqttMsgContent);
 
-        MqttPublisher mqttPublisher = mqttManager.getPublisherByService(SERVICE_NAME);
-        mqttPublisher.publish(IotMqttConstant.YFCbdTopic.TOPIC_CBD_CMD_PREFIX + cmdModel.getIotDevice().getDevCode(), mqttMsgContent);
-
-        log.info("【CBD】发送指令完毕!");
+        log.info("【CBD】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",connectionId,topic, mqttMsgContent);
 
         cmdModel.setClogSendresult(clogSendresult);
         cmdModel.setClogDesc(mqttMsgContent);
@@ -206,9 +208,9 @@ public class CqCbdDeviceImpl implements ICbdDevice {
     }
 
     public void cmdData(JSONObject ext, String devUpdateddate) throws Exception {
-        log.info("测报灯数据解析 {}", ext.toString());
-        String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
+        log.info("测报灯数据解析 {}", ext.toString());
+        String devtypeBid = mqttManager.getDeviceTypeBizId(connectionId);
+        String firmBid = mqttManager.getFirmBizId(connectionId);
         boolean isCbd = true;
         String vtype = ext.getString("vtype");
         if (ext.containsKey("dat_f")) {
@@ -304,11 +306,11 @@ public class CqCbdDeviceImpl implements ICbdDevice {
 
     @Override
     public Object receiveData(String topic, JSONObject dataJson) throws Exception {
-        log.debug("测报灯实现类  处理收到的 设备上报数据 {}", dataJson.toString());
+        log.info("【测报灯】收到的 设备上报数据 {}", dataJson.toString());
         // 接收设备上报数据后的处理逻辑
         String devUpdateddate = dataJson.getString("devUpdateddate");
-        if(StringUtils.isEmpty(devUpdateddate)){
-            devUpdateddate= DateUtils.dateTimeNow();
+        if (StringUtils.isEmpty(devUpdateddate)) {
+            devUpdateddate = DateUtils.dateTimeNow();
         }
         if (dataJson.containsKey("Image")) {
             Object result = this.receivePicData(dataJson, devUpdateddate);
@@ -335,7 +337,7 @@ public class CqCbdDeviceImpl implements ICbdDevice {
     }
 
     @Override
-    public Object receivePicData(JSONObject jsonObject, String devUpdateddate){
+    public Object receivePicData(JSONObject jsonObject, String devUpdateddate) {
         String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
         String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
         String deviceTypeId = jsonObject.getString("device_type_id");
@@ -381,8 +383,8 @@ public class CqCbdDeviceImpl implements ICbdDevice {
     @Override
     public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
         String devCode = topic.substring(topic.lastIndexOf("/") + 1);
-        String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
+        String devtypeBid = mqttManager.getDeviceTypeBizId(connectionId);
+        String firmBid = mqttManager.getFirmBizId(connectionId);
         //查询
         IotDevice ret = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
         log.debug("查到了一个iotdevice {}", ret);

+ 4 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/checker/CmdResultCheckService.java

@@ -96,6 +96,10 @@ public class CmdResultCheckService {
      * @param cmdExecModel
      */
     public void onChecked(CmdExecModel cmdExecModel) {
+        if(true){
+            log.info("!!!!!!!!!!【看这里:临时测试】checker 因为iots直接下发指令测试,没有塞缓存task,先强制关闭,联调到iotm iots,再打开");
+            return;
+        }
         if (cmdExecModel == null) {
             log.error("cmdExceModel is null");
             return;

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/service/impl/WarnService.java

@@ -146,7 +146,7 @@ public class WarnService {
      * 每个整点执行一次 预警检查
      */
     //@Async
-    @Scheduled(cron = "0 0 * * * ?")
+    //@Scheduled(cron = "0 0 * * * ?")
     //@Scheduled(cron = "0 */1 * * * ?")  //调试期间,每分钟 执行一次
     private void jobCheck() {
 

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -68,7 +68,7 @@ public class IotStatusService {
         //validateDeviceType.add(IotDeviceDictConst.TYPE_YF_QXZ);//云飞气象站
         validateDeviceType.add(IotDeviceDictConst.TYPE_YF_SCD);// 云飞杀虫灯
         try{
-            selectSfStatusByAll();
+            //selectSfStatusByAll();
         }catch (Exception e){
             log.error("【设备检测】异常", e);
         }
@@ -78,7 +78,7 @@ public class IotStatusService {
     /**
      * 定期根据类型查设备最新设备数据,是否长时间不上报。,并进行重新订阅
      */
-    @Scheduled(cron = "0 0 */1 * * ?")
+    //@Scheduled(cron = "0 0 */1 * * ?")
     public void selectSfStatusByAll() throws MqttException {
         printMqttStatus();
         Iterator<String> iterator = validateDeviceType.iterator();

+ 1 - 1
src/main/resources/mapper/IotCmdexecMapper.xml

@@ -132,7 +132,7 @@
         insert into IotCmdexec
         <trim prefix="(" suffix=")" suffixOverrides=",">
             ceBid,
-            cId,
+            tid,
             ctBid,
             devBid,
             ceGroupbid,