Просмотр исходного кода

阶段性提交:定义mqtt topic通道的订阅、解除

yf_zn 1 год назад
Родитель
Сommit
793ff2d778

+ 169 - 116
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -1,36 +1,38 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
 import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.common.constant.ErrorCode;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
+import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotFirmdev;
 import com.yunfeiyun.agmp.iot.common.domain.resvo.IotFirmdevResVo;
 import com.yunfeiyun.agmp.iot.common.enums.DevType;
-import com.yunfeiyun.agmp.iots.core.manager.HttpManager;
-import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
-import com.yunfeiyun.agmp.iots.device.common.Device;
-import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttConfig;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttCore;
 import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
+import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.device.service.IIotDeviceService;
 import com.yunfeiyun.agmp.iots.device.service.IIotFirmdevService;
 import com.yunfeiyun.agmp.iots.service.CmdResultCheckService;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.util.TextUtils;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.*;
 
 
 /**
  * mqtt连接管理
+ * 须知:
+ * 1. 只要获取与mqtt通道有关的,硬通货就是connectionId
+ * 2. 只要和数据代码解析,复用代码类的硬通货就是serviceName
+ *
  */
 @Component
 @Slf4j
@@ -60,8 +62,13 @@ public class MqttManager {
     @Autowired
     IIotFirmdevService iotFirmdevService;
 
-    @Autowired
-    private HttpManager httpManager;
+    /**
+     * 强制对于已经废弃/逻辑需要重新梳理抛出异常
+     */
+    void throwDeprecatedMethod(){
+        throw new IotBizException(ErrorCode.FAILURE.getCode(),"方法已经废弃/逻辑需要重新梳理,重新更换");
+    }
+
 
     /**
      * 根据厂家配置构建MqttCore
@@ -79,16 +86,18 @@ public class MqttManager {
         String deviceTypeId = cfgYf.getDeviceTypeBizId();
         log.info("【初始化厂商 配置 】 {} {} 开始 ##########################", firmName, type);
         MqttCore mqttCore = new MqttCore();
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
+        //查询topics
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
         cfgYf.setSubTopic(topics);
         cfgYf.setServiceName(serviceName);
         log.info("【mqtt:{} 】  {} ", cfgYf.getDeviceType(), firmBizId);
         log.info("【初始化厂商 加载 配置】 {} {} {} ", firmName, type, topics);
         //这里从配置获取到一个链接的唯一标识,内部处理复用问题,目前来看,链接id只要相同就实现复用链接
+        //【这个id的生成用一个公共的方法】
         if("私有".equals("查询字段")){
             addPrivateConnectionMap("connectionId", mqttCore);
         }else{
-            addConmonConnectionMap("connectionId", mqttCore);
+            addCommonConnectionMap("connectionId", mqttCore);
         }
         // 这个checker待梳理
         mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
@@ -126,53 +135,57 @@ public class MqttManager {
         return cfgYf;
     }
 
+    //链接相关
+
 
     /**
-     * 初始化http
+     * 获取Publisher 根据服务名称
      *
-     * @param iotFirmdevResVo
-     * @param jsonConfig
+     * @param connectionId
+     * @return
      */
-    private void initHttp(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
-        httpManager.init(iotFirmdevResVo, jsonConfig);
+    public MqttPublisher getPublisherByService(String connectionId) {
+        MqttCore mqttCore = serviceMqttMap.get(connectionId);
+        if (mqttCore == null) {
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), connectionId + "暂无对应处理器");
+        }
+        return mqttCore.getMqttPublisher();
     }
 
+    /**
+     * 添加私有的链接:自行修改入参,链接的id生成
+     * @param connectionId
+     * @param mqttCore
+     */
+    public void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
+        putConnection(connectionId,mqttCore);
+
+    }
 
     /**
-     * 根据不同厂家获取设备id,拼接topic
-     *
-     * @param firmBizId
-     * @return
+     * 添加公有的链接:自行修改入参,链接的id生成
+     * @param connectionId
+     * @param mqttCore
      */
-    public String[] getDeviceIdByFirmBizId(String firmBizId, String deviceTypeBizId) {
-        List<IotDevice> devices = iIotDeviceService.getDeviceIdByFirmBizId(firmBizId, deviceTypeBizId);
-        if (devices == null || devices.size() == 0) {
-            return null;
-        }
-        String[] deviceIds = new String[devices.size()];
-        int i = 0;
-        for (IotDevice d : devices) {
-            deviceIds[i] = d.getDevCode();
-            i++;
-        }
-        return deviceIds;
+    public void addCommonConnectionMap(String connectionId, MqttCore mqttCore) {
+        putConnection(connectionId,mqttCore);
+
     }
 
     /**
-     * 获取Publisher 根据服务名称
-     *
-     * @param serviceName
-     * @return
+     * 放链接
+     * @param connectionId
+     * @param mqttCore
      */
-    public MqttPublisher getPublisherByService(String serviceName) {
-        MqttCore mqttCore = serviceMqttMap.get(serviceName);
-        if (mqttCore == null) {
-            throw new IotBizException(IotErrorCode.FAILURE.getCode(), serviceName + "暂无对应处理器");
+    void putConnection(String connectionId, MqttCore mqttCore){
+        if (!serviceMqttMap.containsKey(connectionId)) {
+            // 维护 服务名字--> mqtt
+            serviceMqttMap.put(connectionId, mqttCore);
+        } else {
+            log.info("【MQTT】重复配置 connectionId:{} ", connectionId);
         }
-        return mqttCore.getMqttPublisher();
     }
 
-
     /**
      * 获取设备实现类
      *
@@ -188,98 +201,147 @@ public class MqttManager {
     }
 
     /**
-     * 根据实现类的名称获取对应的设备类型id
+     * 根据connectionId获取对应的设备类型id
      *
-     * @param serviceName
+     * @param connectionId
      * @return
      */
-    public String getDeviceTypeBizId(String serviceName) {
-        return getMqttCoreByService(serviceName).getServiceBizId();
+    public String getDeviceTypeBizId(String connectionId) {
+        return getMqttCoreByService(connectionId).getServiceBizId();
     }
 
     /**
-     * 根据实现类的名称获取对应的厂家id
+     * 根据connectionId获取对应的厂家id
      *
-     * @param serviceName
+     * @param connectionId
      * @return
      */
-    public String getFirmBizId(String serviceName) {
-        return getMqttCoreByService(serviceName).getFirmBizId();
+    public String getFirmBizId(String connectionId) {
+        return getMqttCoreByService(connectionId).getFirmBizId();
     }
 
     /**
-     * 根据实现类的名称获取对应mqtt核心类
+     * 根据connectionId获取对应mqtt核心类
      *
-     * @param serviceName
+     * @param connectionId
      * @return
      */
-    private MqttCore getMqttCoreByService(String serviceName) {
-        MqttCore mqttCore = getMqttCore(serviceName);
+    private MqttCore getMqttCoreByService(String connectionId) {
+        MqttCore mqttCore = getMqttCore(connectionId);
         if (mqttCore == null) {
-            throw new IotBizException(IotErrorCode.FAILURE.getCode(), "【获取MQTT对象失败】:" + serviceName + "对应不存在");
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), "【获取MQTT对象失败】:" + connectionId + "对应不存在");
         }
         return mqttCore;
     }
 
+
+
     /**
-     * 添加私有的链接:自行修改入参,链接的id生成
+     * 根据connectionId 获取mqttcore
+     *
      * @param connectionId
-     * @param mqttCore
+     * @return
      */
-    public void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
-        putConnection(connectionId,mqttCore);
-
+    public MqttCore getMqttCore(String connectionId) {
+        return serviceMqttMap.get(connectionId);
     }
 
     /**
-     * 添加公有的链接:自行修改入参,链接的id生成
-     * @param connectionId
-     * @param mqttCore
+     * 打印已经加载的配置信息
      */
-    public void addConmonConnectionMap(String connectionId, MqttCore mqttCore) {
-        putConnection(connectionId,mqttCore);
+    public void showConfig() {
+        for (MqttCore mqttCore : serviceMqttMap.values()) {
+            log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), mqttCore.getServiceName(), mqttCore.getServiceType(), mqttCore.getSubTopic());
+        }
+    }
+
+    //订阅相关开始############################################################################
 
+    /**
+     * 批量订阅,根据一种链接的同一类serviceName的多个设备
+     * 如此分类的原因
+     * 1. 相同connectionId下serviceName不一定相同
+     * 2. 相同的serviceName的connectionId不一定相同
+     * @param serviceName
+     * @param deviceIds
+     * @throws MqttException
+     */
+    public void topicSubscribeBatchByType(String connectionId,String serviceName,List<String>  deviceIds) throws MqttException {
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
+        MqttCore mqttCore = getMqttCoreByService(connectionId);
+        MqttClient mqttClient = mqttCore.getClient();
+        mqttClient.subscribe(topics);
+        log.info("【设备订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
     }
 
     /**
-     * 放链接
+     * 单个设备订阅
      * @param connectionId
-     * @param mqttCore
+     * @param serviceName
+     * @param deviceId
+     * @throws MqttException
      */
-    void putConnection(String connectionId, MqttCore mqttCore){
-        if (!serviceMqttMap.containsKey(connectionId)) {
-            // 维护 服务名字--> mqtt
-            serviceMqttMap.put(connectionId, mqttCore);
-        } else {
-            log.info("【MQTT】重复配置 connectionId:{} ", connectionId);
-        }
+    public void subscribe(String connectionId,String serviceName,String deviceId) throws MqttException {
+        List<String>  deviceIds =new ArrayList<>();
+        deviceIds.add(deviceId);
+        topicSubscribeBatchByType(connectionId,serviceName,deviceIds);
+
     }
 
+
     /**
-     * 根据serviceName 获取mqttcore
+     * 批量取消订阅
      *
-     * @param serviceName
-     * @return
+     * @param
+     * @param
+     * @throws MqttException
      */
-    public MqttCore getMqttCore(String serviceName) {
-        return serviceMqttMap.get(serviceName);
+    public void topicSubscribeCancelByBatch(String connectionId,String serviceName,List<String>  deviceIds) throws MqttException {
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceIds.toArray(new String[0]));
+        MqttCore mqttCore = getMqttCoreByService(connectionId);
+        MqttClient mqttClient = mqttCore.getClient();
+        mqttClient.unsubscribe(topics);
     }
 
     /**
-     * 打印已经加载的配置信息
+     * 单个设备取消订阅
+     * @param connectionId
+     * @param serviceName
+     * @param deviceId
+     * @throws MqttException
      */
-    public void showConfig() {
-        for (MqttCore mqttCore : serviceMqttMap.values()) {
-            log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), mqttCore.getServiceName(), mqttCore.getServiceType(), mqttCore.getSubTopic());
-        }
+    public void subtopicSubscribeCancel(String connectionId,String serviceName,String deviceId) throws MqttException {
+        List<String>  deviceIds =new ArrayList<>();
+        deviceIds.add(deviceId);
+        topicSubscribeCancelByBatch(connectionId,serviceName,deviceIds);
     }
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+    //¥##########################废弃代码,有替换的替换,没替换的重新梳理
+
     /**
      * mqtt 手动动态订阅
-     *
+     * 废弃,换topicSubscribeBatchByType
      * @throws MqttException
+     *
      */
+    @Deprecated
     public void subscribeByBatch(Map<String, List<String>> serviceNameMap) throws MqttException {
+        throwDeprecatedMethod();
         for (Map.Entry<String, List<String>> entry : serviceNameMap.entrySet()) {
             String serviceName = entry.getKey();
             String[] topics = deviceTopicService.getBatchTopic(serviceName, entry.getValue().toArray(new String[0]));
@@ -289,27 +351,36 @@ public class MqttManager {
             log.info("【设备订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
         }
     }
-
+    @Deprecated
     public void subscribeByBatch(String[] topics, MqttCore mqttCore) throws MqttException {
+        throwDeprecatedMethod();
         MqttClient mqttClient = mqttCore.getClient();
         mqttClient.subscribe(topics);
         log.info("【设备订阅】【多个相同服务共用一个客户端 需要手动订阅】:设备:{} {} {}", topics, mqttCore.getServiceType(), mqttCore);
     }
 
+
+
+
     /**
-     * mqtt 手动动态取消订阅
-     *
-     * @param
-     * @param
-     * @throws MqttException
+     * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
      */
-    public void unsubscribeByBatch(Map<String, List<String>> serviceNameMap) throws MqttException {
-        for (Map.Entry<String, List<String>> entry : serviceNameMap.entrySet()) {
-            String serviceName = entry.getKey();
-            String[] topics = deviceTopicService.getBatchTopic(serviceName, entry.getValue().toArray(new String[0]));
-            MqttCore mqttCore = getMqttCoreByService(serviceName);
-            MqttClient mqttClient = mqttCore.getClient();
-            mqttClient.unsubscribe(topics);
+    @Deprecated
+    public void reSubscribe() {
+        throwDeprecatedMethod();
+        IotFirmdev iotFirmdev = new IotFirmdev();
+        List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
+        for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVos) {
+            try {
+                JSONObject jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
+                String type = jsonConfig.getString("type");
+                if (!Objects.equals(type, "mqtt")) {
+                    continue;
+                }
+                startSubscribe(iotFirmdevResVo, jsonConfig);
+            } catch (Exception e) {
+                log.error("【设备重新订阅】【订阅】【重连】 解析配置文件错误: \n" + iotFirmdevResVo.getFirmdevCfg() + "\n" + e);
+            }
         }
     }
 
@@ -319,6 +390,7 @@ public class MqttManager {
      * @param iotFirmdevResVo
      * @param jsonConfig
      */
+    @Deprecated
     private void startSubscribe(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
         log.info("【设备重新订阅】【订阅】【重连】:设备:{} {}", iotFirmdevResVo, jsonConfig);
         MqttConfig cfgYf = getMqttConfig(iotFirmdevResVo, jsonConfig);
@@ -364,23 +436,4 @@ public class MqttManager {
         }
     }
 
-    /**
-     * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
-     */
-    public void reSubscribe() {
-        IotFirmdev iotFirmdev = new IotFirmdev();
-        List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
-        for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVos) {
-            try {
-                JSONObject jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
-                String type = jsonConfig.getString("type");
-                if (!Objects.equals(type, "mqtt")) {
-                    continue;
-                }
-                startSubscribe(iotFirmdevResVo, jsonConfig);
-            } catch (Exception e) {
-                log.error("【设备重新订阅】【订阅】【重连】 解析配置文件错误: \n" + iotFirmdevResVo.getFirmdevCfg() + "\n" + e);
-            }
-        }
-    }
 }

+ 26 - 18
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java

@@ -7,6 +7,7 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iots.device.service.IIotDeviceService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties;
 import org.springframework.stereotype.Service;
 
@@ -22,9 +23,34 @@ import java.util.List;
 @Service
 @Slf4j
 public class DeviceTopicService {
+
     @Resource
     private IIotDeviceService iotDeviceService;
 
+    @Autowired
+    IIotDeviceService iIotDeviceService;
+
+    /**
+     * 根据不同厂家获取设备id,拼接topic
+     *
+     * @param firmBizId
+     * @return
+     */
+    public String[] getDeviceIdByFirmBizId(String firmBizId, String deviceTypeBizId) {
+        List<IotDevice> devices = iIotDeviceService.getDeviceIdByFirmBizId(firmBizId, deviceTypeBizId);
+        if (devices == null || devices.size() == 0) {
+            return null;
+        }
+        String[] deviceIds = new String[devices.size()];
+        int i = 0;
+        for (IotDevice d : devices) {
+            deviceIds[i] = d.getDevCode();
+            i++;
+        }
+        return deviceIds;
+    }
+
+
     /**
      * 根据实现类获取topic
      *
@@ -82,20 +108,6 @@ public class DeviceTopicService {
     }
 
     private String[] getYfCbdBatchSubTopic(String[] deviceId) {
-        // 临时测试
-//        if (deviceId == null) {
-//            deviceId = new String[]{"861551058867599"};
-//        }
-//        String[] topics = new String[0];
-//        if (deviceId != null) {
-//            topics = new String[deviceId.length * 3];
-//            for (int i = 0; i < deviceId.length; i++) {
-//                topics[i * 3 + 0] = IotMqttConstant.YFCbdTopic.TOPIC_CBD_REPORT_PREFIX + deviceId[i];
-//                topics[i * 3 + 1] = IotMqttConstant.YFCbdTopic.TOPIC_CBD_OFFLINE_PREFIX + deviceId[i];
-//                topics[i * 3 + 2] = IotMqttConstant.YFCbdTopic.TOPIC_CBD_PHOTO_PREFIX + deviceId[i];
-//            }
-//        }
-//        return topics;
 
         String[] topicArray = {
                 IotMqttConstant.YFCbdTopic.TOPIC_CBD_REPORT_PREFIX,
@@ -108,10 +120,6 @@ public class DeviceTopicService {
     }
 
     private String[] getBySfBatchSubTopic(String[] deviceId) {
-        // 临时测试
-//        if (deviceId == null) {
-//            deviceId = new String[]{"1257"};
-//        }
 
         String[] topics = new String[0];
         if (deviceId != null) {