ソースを参照

阶段提交:异常处理

yf_zn 1 年間 前
コミット
56e759dddd

+ 59 - 60
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -59,8 +59,6 @@ public class MqttManager {
     private Map<String, Device> deviceHandlerMap;
 
 
-
-
     /**
      * 强制对于已经废弃/逻辑需要重新梳理抛出异常
      */
@@ -80,66 +78,66 @@ public class MqttManager {
      * @param jsonConfig
      * @throws MqttException
      */
-    public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) throws MqttException {
-        log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}",iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
+    public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig){
+        try {
+            log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
+
+            // 构建配置
+            MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
+            //服务类Bean名称
+            String serviceName = cfgYf.getServiceName();
+            // 之前干嘛的待定,
+            String type = cfgYf.getType();
+            //厂家名称
+            String firmName = cfgYf.getFirmName();
+            log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
+            // 创建MqttCore实例
+            MqttCore mqttCore = new MqttCore();
+
+            // 查询topics【需要实现:重新更改获取该型号下的设备】
+            String connectionId = iotDeviceconnResVo.getDevconnBid();
+            mqttCore.setConnectionId(connectionId);
+            // 构建topic
+            List<IotDevice> devices = deviceTopicService.getDevicesByConectionId(connectionId);
+            log.info("【创建MqttCore实例】 mqttCore: {} connectionId:{} devSize:{}", mqttCore, connectionId, devices.size());
+            List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
+            for (IotDevice iotDevice : devices) {
+                // 根据设备code获取topics
+                String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+                if (topics != null) {
+                    for (String s : topics) {
+                        MqttTopicValue mqttTopicValue = new MqttTopicValue();
+                        mqttTopicValue.setDevCode(iotDevice.getDevCode());
+                        mqttTopicValue.setDevId(iotDevice.getDevBid());
+                        mqttTopicValue.setTopic(s);
+                        mqttTopicValues.add(mqttTopicValue);
+                    }
+                }
 
-        // 构建配置
-        MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
-        //服务类Bean名称
-        String serviceName = cfgYf.getServiceName();
-        // 之前干嘛的待定,
-        String type = cfgYf.getType();
-        //厂家名称
-        String firmName = cfgYf.getFirmName();
-        log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
-        // 创建MqttCore实例
-        MqttCore mqttCore = new MqttCore();
-
-        // 查询topics【需要实现:重新更改获取该型号下的设备】
-        String connectionId=iotDeviceconnResVo.getDevconnBid();
-        mqttCore.setConnectionId(connectionId);
-        // 构建topic
-        List<IotDevice> devices= deviceTopicService.getDevicesByConectionId(connectionId);
-        log.info("【创建MqttCore实例】 mqttCore: {} connectionId:{} devSize:{}", mqttCore,connectionId,devices.size());
-        List<MqttTopicValue> mqttTopicValues=new ArrayList<>();
-        for(IotDevice iotDevice:devices){
-            // 根据设备code获取topics
-            String[] topics= deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
-            for(String s: topics){
-                MqttTopicValue mqttTopicValue=new MqttTopicValue();
-                mqttTopicValue.setDevCode(iotDevice.getDevCode());
-                mqttTopicValue.setDevId(iotDevice.getDevBid());
-                mqttTopicValue.setTopic(s);
-                mqttTopicValues.add(mqttTopicValue);
             }
-        }
 
-        String[] topics=new String[mqttTopicValues.size()];
-        if(mqttTopicValues!=null&&!mqttTopicValues.isEmpty()){
-            for(int i=0;i<mqttTopicValues.size();i++){
-                topics[i]=mqttTopicValues.get(i).getTopic();
+            String[] topics = new String[mqttTopicValues.size()];
+            if (mqttTopicValues != null && !mqttTopicValues.isEmpty()) {
+                for (int i = 0; i < mqttTopicValues.size(); i++) {
+                    topics[i] = mqttTopicValues.get(i).getTopic();
+                }
+                cfgYf.setSubTopic(topics);
             }
-           cfgYf.setSubTopic(topics);
-        }
-        mqttCore.bindTopicToDeviceId(mqttTopicValues);
-        cfgYf.setServiceName(serviceName);
-        log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
+            mqttCore.bindTopicToDeviceId(mqttTopicValues);
+            cfgYf.setServiceName(serviceName);
+            log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
 
-        // 处理连接ID的逻辑 IP+port+name
-        log.info("【添加公共连接】 connectionId: {}",connectionId);
-        addConnectionMap(connectionId, mqttCore);
+            // 处理连接ID的逻辑 IP+port+name
+            log.info("【添加公共连接】 connectionId: {}", connectionId);
+            addConnectionMap(connectionId, mqttCore);
 
-        // 构建MqttCore
-        try {
+            // 构建MqttCore
             mqttCore.buildMqttCore(cfgYf);
             log.info("【成功构建MqttCore】 mqttCore: {}", mqttCore);
-        } catch (MqttException e) {
-            log.error("【构建MqttCore失败】 异常信息: {}", e.getMessage(), e);
-            throw e; // 重新抛出异常,以便上层处理
+            log.info("【完成构建MQTT连接】");
+        } catch (Exception e) {
+            log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
         }
-
-        log.info("【初始化厂商配置】 {} {} 完成", firmName, type);
-        log.info("【完成构建MQTT连接】");
     }
 
 
@@ -160,7 +158,7 @@ public class MqttManager {
                 // 尝试关闭MqttCore
                 log.info("【尝试关闭MQTT连接】 connectionId: {}", connectionId);
                 String[] topics = mqttCore.getSubTopic();
-                if(!ArrayUtil.isEmpty(topics)){
+                if (!ArrayUtil.isEmpty(topics)) {
                     mqttCore.unsubscribe(topics);
                 }
                 mqttCore.close();
@@ -399,8 +397,8 @@ public class MqttManager {
 
         // 获取批量订阅的主题
         String[] topics = new String[mqttTopicValues.size()];
-        for(int i=0;i<topics.length;i++){
-            topics[i]=mqttTopicValues.get(i).getTopic();
+        for (int i = 0; i < topics.length; i++) {
+            topics[i] = mqttTopicValues.get(i).getTopic();
         }
         log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
         // 获取MqttCore实例
@@ -428,7 +426,7 @@ public class MqttManager {
      * @param mqttTopicValues
      * @throws MqttException
      */
-    public void topicSingleSubscribeDevice(String connectionId, String serviceName, List<MqttTopicValue>  mqttTopicValues) throws MqttException {
+    public void topicSingleSubscribeDevice(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
         log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
         // 调用批量订阅方法
         topicBatchSubscribeDevices(connectionId, serviceName, mqttTopicValues);
@@ -449,7 +447,7 @@ public class MqttManager {
         log.info("【开始批量取消订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
 
         // 获取批量取消订阅的主题
-        String[] topics = deviceTopicService.getBatchTopic(serviceName,mqttTopicValues);
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, mqttTopicValues);
         log.info("【获取批量取消订阅主题】 topics: {}", Arrays.toString(topics));
 
         // 获取MqttCore实例
@@ -546,7 +544,7 @@ public class MqttManager {
         }
 
         String[] deviceId = iotDeviceList.stream().map(IotDevice::getDevBid).toArray(String[]::new);
-        String[] topics =null ;// deviceTopicService.getBatchTopic(serviceName, deviceId);
+        String[] topics = null;// deviceTopicService.getBatchTopic(serviceName, deviceId);
         if (topics == null || topics.length == 0) {
             log.warn("【设备重新订阅】【订阅】【重连】:设备:{} 没有找到topic", serviceName);
             return;
@@ -569,11 +567,12 @@ public class MqttManager {
 
     /**
      * 根据topic 获取id
+     *
      * @param connectionId
      * @param topic
      * @return
      */
-    public String getDevIdByTopic(String connectionId,String topic){
+    public String getDevIdByTopic(String connectionId, String topic) {
         return getMqttCore(connectionId).getDevIdByTopic(topic);
     }
 }