Explorar o código

新增 mqtt订阅缓存更新功能

zhaiyifei hai 1 ano
pai
achega
a35d89d7f2

+ 28 - 6
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -1,19 +1,18 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
 import cn.hutool.core.util.ArrayUtil;
-import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.constant.ErrorCode;
 import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceTypeLv1Enum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iot.common.service.IotMqttTopicCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -23,14 +22,13 @@ import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -52,6 +50,12 @@ public class MqttManager {
     @Autowired
     private DeviceconnCacheService deviceconnCacheService;
 
+    @Resource
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private IotMqttTopicCacheService iotMqttTopicCacheService;
+
 
     /**
      * 实现类名称-->mqtt
@@ -118,7 +122,8 @@ public class MqttManager {
             if(null != mqttCore){
                 // MqttCore已经存在
                 mqttCore.bindTopicToDeviceId(mqttTopicValues);
-                mqttCore.subscribe(topics,cfgYf);
+                mqttCore.
+                        subscribe(topics,cfgYf);
             }else {
                 // 创建新的mqttCore
                 mqttCore = new MqttCore();
@@ -129,8 +134,10 @@ public class MqttManager {
                 mqttCore.buildMqttCore(cfgYf);
                 addConnectionMap(connectionId, mqttCore);
             }
-            log.info("【开始构建MQTT连接】构建完成 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
 
+            refreshTopicCache();
+
+            log.info("【开始构建MQTT连接】构建完成 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
         } catch (Exception e) {
             log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
         }
@@ -176,6 +183,7 @@ public class MqttManager {
             } finally {
                 // 从map中移除该MqttCore
                 mqttCoreMap.remove(connectionId);
+                refreshTopicCache();
                 log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
             }
         } else {
@@ -433,6 +441,8 @@ public class MqttManager {
         // 调用批量订阅方法
         topicBatchSubscribeDevices(connectionId, serviceName, mqttTopicValues);
 
+        refreshTopicCache();
+
         log.info("【完成单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
     }
 
@@ -465,6 +475,8 @@ public class MqttManager {
             throw e;
         }
 
+        refreshTopicCache();
+
         log.info("【完成批量取消订阅】 connectionId: {}, serviceName: {}, topics: {}", connectionId, serviceName, Arrays.toString(topics));
     }
 
@@ -557,4 +569,14 @@ public class MqttManager {
 
 
     }
+
+    private void refreshTopicCache(){
+        Set<String> topicSet = new HashSet<>();
+        for(Map.Entry<String, MqttCore> entry : mqttCoreMap.entrySet()){
+            MqttCore mqttCore = entry.getValue();
+            Map<String, String> topicToDevId = mqttCore.getTopicToDevId();
+            topicSet.addAll(topicToDevId.keySet());
+        }
+        iotMqttTopicCacheService.refreshTopicCache(topicSet);
+    }
 }

+ 7 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -9,12 +9,13 @@ import com.yunfeiyun.agmp.iots.device.common.Device;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 /**
@@ -284,4 +285,8 @@ public class MqttCore {
     public String getDevIdByTopic(String topic) {
         return topicToDevId.get(topic);
     }
+
+    public Map<String, String> getTopicToDevId() {
+        return topicToDevId;
+    }
 }