yf_zn 1 год назад
Родитель
Сommit
4c87189f9a
18 измененных файлов с 275 добавлено и 135 удалено
  1. 29 24
      src/main/java/com/yunfeiyun/agmp/iots/AgmpIotsApplication.java
  2. 53 26
      src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java
  3. 18 1
      src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java
  4. 10 0
      src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/modal/MqttTopicValue.java
  5. 48 0
      src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java
  6. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java
  7. 4 2
      src/main/java/com/yunfeiyun/agmp/iots/device/common/Device.java
  8. 2 0
      src/main/java/com/yunfeiyun/agmp/iots/device/mapper/IotDeviceMapper.java
  9. 1 1
      src/main/java/com/yunfeiyun/agmp/iots/device/service/ICbdDevice.java
  10. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/BzyDeviceImpl.java
  11. 87 69
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/CqCbdDeviceImpl.java
  12. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IHikVisionEzvizDeviceImpl.java
  13. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotBigDataMonitorDeviceImpl.java
  14. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfQxzDeviceImpl.java
  15. 2 2
      src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfScdDeviceImpl.java
  16. 2 0
      src/main/java/com/yunfeiyun/agmp/iots/service/IIotDeviceService.java
  17. 6 0
      src/main/java/com/yunfeiyun/agmp/iots/service/impl/IotDeviceServiceImpl.java
  18. 3 0
      src/main/resources/mapper/IotDeviceMapper.xml

+ 29 - 24
src/main/java/com/yunfeiyun/agmp/iots/AgmpIotsApplication.java

@@ -3,6 +3,7 @@ package com.yunfeiyun.agmp.iots;
 import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iots.config.TestConst;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.controller.TestController;
 import org.mybatis.spring.annotation.MapperScan;
 import org.slf4j.Logger;
@@ -28,30 +29,34 @@ public class AgmpIotsApplication {
     public static void main(String[] args) {
         SpringApplication.run(AgmpIotsApplication.class, args);
         log.info("物联网服务子系统启动成功");
-        for (int i = 0; i < 1; i++) {
-            try {
-                Thread.sleep(10000);
-                log.info("我在这里模拟接收到iotm的消息,下发指令第{}次", i);
-                SpringUtils.getBean(TestController.class).orderTest();
-                // 取消订阅
-                log.info("我在这里模拟解绑topic,下发指令第{}次", i);
-                SpringUtils.getBean(MqttManager.class).topicSingleUnSubscribeDevice(TestConst.connectionId, TestConst.serviceName, TestConst.deviceId);
-                log.info("我在这里模拟解绑topic,等10s收到消息在解绑{}", i);
-
-                Thread.sleep(10000);
-                log.info("我在这里模拟解绑后发消息,");
-                SpringUtils.getBean(TestController.class).orderTest();
-                // 取消订阅
-                log.info("我在这里模拟解绑后再订阅");
-                SpringUtils.getBean(MqttManager.class).topicSingleSubscribeDevice(TestConst.connectionId, TestConst.serviceName, TestConst.deviceId);
-                log.info("我在这里模拟解绑后再订阅发消息,");
-                SpringUtils.getBean(TestController.class).orderTest();
-
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-        }
+//        for (int i = 0; i < 1; i++) {
+//            try {
+//                Thread.sleep(10000);
+//                MqttTopicValue mqttTopicValue=new MqttTopicValue();
+//                mqttTopicValue.setDevCode(TestConst.deviceId);
+//                mqttTopicValue.setDevId("xxx");
+//                log.info("我在这里模拟接收到iotm的消息,下发指令第{}次", i);
+//                SpringUtils.getBean(TestController.class).orderTest();
+//                // 取消订阅
+//                log.info("我在这里模拟解绑topic,下发指令第{}次", i);
+//                SpringUtils.getBean(MqttManager.class).topicSingleUnSubscribeDevice(TestConst.connectionId, TestConst.serviceName, mqttTopicValue);
+//                log.info("我在这里模拟解绑topic,等10s收到消息在解绑{}", i);
+//
+//                Thread.sleep(10000);
+//                log.info("我在这里模拟解绑后发消息,");
+//                SpringUtils.getBean(TestController.class).orderTest();
+//                // 取消订阅
+//                log.info("我在这里模拟解绑后再订阅");
+//
+//                SpringUtils.getBean(MqttManager.class).topicSingleSubscribeDevice(TestConst.connectionId, TestConst.serviceName, mqttTopicValue);
+//                log.info("我在这里模拟解绑后再订阅发消息,");
+//                SpringUtils.getBean(TestController.class).orderTest();
+//
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//
+//        }
     }
 
 

+ 53 - 26
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -9,6 +9,7 @@ import com.yunfeiyun.agmp.iot.common.enums.DevType;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttConfig;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttCore;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
@@ -100,8 +101,26 @@ public class MqttManager {
 
         // 查询topics【需要实现:重新更改获取该型号下的设备】
         String connectionId=iotDeviceconnResVo.getDevconnBid();
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceCodesByConectionId(connectionId));
-        cfgYf.setSubTopic(topics);
+        mqttCore.setConnectionId(connectionId);
+        // 构建topic
+        List<IotDevice> devices= deviceTopicService.getDevicesByConectionId(connectionId);
+        List<MqttTopicValue> mqttTopicValues=new ArrayList<>();
+        for(IotDevice iotDevice:devices){
+            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+            mqttTopicValue.setDevCode(iotDevice.getDevCode());
+            mqttTopicValue.setDevId(iotDevice.getDevBid());
+            mqttTopicValue.setTopic(deviceTopicService.getTopic(serviceName, iotDevice.getDevCode()));
+            mqttTopicValues.add(mqttTopicValue);
+        }
+
+        String[] topics=new String[mqttTopicValues.size()];
+        if(mqttTopicValues!=null&&!mqttTopicValues.isEmpty()){
+            for(int i=0;i<devices.size();i++){
+                topics[i]=mqttTopicValues.get(i).getTopic();
+            }
+           cfgYf.setSubTopic(topics);
+        }
+        mqttCore.bindTopicToDeviceId(mqttTopicValues);
         cfgYf.setServiceName(serviceName);
         log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
 
@@ -369,14 +388,14 @@ public class MqttManager {
      *
      * @param connectionId
      * @param serviceName
-     * @param deviceIds
+     * @param mqttTopicValues
      * @throws MqttException
      */
-    public void topicBatchSubscribeDevices(String connectionId, String serviceName, List<String> deviceIds) throws MqttException {
-        log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, deviceIds);
+    public void topicBatchSubscribeDevices(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
+        log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
 
         // 获取批量订阅的主题
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, mqttTopicValues);
         log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
 
         // 获取MqttCore实例
@@ -401,20 +420,19 @@ public class MqttManager {
      *
      * @param connectionId
      * @param serviceName
-     * @param deviceId
+     * @param mqttTopicValue
      * @throws MqttException
      */
-    public void topicSingleSubscribeDevice(String connectionId, String serviceName, String deviceId) throws MqttException {
-        log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
+    public void topicSingleSubscribeDevice(String connectionId, String serviceName, MqttTopicValue mqttTopicValue) throws MqttException {
+        log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValue);
 
         // 转换为批量订阅
-        List<String> deviceIds = new ArrayList<>();
-        deviceIds.add(deviceId);
-
+        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
+        mqttTopicValues.add(mqttTopicValue);
         // 调用批量订阅方法
-        topicBatchSubscribeDevices(connectionId, serviceName, deviceIds);
+        topicBatchSubscribeDevices(connectionId, serviceName, mqttTopicValues);
 
-        log.info("【完成单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
+        log.info("【完成单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
     }
 
 
@@ -423,14 +441,14 @@ public class MqttManager {
      *
      * @param connectionId
      * @param serviceName
-     * @param deviceIds
+     * @param mqttTopicValues
      * @throws MqttException
      */
-    public void topicBatchUnSubscribeDevices(String connectionId, String serviceName, List<String> deviceIds) throws MqttException {
-        log.info("【开始批量取消订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, deviceIds);
+    public void topicBatchUnSubscribeDevices(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
+        log.info("【开始批量取消订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
 
         // 获取批量取消订阅的主题
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
+        String[] topics = deviceTopicService.getBatchTopic(serviceName,mqttTopicValues);
         log.info("【获取批量取消订阅主题】 topics: {}", Arrays.toString(topics));
 
         // 获取MqttCore实例
@@ -454,20 +472,20 @@ public class MqttManager {
      *
      * @param connectionId
      * @param serviceName
-     * @param deviceId
+     * @param mqttTopicValue
      * @throws MqttException
      */
-    public void topicSingleUnSubscribeDevice(String connectionId, String serviceName, String deviceId) throws MqttException {
-        log.info("【开始单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
+    public void topicSingleUnSubscribeDevice(String connectionId, String serviceName, MqttTopicValue mqttTopicValue) throws MqttException {
+        log.info("【开始单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValue);
 
         // 转换为批量取消订阅
-        List<String> deviceIds = new ArrayList<>();
-        deviceIds.add(deviceId);
+        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
+        mqttTopicValues.add(mqttTopicValue);
 
         // 调用批量取消订阅方法
-        topicBatchUnSubscribeDevices(connectionId, serviceName, deviceIds);
+        topicBatchUnSubscribeDevices(connectionId, serviceName, mqttTopicValues);
 
-        log.info("【完成单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, deviceId);
+        log.info("【完成单个设备取消订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValue);
     }
 
 
@@ -527,7 +545,7 @@ public class MqttManager {
         }
 
         String[] deviceId = iotDeviceList.stream().map(IotDevice::getDevBid).toArray(String[]::new);
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceId);
+        String[] topics =null ;// deviceTopicService.getBatchTopic(serviceName, deviceId);
         if (topics == null || topics.length == 0) {
             log.warn("【设备重新订阅】【订阅】【重连】:设备:{} 没有找到topic", serviceName);
             return;
@@ -548,4 +566,13 @@ public class MqttManager {
         }
     }
 
+    /**
+     * 根据topic 获取id
+     * @param connectionId
+     * @param topic
+     * @return
+     */
+    public String getDevIdByTopic(String connectionId,String topic){
+        return getMqttCore(connectionId).getDevIdByTopic(topic);
+    }
 }

+ 18 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java

@@ -5,6 +5,7 @@ import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
 import com.yunfeiyun.agmp.iot.common.constant.mqtt.IotMqttConstant;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -58,8 +59,13 @@ public class DeviceTopicService {
      * @param deviceId
      * @return
      */
-    public String[] getBatchTopic(String serviceName, String[] deviceId) {
+    public String[] getBatchTopic(String serviceName,List<MqttTopicValue> mqttTopicValues) {
+
+        String [] deviceId=new String[mqttTopicValues.size()];
         log.info("【加载topic】serviceName:{},原始topic {}", serviceName, deviceId);
+        for(int i=0;i<mqttTopicValues.size();i++){
+            deviceId[i]=mqttTopicValues.get(i).getDevCode();
+        }
         switch (serviceName) {
             case ServiceNameConst.SERVICE_YF_CBD: {
                 return getYfCbdBatchSubTopic(deviceId);
@@ -94,6 +100,13 @@ public class DeviceTopicService {
         }
     }
 
+    public String getTopic(String serviceName,String deviceCode) {
+        String [] deviceId=new String[]{deviceCode};
+        log.info("【加载topic】serviceName:{},原始topic {}", serviceName, deviceId);
+        List<MqttTopicValue> mqttTopicValues=new ArrayList<>();
+        return getBatchTopic(serviceName,mqttTopicValues)[0];
+    }
+
     private String[] getHKBatchSubTopic() {
         List<IotDevice> iotDeviceList = iotDeviceService.selectAllDeviceByDeviceServiceName(ServiceNameConst.SERVICE_EZVIZ_MINITOR);
         String[] topics = new String[0];
@@ -255,4 +268,8 @@ public class DeviceTopicService {
         }
         return ids.toArray(new String[]{});
     }
+
+    public List<IotDevice> getDevicesByConectionId(String connectionId) {
+        return iotDeviceService.getDevicesByConectionId(connectionId);
+    }
 }

+ 10 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/modal/MqttTopicValue.java

@@ -0,0 +1,10 @@
+package com.yunfeiyun.agmp.iots.core.mqtt.modal;
+
+import lombok.Data;
+
+@Data
+public class MqttTopicValue {
+    private String devCode;
+    private String devId;
+    private String topic;
+}

+ 48 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -1,5 +1,7 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
 import lombok.Getter;
@@ -9,6 +11,9 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -27,6 +32,10 @@ public class MqttCore {
      */
     private MqttClient mqttClient;
 
+    private String connectionId;
+
+
+
     /**
      * 内部维护的发布者
      */
@@ -37,7 +46,18 @@ public class MqttCore {
      * 内部维护的订阅者
      */
     private MqttSubscriber mqttSubscriber;
+    /**
+     * topic -> device
+     */
+    Map<String,String> topicToDevId=new HashMap<>();
 
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
 
     public void setMqttConfig(MqttConfig mqttConfig) {
         this.mqttConfig = mqttConfig;
@@ -210,5 +230,33 @@ public class MqttCore {
      */
     public void unsubscribe(String[] topics) throws MqttException {
         getClient().unsubscribe(topics);
+        for(String topic:topics){
+            topicToDevId.remove(topic);
+        }
+    }
+
+    /**
+     * 将topic映射到设备id
+     * @param mqttTopicValues
+     */
+    public void bindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
+        for(MqttTopicValue mqttTopicValue:mqttTopicValues){
+            topicToDevId.put(mqttTopicValue.getTopic(),mqttTopicValue.getDevId());
+
+        }
+    }
+//    public void unBindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
+//        for(MqttTopicValue mqttTopicValue:mqttTopicValues){
+//            topicToDevId.remove(mqttTopicValue.getTopic());
+//        }
+//    }
+
+    /**
+     * 根据topic 获取设备id
+     * @param topic
+     * @return
+     */
+    public String getDevIdByTopic(String topic){
+        return topicToDevId.get(topic);
     }
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -58,11 +58,11 @@ public class MqttSubscriber implements MqttCallbackExtended {
                 // 当收到mqtt消息无对应的设备类型时,则丢弃消息。
                 IotException.UNKNOWN_DEVICE.throwException();
             }
-            Object result = device.receiveData(topic, obj);
+            Object result = device.receiveData(topic, obj,mqttCore.getConnectionId());
             log.info("【上报数据:处理结果】{}", result);
             //如果是“设备属性”消息,进行执行结果检查
             if (device.isDeviceProps(obj)) {
-                SpringUtils.getBean(CmdResultCheckService.class).check(device.findIotDevice(topic, obj), obj);
+                SpringUtils.getBean(CmdResultCheckService.class).check(device.findIotDevice(topic, obj,mqttCore.getConnectionId()), obj);
             } else {
                 log.error("其它数据");
             }

+ 4 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/common/Device.java

@@ -25,7 +25,9 @@ public interface Device {
      * @param orderCmdParam 该参数待定,还不知道需要哪些基础参数
      * @return
      */
-    public Object receiveData(String topic, JSONObject cmdJson) throws Exception;
+    //public Object receiveData(String topic, JSONObject cmdJson) throws Exception;
+
+    public Object receiveData(String topic, JSONObject cmdJson,String connectionId) throws Exception;
 
     public boolean isDeviceProps(JSONObject cmdJson);
 
@@ -36,5 +38,5 @@ public interface Device {
      * @param jobjMsg
      * @return
      */
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg);
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId);
 }

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/IotDeviceMapper.java

@@ -97,6 +97,8 @@ public interface IotDeviceMapper {
     int updateIotDeviceStatusByType(@Param("devStatus") String devStatus,@Param("firmBid") String firmBid,@Param("devtypeBid")String devtypeBid);
 
     List<String> getDeviceCodesByConectionId(@Param("connectionId") String connectionId);
+
+    List<IotDevice> getDevicesByConectionId(String connectionId);
 }
 
 

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/device/service/ICbdDevice.java

@@ -10,6 +10,6 @@ import com.yunfeiyun.agmp.iots.device.common.Device;
  */
 public interface ICbdDevice extends Device {
 
-    Object receivePicData(JSONObject jsonObject, String devUpdateddate);
+    Object receivePicData(JSONObject jsonObject, String devUpdateddate,String topic,String connectionId);
 
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/BzyDeviceImpl.java

@@ -245,7 +245,7 @@ public class BzyDeviceImpl implements IBzyDevice {
     }
 
     @Override
-    public Object receiveData(String topic, JSONObject dataJson) throws Exception {
+    public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
         log.info("【孢子仪实现类】  【收到上报数据】  {}", dataJson.toString());
         // 接收设备上报数据后的处理逻辑
         String devUpdateddate = dataJson.getString("devUpdateddate");
@@ -310,7 +310,7 @@ public class BzyDeviceImpl implements IBzyDevice {
     }
 
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
         String devCode = topic.substring(topic.lastIndexOf("/") + 1);
         String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
         String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);

+ 87 - 69
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/CqCbdDeviceImpl.java

@@ -70,7 +70,6 @@ public class CqCbdDeviceImpl implements ICbdDevice {
     @Autowired
     private IotDeviceAddressService iotDeviceAddressService;
 
-    String connectionId = TestConst.connectionId;
     @Override
     public Object sendCmd(CmdModel cmdModel) throws Exception {
         log.info("【测报灯】发送指令 任务 cmdModel={}", cmdModel);
@@ -121,11 +120,10 @@ public class CqCbdDeviceImpl implements ICbdDevice {
             }
         }
         String topic = IotMqttConstant.YFCbdTopic.TOPIC_CBD_CMD_PREFIX + cmdModel.getIotDevice().getDevCode();
-        log.info("!!!!!!!!!!【看这里:临时测试】这里的connectionId写死了,对接业务时候换掉,取出来真实的");
-        //String connectionId = cmdModel.getIotDevice().getDevconnBid();
-        mqttManager.publishMsg(connectionId, topic, mqttMsgContent);
+        IotDevice iotDevice= iIotDeviceService.selectIotDeviceByDevBid(cmdModel.getIotDevice().getDevBid());
+        mqttManager.publishMsg(iotDevice.getDevconnBid(), topic, mqttMsgContent);
 
-        log.info("【CBD】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",connectionId,topic, mqttMsgContent);
+        log.info("【CBD】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
 
         cmdModel.setClogSendresult(clogSendresult);
         cmdModel.setClogDesc(mqttMsgContent);
@@ -201,26 +199,34 @@ public class CqCbdDeviceImpl implements ICbdDevice {
         return JSONUtils.toJSONString(extConf);
     }
 
-    public void cmdData(JSONObject ext, String devUpdateddate) throws Exception {
+    /**
+     *
+     * @param ext
+     * @param devUpdateddate
+     * @param topic
+     * @param connectionId
+     * @throws Exception
+     */
+    public void cmdData(JSONObject ext, String devUpdateddate,String topic,String connectionId) throws Exception {
         log.info("【测报灯】数据解析 {}", ext.toString());
-        TestConst.printError("!!!!!!!!!!【看这里:临时测试】这里的connectionId写死了,对接业务时候换掉,取出来");
-        String devtypeBid = mqttManager.getDeviceTypeBizId(connectionId);
-        String firmBid = mqttManager.getFirmBizId(connectionId);
+        String devId=mqttManager.getDevIdByTopic(connectionId,topic);
+        IotDevice iotDeviceFromDb= iIotDeviceService.selectIotDeviceByDevBid(devId);
+        if (iotDeviceFromDb == null) {
+            return ;
+        }
         boolean isCbd = true;
         String vtype = ext.getString("vtype");
         if (ext.containsKey("dat_f")) {
             ext.put("datt", ext.getString("dat_f"));
         }
         if (StringUtils.isNotEmpty(vtype) && vtype.equals("6")) {
-            devtypeBid = "xct";
             isCbd = false;
         }
-
-        IotDevice iotDevice = new IotDevice();
-        iotDevice.setDevtypeBid(devtypeBid);
-        iotDevice.setFirmBid(firmBid);
-        iotDevice.setDevUpdateddate(devUpdateddate);
-        iotDevice.setDevStatus("1");
+        if(!isCbd){
+            return;
+        }
+        iotDeviceFromDb.setDevUpdateddate(devUpdateddate);
+        iotDeviceFromDb.setDevStatus("1");
 
         HashMap<String, String> keyMaps = new HashMap<>();
         keyMaps.put("dver", "devVersion");
@@ -231,76 +237,76 @@ public class CqCbdDeviceImpl implements ICbdDevice {
             String k = entry.getValue();
             String v = ext.getString(entry.getKey());
             if (StringUtils.isNotEmpty(v)) {
-                ReflectUtils.invokeSetter(iotDevice, k, v);
+                ReflectUtils.invokeSetter(iotDeviceFromDb, k, v);
             }
         }
-        // 设备不存在 就不在处理
-        IotDevice iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(iotDevice.getDevtypeBid(), iotDevice.getFirmBid(), iotDevice.getDevCode());
-        if (iotDeviceOld == null) {
-            return;
-        }
-        iotDevice.setDevBid(iotDeviceOld.getDevBid());
+
         String lat = ext.getString("lat");
         String lng = ext.getString("lng");
 
-        if (!Objects.equals(iotDeviceOld.getDevPositionstatus(), "0")) {
-            iotDeviceAddressService.setDeviceAddress(iotDevice, lng, lat);
+        if (!Objects.equals(iotDeviceFromDb.getDevPositionstatus(), "0")) {
+            iotDeviceAddressService.setDeviceAddress(iotDeviceFromDb, lng, lat);
         }
 
         try {
             // 单独处理是否识别是否开启识别:0禁用(不带识别)1识别 2 计数
             String disable = ext.getString("disable");
             log.info("【测报灯-解析识别状态】disable:{}", disable);
-            /*String extInfoOld = iotDeviceOld.getExtInfo();
-            String extInfo = extInfoOld == null ? "{}" : extInfoOld;
-            JSONObject jsonObject = JSONObject.parseObject(extInfo);
-            jsonObject.put("disable", disable);
-            iotDevice.setExtInfo(jsonObject.toString());*/
         } catch (Exception e) {
             log.error("【测报灯-解析识别状态 异常】disable:{}", e.getMessage());
         }
         // 更新设备基础信息数据库 mysql
-        iIotDeviceService.updateIotDevice(iotDevice);
+        iIotDeviceService.updateIotDevice(iotDeviceFromDb);
 
         // 更新设备数据信息到数据库 mongodb
-        String cId = iotDeviceOld.getTid();
+        String cId = iotDeviceFromDb.getTid();
         String devConfig = "";
         if (isCbd) {
-            devConfig = this.cbdData(ext, cId, iotDevice);
+            devConfig = this.cbdData(ext, cId, iotDeviceFromDb);
         } else {
-            devConfig = this.xctData(ext, cId, iotDevice);
+            devConfig = this.xctData(ext, cId, iotDeviceFromDb);
         }
 
         // 创建或更新设备配置信息
         if (StringUtils.isNotEmpty(devConfig)) {
-            iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceOld, devConfig, iotDevice.getDevUpdateddate());
+            iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceFromDb, devConfig, iotDeviceFromDb.getDevUpdateddate());
         }
 
         // 保存 设备最新数据 到redis
         iIotDevicelasteddataService.createOrUpdateDeviceLastedData(
-                ext, iotDeviceOld, iotDevice.getDevUpdateddate(), 60 * 60 * 24L);
+                ext, iotDeviceFromDb, iotDeviceFromDb.getDevUpdateddate(), 60 * 60 * 24L);
     }
 
-
-    public void cmdOffline(JSONObject ext) {
+    /**
+     *
+     * @param ext
+     * @param topic
+     * @param connectionId
+     */
+    public void cmdOffline(JSONObject ext,String topic,String connectionId) {
         log.debug("测报灯离线数据 {}", ext.toString());
-        String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
-        String devCode = ext.getString("imei");
-        // 设备不存在 就不在处理
-        IotDevice iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        if (iotDeviceOld == null) {
-            return;
+        String devId=mqttManager.getDevIdByTopic(connectionId,topic);
+        IotDevice iotDevice= iIotDeviceService.selectIotDeviceByDevBid(devId);
+        if (iotDevice == null) {
+            return ;
         }
         IotDevice newIotDevice = new IotDevice();
-        newIotDevice.setDevBid(iotDeviceOld.getDevBid());
+        newIotDevice.setDevBid(devId);
         newIotDevice.setDevStatus("0");
         newIotDevice.setDevOfflinedate(DateUtils.dateTimeNow());
         iIotDeviceService.updateIotDevice(newIotDevice);
     }
 
+    /**
+     *
+     * @param topic
+     * @param dataJson
+     * @param connectionId
+     * @return
+     * @throws Exception
+     */
     @Override
-    public Object receiveData(String topic, JSONObject dataJson) throws Exception {
+    public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
         log.info("【测报灯】收到的 设备上报数据 {}", dataJson.toString());
         // 接收设备上报数据后的处理逻辑
         String devUpdateddate = dataJson.getString("devUpdateddate");
@@ -308,7 +314,7 @@ public class CqCbdDeviceImpl implements ICbdDevice {
             devUpdateddate = DateUtils.dateTimeNow();
         }
         if (dataJson.containsKey("Image")) {
-            Object result = this.receivePicData(dataJson, devUpdateddate);
+            Object result = this.receivePicData(dataJson, devUpdateddate, topic,connectionId);
             log.info("测报灯实现类 接收数据:" + result);
         } else {
             String cmd = dataJson.getString("cmd");
@@ -323,21 +329,33 @@ public class CqCbdDeviceImpl implements ICbdDevice {
             }
 
             if (cmd.equals("data")) {
-                this.cmdData(ext, devUpdateddate);
+                this.cmdData(ext, devUpdateddate,topic,connectionId);
             } else if (cmd.equals("offline")) {
-                this.cmdOffline(ext);
+                this.cmdOffline(ext,topic,connectionId);
             }
         }
         return true;
     }
 
+    /**
+     *
+     * @param jsonObject
+     * @param devUpdateddate
+     * @param topic
+     * @param connectionId
+     * @return
+     */
     @Override
-    public Object receivePicData(JSONObject jsonObject, String devUpdateddate) {
-        String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
+    public Object receivePicData(JSONObject jsonObject, String devUpdateddate ,String topic,String connectionId) {
+        String devId=mqttManager.getDevIdByTopic(connectionId,topic);
+        IotDevice iotDevice= iIotDeviceService.selectIotDeviceByDevBid(devId);
+        if (iotDevice == null) {
+            return "设备不存在 就不在处理";
+        }
+        String devtypeBid = iotDevice.getDevtypeBid();
         String deviceTypeId = jsonObject.getString("device_type_id");
         String devCode = jsonObject.getString("imei");
-        log.info("【解析测报灯数据】:devtypeBid:{},firmBid:{},deviceTypeId,{},devCode:{}", devtypeBid, firmBid, deviceTypeId, devCode);
+        log.info("【解析测报灯数据】:devtypeBid:{},deviceTypeId,{},devCode:{}", devtypeBid, deviceTypeId, devCode);
         boolean isCbd = true;
         if (StringUtils.isNotEmpty(deviceTypeId) && !deviceTypeId.equals("3")) {
             devtypeBid = "xct";
@@ -348,13 +366,6 @@ public class CqCbdDeviceImpl implements ICbdDevice {
             return "暂不处理吸虫塔设备";
         }
 
-        // 设备不存在 就不在处理
-        IotDevice iotDevice = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-
-        if (iotDevice == null) {
-            return "设备不存在 就不在处理";
-        }
-
         String cbdimgAddr = jsonObject.getString("Image");
         if (StringUtils.isEmpty(cbdimgAddr)) {
             return "图片地址不存在,则不处理";
@@ -365,23 +376,30 @@ public class CqCbdDeviceImpl implements ICbdDevice {
         return "测报灯指令上报的图片结果";
     }
 
-
+    /**
+     *
+     * @param jobjMsg
+     * @return
+     */
     @Override
     public boolean isDeviceProps(JSONObject jobjMsg) {
-
         if ("data".equals(jobjMsg.getString("cmd"))) {
             return true;
         }
         return false;
     }
 
+    /**
+     *
+     * @param topic
+     * @param jobjMsg
+     * @param connectionId
+     * @return
+     */
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
-        String devCode = topic.substring(topic.lastIndexOf("/") + 1);
-        String devtypeBid = mqttManager.getDeviceTypeBizId(connectionId);
-        String firmBid = mqttManager.getFirmBizId(connectionId);
-        //查询
-        IotDevice ret = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
+        String devId = mqttManager.getDevIdByTopic(connectionId,topic);
+        IotDevice ret = iIotDeviceService.selectIotDeviceByDevBid(devId);
         log.debug("查到了一个iotdevice {}", ret);
         return ret;
     }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IHikVisionEzvizDeviceImpl.java

@@ -80,7 +80,7 @@ public class IHikVisionEzvizDeviceImpl extends HttpDeviceAbstractImpl implements
 
     // 无用方法,暂不实现
     @Override
-    public Object receiveData(String topic, JSONObject cmdJson){
+    public Object receiveData(String topic, JSONObject cmdJson,String connectionId){
         return null;
     }
     // 无用方法,暂不实现
@@ -90,7 +90,7 @@ public class IHikVisionEzvizDeviceImpl extends HttpDeviceAbstractImpl implements
     }
     // 无用方法,暂不实现
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
         return null;
     }
 

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotBigDataMonitorDeviceImpl.java

@@ -29,7 +29,7 @@ public class IotBigDataMonitorDeviceImpl implements Device {
     }
 
     @Override
-    public Object receiveData(String topic, JSONObject cmdJson) throws Exception {
+    public Object receiveData(String topic, JSONObject cmdJson,String connectionId) throws Exception {
         String devCode = cmdJson.getString("imei");
         String picUrl = cmdJson.getString("Image");
         IotDevice iotDevice = iotDeviceService.selectDeviceByDeviceServiceNameAndDevCode(ServiceNameConst.SERVICE_EZVIZ_MINITOR,devCode);
@@ -51,7 +51,7 @@ public class IotBigDataMonitorDeviceImpl implements Device {
     }
 
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
         return null;
     }
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfQxzDeviceImpl.java

@@ -165,7 +165,7 @@ public class YfQxzDeviceImpl implements IYfQxzDevice {
 
 
     @Override
-    public Object receiveData(String topic, JSONObject dataJson) throws Exception {
+    public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
 
 
         if (TextUtils.isEmpty(topic)) {
@@ -468,7 +468,7 @@ public class YfQxzDeviceImpl implements IYfQxzDevice {
     }
 
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
         String devCode = topic.substring(topic.lastIndexOf("/") + 1);
         //查询
         IotDevice ret = findQxzsqzDevice(devCode);

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfScdDeviceImpl.java

@@ -278,7 +278,7 @@ public class YfScdDeviceImpl implements IYfScdDevice {
      * @return
      */
     @Override
-    public Object receiveData(String topic, JSONObject dataJson) throws Exception {
+    public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
         log.info("杀虫灯实现类  处理收到的 设备上报数据 " + dataJson.toString());
         // 接收设备上报数据后的处理逻辑
         String cmd = dataJson.getString("cmd");
@@ -321,7 +321,7 @@ public class YfScdDeviceImpl implements IYfScdDevice {
      * @return
      */
     @Override
-    public IotDevice findIotDevice(String topic, JSONObject jobjMsg) {
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
         String devCode = topic.substring(topic.lastIndexOf("/") + 1);
         //String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);// 对于实现类复用的情况,目前这个写法会失效
         String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/IIotDeviceService.java

@@ -131,5 +131,7 @@ public interface IIotDeviceService {
     public void updateIotDeviceBatch(List<IotDevice> iotDeviceList);
 
     List<String> getDeviceCodesByConectionId(String connectionId);
+
+    List<IotDevice> getDevicesByConectionId(String connectionId);
 }
 

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/impl/IotDeviceServiceImpl.java

@@ -324,6 +324,12 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
         return iotDeviceMapper.getDeviceCodesByConectionId(connectionId);
     }
 
+    @Override
+    public List<IotDevice> getDevicesByConectionId(String connectionId) {
+
+        return iotDeviceMapper.getDevicesByConectionId(connectionId);
+    }
+
 
     /**
      * 是否是属于需要忽略状态的设备

+ 3 - 0
src/main/resources/mapper/IotDeviceMapper.xml

@@ -512,6 +512,9 @@
     <select id="getDeviceCodesByConectionId" resultType="java.lang.String">
         select  devCode from IotDevice where devconnBid=#{connectionId} and devDelstatus='0'
     </select>
+    <select id="getDevicesByConectionId" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDevice">
+                select  * from IotDevice where devconnBid=#{connectionId} and devDelstatus='0'
+    </select>
 </mapper>