Explorar el Código

Merge branch 'master_connection'

# Conflicts:
#	src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java
#	src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java
#	src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java
#	src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java
liuyaowen hace 1 año
padre
commit
f3e609dfed

+ 13 - 0
src/main/java/com/yunfeiyun/agmp/iots/config/CmdAsyncTaskConfig.java

@@ -30,4 +30,17 @@ public class CmdAsyncTaskConfig {
         threadPoolTaskExecutor.initialize();
         return threadPoolTaskExecutor;
     }
+
+    @Bean("MqttTopicPublishExecutor")
+    public ThreadPoolTaskExecutor MqttTopicPublishExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setThreadNamePrefix("MqttTopicPublishExecutor-");//线程前缀
+        threadPoolTaskExecutor.setCorePoolSize( 2);//核心线程数
+        threadPoolTaskExecutor.setMaxPoolSize(4);//最大线程数
+        threadPoolTaskExecutor.setQueueCapacity(50);//等待队列
+        threadPoolTaskExecutor.setKeepAliveSeconds(30);//线程池维护线程所允许的空闲时间,单位为秒
+        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池对拒绝任务(无线程可用)的处理策略
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 }

+ 32 - 23
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -2,8 +2,9 @@ package com.yunfeiyun.agmp.iots.core.manager;
 
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceTypeLv1Enum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
@@ -22,7 +23,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.RegEx;
 import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +38,7 @@ public class ConnectionManager {
     private HttpManager httpManager;
 
     @Autowired
-    BusinessCoreService businessCoreService;
+    private BusinessCoreService businessCoreService;
     @Resource
     private TypeCacheService typeCacheService;
     @Resource
@@ -56,35 +56,36 @@ public class ConnectionManager {
      */
     @PostConstruct
     public void init() throws MqttException {
-        //先把所有型号查出来
-        List<IotDeviceconnResVo> iotDeviceConnResVoList = businessCoreService.selectTosDevicetypeResVoList();
-        log.info("【初始化】设备型号 构建链接 协议: {}个", iotDeviceConnResVoList.size());
+        deviceconnCacheService.cleanCache();
+        //把所有私有连接信息查询出来
+        List<IotDeviceconnResVo> iotDeviceConnResVoList = businessCoreService.selectDevConnResVoList();
+        log.info("【初始化】链接协议加载: {}个", iotDeviceConnResVoList.size());
         for (IotDeviceconnResVo iotDeviceconnResVo : iotDeviceConnResVoList) {
             deviceconnCacheService.setCache(iotDeviceconnResVo);
-            log.info("【初始化】【开始】协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
+            log.info("【初始化】【开始】链接协议加载,连接名称:{},厂家:{},类型:{} ,配置:{}", iotDeviceconnResVo.getDevconnName(),iotDeviceconnResVo.getFirmName(), iotDeviceconnResVo.getDevtypeBid(), iotDeviceconnResVo.getDevconnConfig());
             //将配置信息转换成jsonObject,这是个数组
             JSONArray jsonConfig = parseConfigJson(iotDeviceconnResVo.getDevconnConfig());
-
             if (jsonConfig == null) {
-                log.info("【初始化】协议加载,厂家:{},失败:配置为空", iotDeviceconnResVo.getFirmName());
+                log.info("【初始化】链接协议加载,厂家:{},失败:配置为空", iotDeviceconnResVo.getFirmName());
                 continue;
             }
             // 遍历多个配置
             for (int j = 0; j < jsonConfig.size(); j++) {
-                buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
+                buildConnCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
             }
-
         }
 
     }
 
+
+
     /**
      * 基于单个配置json创建连接,原子类方法,不可拆分
      *
      * @param iotDeviceconnResVo
      * @param jsonConfig
      */
-    void buildSingleMqttCoreByConfig(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
+    void buildConnCoreByConfig(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
         String type = jsonConfig.getString("type");
         if (TextUtils.isEmpty(type)) {
             log.info("【初始化】协议加载,厂家:{} type 为空:跳过", iotDeviceconnResVo.getFirmName());
@@ -182,7 +183,7 @@ public class ConnectionManager {
             JSONObject connItem = JSONObject.from(connItemObject);
             String type = connItem.getString("type");
             if (TextUtils.isEmpty(type)) {
-                log.info("【设备创建初始化连接】协议加载 设备:{} 的连接信息中的type 为空:跳过", iotDevice.getDevCode());
+                log.info("【设备:{} 删除】协议加载 连接信息中的type 为空:跳过", iotDevice.getDevCode());
                 return;
             }
             try {
@@ -199,12 +200,12 @@ public class ConnectionManager {
                         break;
                     }
                     default: {
-                        log.info("【设备:{} 创建初始化连接】其它类型:{},跳过", iotDevice.getDevCode(), type);
+                        log.info("【设备:{} 删除】其它类型:{},跳过", iotDevice.getDevCode(), type);
                         return;
                     }
                 }
             } catch (Exception e) {
-                log.error("【设备:{} 创建初始化连接】失败 异常信息:", iotDevice.getDevCode(), e);
+                log.error("【设备:{} 删除】失败 异常信息:", iotDevice.getDevCode(), e);
             }
         }
     }
@@ -223,8 +224,8 @@ public class ConnectionManager {
     public void createIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【初始化】【开始】协议加载,连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
         //将配置信息转换成jsonObject,这是个数组
-        JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         deviceconnCacheService.setCache(iotDeviceconn);
+        JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         if (jsonConfig == null) {
             log.error("【初始化】协议加载失败,连接名称:{},配置为空", iotDeviceconn.getDevconnName());
             return;
@@ -233,7 +234,7 @@ public class ConnectionManager {
         BeanUtils.copyProperties(iotDeviceconn, iotDeviceconnResVo);
         // 遍历多个配置
         for (int j = 0; j < jsonConfig.size(); j++) {
-            buildSingleMqttCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
+            buildConnCoreByConfig(iotDeviceconnResVo, jsonConfig.getJSONObject(j));
         }
     }
 
@@ -242,7 +243,6 @@ public class ConnectionManager {
      */
     public void deleteIotDeviceconnHandle(IotDeviceconn iotDeviceconn) {
         log.info("【连接信息删除】连接名称:{},类型:{} ,配置:{}", iotDeviceconn.getDevconnName(), iotDeviceconn.getDevtypeBid(), iotDeviceconn.getDevconnConfig());
-        deviceconnCacheService.deleteCache(iotDeviceconn);
         //将配置信息转换成jsonObject,这是个数组
         JSONArray jsonConfig = parseConfigJson(iotDeviceconn.getDevconnConfig());
         if (jsonConfig == null) {
@@ -263,7 +263,7 @@ public class ConnectionManager {
             try {
                 switch (type) {
                     case "mqtt":
-                        mqttManager.deleteMqttConnection(iotDeviceconnResVo.getDevconnBid());
+                        mqttManager.deleteMqttConnection(iotDeviceconnResVo);
                         break;
                     case "modbus-tcp": {
                         //先不处理,对接到了再梳理
@@ -283,6 +283,7 @@ public class ConnectionManager {
                 log.error("【连接信息{}删除失败】", iotDeviceconnResVo.getDevconnName(), e);
             }
         }
+        deviceconnCacheService.deleteCache(iotDeviceconn);
     }
 
     /**
@@ -300,7 +301,8 @@ public class ConnectionManager {
      * @throws Exception
      */
     private void mqttDeviceCreateHandle(IotDevice iotDevice) throws Exception {
-        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDevice.getDevtypeBid());
+        String serviceName = iotDeviceDictEnum.getServiceName();
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
@@ -308,10 +310,13 @@ public class ConnectionManager {
             MqttTopicValue mqttTopicValue = new MqttTopicValue();
             mqttTopicValue.setDevCode(iotDevice.getDevCode());
             mqttTopicValue.setDevId(iotDevice.getDevBid());
+            mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDevice.getDevtypeBid()));
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+        String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
+        mqttManager.topicSingleSubscribeDevice(connId, serviceName, mqttTopicValues);
     }
 
     private void httpDeviceCreateHandle(IotDevice iotDevice) {
@@ -325,18 +330,22 @@ public class ConnectionManager {
      * @throws Exception
      */
     private void mqttDeleteDeviceHandle(IotDevice iotDevice) throws Exception {
-        String serviceName = typeCacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(iotDevice.getDevtypeBid());
+        String serviceName = iotDeviceDictEnum.getServiceName();
         String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
         List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
         //改设备的所有topics
         for (String s : topics) {
             MqttTopicValue mqttTopicValue = new MqttTopicValue();
             mqttTopicValue.setDevCode(iotDevice.getDevCode());
+            mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDevice.getDevtypeBid()));
             mqttTopicValue.setDevId(iotDevice.getDevBid());
             mqttTopicValue.setTopic(s);
             mqttTopicValues.add(mqttTopicValue);
         }
-        mqttManager.topicBatchUnSubscribeDevices(iotDevice.getDevconnBid(), serviceName, mqttTopicValues);
+        IotDeviceconn iotDeviceconn = deviceconnCacheService.getIotDeviceConnByIotDevice(iotDevice);
+        String connId = mqttManager.getMqttConnectionId(iotDeviceconn);
+        mqttManager.topicBatchUnSubscribeDevices(connId, serviceName, mqttTopicValues);
     }
 
     private void httpDeleteDeviceHandle(IotDevice iotDevice) {

+ 117 - 63
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -1,12 +1,18 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
 import cn.hutool.core.util.ArrayUtil;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.constant.ErrorCode;
-import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceTypeLv1Enum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
@@ -23,6 +29,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
@@ -41,12 +48,14 @@ public class MqttManager {
 
     @Autowired
     IIotDeviceService iIotDeviceService;
+    @Autowired
+    private DeviceconnCacheService deviceconnCacheService;
 
 
     /**
      * 实现类名称-->mqtt
      */
-    private Map<String, MqttCore> serviceMqttMap = new HashMap<>();
+    private Map<String, MqttCore> mqttCoreMap = new HashMap<>();
 
     /**
      * spring 自动注入
@@ -76,67 +85,56 @@ public class MqttManager {
      */
     public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig){
         try {
-            log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
-
             // 构建配置
             MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
-            //服务类Bean名称
-            String serviceName = cfgYf.getServiceName();
-            // 之前干嘛的待定,
-            String type = cfgYf.getType();
-            //厂家名称
-            String firmName = cfgYf.getFirmName();
-            log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
-            // 创建MqttCore实例
-            MqttCore mqttCore = new MqttCore();
-
-            // 查询topics【需要实现:重新更改获取该型号下的设备】
-            String connectionId = iotDeviceconnResVo.getDevconnBid();
-            mqttCore.setConnectionId(connectionId);
+            // 生成Mqtt连接标识并写入缓存
+            String connectionId = generateMqttConnectionId(iotDeviceconnResVo,jsonConfig);
+            log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
             // 构建topic
-            List<IotDevice> devices = deviceTopicService.getDevicesByConectionId(connectionId);
-            log.info("【创建MqttCore实例】 mqttCore: {} connectionId:{} devSize:{}", mqttCore, connectionId, devices.size());
+            List<IotDevice> devices = deviceTopicService.getDevicesByDevConnBid(iotDeviceconnResVo.getDevconnBid());
             List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
             for (IotDevice iotDevice : devices) {
                 // 根据设备code获取topics
-                String[] topics = deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
+                String[] topics = deviceTopicService.getTopic(cfgYf.getServiceName(), iotDevice.getDevCode());
                 if (topics != null) {
                     for (String s : topics) {
                         MqttTopicValue mqttTopicValue = new MqttTopicValue();
                         mqttTopicValue.setDevCode(iotDevice.getDevCode());
+                        mqttTopicValue.setServiceName(IotDeviceDictEnum.findServiceNameByDevTypeBid(iotDevice.getDevtypeBid()));
                         mqttTopicValue.setDevId(iotDevice.getDevBid());
                         mqttTopicValue.setTopic(s);
                         mqttTopicValues.add(mqttTopicValue);
                     }
                 }
-
             }
-
             String[] topics = new String[mqttTopicValues.size()];
-            if (mqttTopicValues != null && !mqttTopicValues.isEmpty()) {
+            if (!mqttTopicValues.isEmpty()) {
                 for (int i = 0; i < mqttTopicValues.size(); i++) {
                     topics[i] = mqttTopicValues.get(i).getTopic();
                 }
+            }
+            MqttCore mqttCore = mqttCoreMap.get(connectionId);
+            if(null != mqttCore){
+                // MqttCore已经存在
+               mqttCore.subscribe(topics,cfgYf);
+               mqttCore.bindTopicToDeviceId(mqttTopicValues);
+            }else {
+                // 创建新的mqttCore
+                mqttCore = new MqttCore();
+                mqttCore.setConnectionId(connectionId);
+                mqttCore.bindTopicToDeviceId(mqttTopicValues);
                 cfgYf.setSubTopic(topics);
+                // 构建MqttCore
+                mqttCore.buildMqttCore(cfgYf);
+                addConnectionMap(connectionId, mqttCore);
             }
-            mqttCore.bindTopicToDeviceId(mqttTopicValues);
-            cfgYf.setServiceName(serviceName);
-            log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
-
-            // 处理连接ID的逻辑 IP+port+name
-            log.info("【添加公共连接】 connectionId: {}", connectionId);
-            addConnectionMap(connectionId, mqttCore);
-
-            // 构建MqttCore
-            mqttCore.buildMqttCore(cfgYf);
-            log.info("【成功构建MqttCore】 mqttCore: {}", mqttCore);
-            log.info("【完成构建MQTT连接】");
         } catch (Exception e) {
             log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
         }
     }
 
 
+
     /**
      * 【链接管理-删除】
      * 删除mqtt链接,底层统一调这个
@@ -144,11 +142,20 @@ public class MqttManager {
      * @param connectionId
      * @throws MqttException
      */
-    public void deleteMqttConnection(String connectionId) throws MqttException {
-        log.info("【开始删除MQTT连接】 connectionId: {}", connectionId);
-
+    public void deleteMqttConnection(IotDeviceconnResVo iotDeviceconnResVo) throws MqttException {
+        // 首先获取连接标识
+        String connectionId = deviceconnCacheService.getMqttConnectIdByDeviceConnBid(iotDeviceconnResVo.getDevconnBid());
+        // 删除相关缓存
+        deviceconnCacheService.deleteMqttConnectionId(iotDeviceconnResVo.getDevconnBid());
+        log.info("【删除MQTT连接信息】 获取连接标识 connectionId: {}", connectionId);
+        // 运行到这一步的时候,设备的连接信息都已解绑,相关订阅都已取消
+        if(deviceconnCacheService.mqttConnectionIdHasLink(connectionId)){
+            // 连接标识下仍有其他连接再使用该MqttCore
+            log.info("【删除MQTT连接信息】 连接仍在使用,停止释放连接");
+            return;
+        }
         // 从map中获取MqttCore
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore != null) {
             try {
                 // 尝试关闭MqttCore
@@ -165,7 +172,7 @@ public class MqttManager {
                 throw e;
             } finally {
                 // 从map中移除该MqttCore
-                serviceMqttMap.remove(connectionId);
+                mqttCoreMap.remove(connectionId);
                 log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
             }
         } else {
@@ -173,7 +180,6 @@ public class MqttManager {
             log.warn("【尝试关闭不存在的MQTT连接】 connectionId: {}", connectionId);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), "关闭失败:" + connectionId + "暂无对应对象");
         }
-
         log.info("【完成删除MQTT连接】 connectionId: {}", connectionId);
     }
 
@@ -228,16 +234,16 @@ public class MqttManager {
     private void putConnection(String connectionId, MqttCore mqttCore) {
         log.info("【开始维护连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
 
-        if (!serviceMqttMap.containsKey(connectionId)) {
+        if (!mqttCoreMap.containsKey(connectionId)) {
             // 如果 connectionId 不存在于 map 中,则添加新的连接
-            serviceMqttMap.put(connectionId, mqttCore);
+            mqttCoreMap.put(connectionId, mqttCore);
             log.info("【新增连接】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         } else {
             // 如果 connectionId 已存在于 map 中,则记录重复配置的日志
             log.info("【重复配置】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
         }
 
-        log.info("【完成维护连接】 connectionId: {}, 当前连接数: {}", connectionId, serviceMqttMap.size());
+        log.info("【完成维护连接】 connectionId: {}, 当前连接数: {}", connectionId, mqttCoreMap.size());
     }
 
 
@@ -249,9 +255,9 @@ public class MqttManager {
      * @return
      */
     @Deprecated
-    public MqttPublisher getPublisherByService(String connectionId) {
+    public MqttPublisher getPublisherByDevConnBid(String connectionId) {
         throwDeprecatedMethod("getPublisherByService废弃,发布消息直接通过提供connection 调用 publishMsg");
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore == null) {
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
         }
@@ -262,20 +268,19 @@ public class MqttManager {
      * 【操作-发布消息】
      * 发布消息
      *
-     * @param connectionId 链接id
+     * @param devConnBid 设备链接id
      * @param topic
      * @param message
      * @throws MqttException
      */
-    public void publishMsg(String connectionId, String topic, String message) throws MqttException {
+    public void publishMsg(String devConnBid, String topic, String message) throws MqttException {
+        String connectionId = deviceconnCacheService.getMqttConnectIdByDeviceConnBid(devConnBid);
         log.info("【开始发布消息】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
-
-        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        MqttCore mqttCore = mqttCoreMap.get(connectionId);
         if (mqttCore == null) {
             log.error("【发布消息失败】 connectionId: {} 暂无对应处理器", connectionId);
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
         }
-
         try {
             mqttCore.publish(topic, message);
             log.info("【消息发布成功】 connectionId: {}, topic: {}, message: {}", connectionId, topic, message);
@@ -348,7 +353,7 @@ public class MqttManager {
             throw new IotBizException(IotErrorCode.FAILURE.getCode(), "【获取MQTT对象失败】:" + connectionId + "对应不存在");
         }
 
-        log.info("【获取MQTT核心类成功】 connectionId: {}, mqttCore: {}", connectionId, mqttCore);
+        log.info("【获取MQTT核心类成功】 connectionId: {}, mqttClientId: {}", connectionId, mqttCore.getClient().getClientId());
         return mqttCore;
     }
 
@@ -360,14 +365,14 @@ public class MqttManager {
      * @return
      */
     private MqttCore getMqttCore(String connectionId) {
-        return serviceMqttMap.get(connectionId);
+        return mqttCoreMap.get(connectionId);
     }
 
     /**
      * 打印已经加载的配置信息
      */
     public void showConfig() {
-        for (MqttCore mqttCore : serviceMqttMap.values()) {
+        for (MqttCore mqttCore : mqttCoreMap.values()) {
             log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), mqttCore.getServiceName(), mqttCore.getServiceType(), mqttCore.getSubTopic());
         }
     }
@@ -390,19 +395,17 @@ public class MqttManager {
      */
     public void topicBatchSubscribeDevices(String connectionId, String serviceName, List<MqttTopicValue> mqttTopicValues) throws MqttException {
         log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
-
-        // 获取批量订阅的主题
-        String[] topics = new String[mqttTopicValues.size()];
-        for (int i = 0; i < topics.length; i++) {
-            topics[i] = mqttTopicValues.get(i).getTopic();
-        }
-        log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
         // 获取MqttCore实例
         MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
-        log.info("【获取MqttCore实例】 mqttCore: {}", mqttCore);
+        log.info("【获取MqttCore实例】 mqttClientId: {}", mqttCore.getClient().getClientId());
+        // 获取批量订阅的主题
+        String[] topics = new String[mqttTopicValues.size()];
         // 执行订阅
         try {
-            mqttCore.subscribe(topics);
+            for (int i = 0; i < topics.length; i++) {
+                topics[i] = mqttTopicValues.get(i).getTopic();
+                mqttCore.getClient().subscribe(topics[i],0);
+            }
             mqttCore.bindTopicToDeviceId(mqttTopicValues);
             log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
         } catch (MqttException e) {
@@ -496,4 +499,55 @@ public class MqttManager {
     public String getDevIdByTopic(String connectionId, String topic) {
         return getMqttCore(connectionId).getDevIdByTopic(topic);
     }
+
+    /**
+     * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     * @param iotDeviceconn 连接信息
+     * @param jsonConfig 连接信息配置
+     **/
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
+        if(null == iotDeviceconn){
+            throw new IotBizException(ErrorCode.INVALID_PARAMETER.getCode(),"连接信息为空");
+        }
+        return deviceconnCacheService.getMqttConnectIdByDeviceConnBid(iotDeviceconn.getDevconnBid());
+    }
+    /**
+     *
+     * */
+    public String generateMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
+        // 使用mqtt连接的ip+port+user
+        String ip = jsonConfig.getString("ip");
+        if(null != ip){
+            String[] ipItem = ip.split("\\.");
+            StringBuilder ipFormat = new StringBuilder();
+            for(String str : ipItem){
+                ipFormat.append(String.format("%03d",Long.parseLong(str)));
+            }
+            ip = ipFormat.toString();
+        }
+        String connectionId = ip+jsonConfig.getString("port")+jsonConfig.getString("username");
+        if(!IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())){
+            // 非通用连接
+            // 非通用连接信息会拼装租户标识
+            connectionId = connectionId + iotDeviceconn.getTid();
+        }
+        deviceconnCacheService.setMqttConnectionIdByConnBid(iotDeviceconn.getDevconnBid(),connectionId);
+        return connectionId;
+    }
+    /**
+     * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     * @param iotDeviceconn 连接信息
+     **/
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn){
+        JSONArray jsonArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
+        for(int i = 0;i<jsonArray.size();i++){
+            JSONObject configObject = jsonArray.getJSONObject(i);
+            if("mqtt".equals(configObject.getString("type"))){
+                return getMqttConnectionId(iotDeviceconn, configObject);
+            }
+        }
+        return null;
+    }
+
+
 }

+ 1 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/modal/MqttTopicValue.java

@@ -7,4 +7,5 @@ public class MqttTopicValue {
     private String devCode;
     private String devId;
     private String topic;
+    private String serviceName;
 }

+ 23 - 14
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -1,18 +1,19 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
+import com.yunfeiyun.agmp.common.utils.ip.IpUtils;
+import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.stream.Stream;
 
 /**
  * mqtt 核心类
@@ -48,6 +49,7 @@ public class MqttCore {
      * topic -> device
      */
     Map<String,String> topicToDevId=new HashMap<>();
+    Map<String,MqttTopicValue> topicValueMap = new HashMap<>();
 
     public String getConnectionId() {
         return connectionId;
@@ -71,8 +73,8 @@ public class MqttCore {
      *
      * @return
      */
-    public Device getDevice() {
-        return mqttConfig.getDevice();
+    public Device getDevice(String topic) {
+       return SpringUtils.getBean(topicValueMap.get(topic).getServiceName());
     }
 
     public String getServiceType() {
@@ -133,10 +135,9 @@ public class MqttCore {
         log.info("【初始化】MQTT CORE {},{},{},{},{},", mqttConfig.getDeviceType(), mqttConfig.getIp(), mqttConfig.getPort(), mqttConfig.getUsername(), mqttConfig.getPassword());
         this.mqttConfig = mqttConfig;
         buildClient();
-        connection();
         buildPublisher();
         buildSubscriber();
-
+        connection();
     }
 
     /**
@@ -147,9 +148,8 @@ public class MqttCore {
     private void buildClient() throws MqttException {
         //return new MqttClient("tcp://47.96.123.180:1883", UUID.randomUUID().toString().replace("-",""));
         String url = "tcp://" + mqttConfig.getIp() + ":" + mqttConfig.getPort();
-        String clientID = UUID.randomUUID().toString().replace("-", "");
-        this.mqttClient = new MqttClient(url, clientID, new MemoryPersistence());
-        log.info("【初始化】构建 MQTT client {}", this.mqttClient);
+        this.mqttClient = new MqttClient(url, connectionId+ IpUtils.getHostIp(), new MemoryPersistence());
+        log.info("【初始化】构建 MQTT clientId {}", this.mqttClient.getClientId());
     }
 
     /**
@@ -224,8 +224,16 @@ public class MqttCore {
      * @param topics
      * @throws MqttException
      */
-    public void subscribe(String[] topics) throws MqttException {
-        getClient().subscribe(topics);
+    public void subscribe(String[] topics,MqttConfig cfgYf) throws MqttException {
+        // 否则向mqttConfig中追加订阅
+        mqttConfig.setSubTopic(Stream.concat(Stream.of(topics),Stream.of(mqttConfig.getSubTopic())).toArray(String[]::new));
+        if(getClient().isConnected()){
+            // 如果此mqtt已经建立了连接,则正常订阅
+            for(String topic :topics){
+                getClient().subscribe(topic);
+                log.info("[MQTT] {} 连接已建立 追加订阅主题 {}", cfgYf.getDeviceType(), topic);
+            }
+        }
     }
 
     /**
@@ -247,8 +255,9 @@ public class MqttCore {
     public void bindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
         for(MqttTopicValue mqttTopicValue:mqttTopicValues){
             topicToDevId.put(mqttTopicValue.getTopic(),mqttTopicValue.getDevId());
-
+            topicValueMap.put(mqttTopicValue.getTopic(),mqttTopicValue);
         }
+
     }
 //    public void unBindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
 //        for(MqttTopicValue mqttTopicValue:mqttTopicValues){

+ 23 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttPublisher.java

@@ -1,9 +1,16 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
 
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.extra.spring.SpringUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 
 @Slf4j
 public class MqttPublisher {
@@ -12,13 +19,27 @@ public class MqttPublisher {
 
     private MqttClient mqttClient;
 
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+
+
     public void init(MqttCore mqttCore) {
         this.mqttCore = mqttCore;
         this.mqttClient = mqttCore.getClient();
+        this.threadPoolTaskExecutor = SpringUtil.getBean("MqttTopicPublishExecutor");
     }
 
     public void publish(String topic, String message) throws MqttException {
-        log.info("发MQTT消息,topic:{},message:{}",topic,message);
-        mqttClient.publish(topic, message.getBytes(), 0, false);
+         CompletableFuture.runAsync(() -> {
+            try {
+                log.info("发MQTT消息,topic:{},message:{}",topic,message);
+                mqttClient.publish(topic, message.getBytes(), 0, false);
+            }catch (Exception e){
+                log.error("发MQTT消息失败",e);
+            }
+        },threadPoolTaskExecutor);
+
     }
+
+
 }

+ 11 - 20
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -24,35 +24,23 @@ public class MqttSubscriber implements MqttCallbackExtended {
     public void init(MqttCore mqttCore) throws MqttException {
         this.mqttCore = mqttCore;
         this.mqttClient = mqttCore.getClient();
-        String[] topics = mqttCore.getSubTopic();
         mqttClient.setCallback(this);
-        if(topics!=null&&topics.length!=0){
-            for (String topic : topics) {
-                //TODO 测试设备乱发离线指令,暂时不订阅它
-                if ("/yfkj/cbd/offline/861551058867599".equals(topic)) {
-                    continue;
-                }
-                mqttClient.subscribe(topic);
-                log.debug("mqtt 主题已订阅 {},{}", mqttCore.getServiceType(), topic);
-            }
-        }
-
     }
 
     @Override
     public void connectionLost(Throwable throwable) {
         // 连接丢失时的处理逻辑
-        log.info("[MQTT] 连接断开,正在尝试重连...");
-        log.info(mqttClient.getServerURI());
+        log.info("[MQTT] 链接标识:{} URL:{} 连接断开,正在尝试重连...",mqttClient.getClientId(),mqttClient.getServerURI());
+
     }
 
     /**
      * 实际接收消息
      */
     @Override
-    //@Async("asyncServiceExecutor")
     public void messageArrived(String topic, MqttMessage mqttMessage) {
         try {
+
             String msgContent = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
             log.info("【上报数据:收到】收到mqtt消息:" + topic + ", " + msgContent);
             JSONObject obj = null;
@@ -63,7 +51,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
                 return;
             }
 
-            Device device = mqttCore.getDevice();
+            Device device = mqttCore.getDevice(topic);
             if (null == device) {
                 // 当收到mqtt消息无对应的设备类型时,则丢弃消息。
                 IotException.UNKNOWN_DEVICE.throwException();
@@ -96,12 +84,15 @@ public class MqttSubscriber implements MqttCallbackExtended {
      */
     @Override
     public void connectComplete(boolean reconnect, String serverURI) {
-        log.info("[MQTT] 连接断开,重连成功..." + serverURI);
-
+        String loggerMsg = "初始化连接成功";
+        if(reconnect){
+            loggerMsg = "连接断开,重连成功";
+        }
+        log.info("[MQTT] clientId :{} {}...{}",this.mqttClient.getClientId() ,loggerMsg, serverURI);
         String[] topics = mqttCore.getSubTopic();
         if(topics==null||topics.length==0){
             mqttClient.setCallback(this);
-            log.info("[MQTT] 连接断开,重连成功 无topic ");
+            log.info("[MQTT] clientId :{} {} 无topic ",this.mqttClient.getClientId(),loggerMsg);
             return;
         }
         for (String topic : topics) {
@@ -111,7 +102,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
             }
             try {
                 mqttClient.subscribe(topic);
-                log.info("[MQTT] 连接断开,重连成功 重新订阅主题 {},{}", mqttCore.getServiceType(), topic);
+                log.info("[MQTT] clientId :{} {} {} 订阅主题 {}",this.mqttClient.getClientId(),mqttCore.getServiceType(), loggerMsg, topic);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

+ 1 - 1
src/main/resources/application-dev.yml

@@ -248,7 +248,7 @@ policy:
           qcloudSecretKey:
           qcloudBucketName: yunfei-agm
   monitor:
-    uploadType: localSpace
+    uploadType: cloud
   # table(表), cache(缓存)
   water: table
   queue: