Explorar el Código

redis业务管理:优化为内存管理,common的类拆分iotm和iots

yf_zn hace 1 año
padre
commit
5cf89e4a7b

+ 162 - 0
src/main/java/com/yunfeiyun/agmp/iots/cache/DeviceconnCacheService.java

@@ -0,0 +1,162 @@
+package com.yunfeiyun.agmp.iots.cache;
+
+import com.yunfeiyun.agmp.common.enums.RedisCacheKey;
+import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Service
+public class DeviceconnCacheService {
+
+    /**
+     * 用于存储设备连接信息,键为设备连接标识符(devconnBid),值为IotDeviceconn对象。模拟了原先在Redis中保存设备连接信息的功能。
+     */
+    private final Map<String, IotDeviceconn> deviceConnMap = new ConcurrentHashMap<>();
+
+    /**
+     * 用于存储设备连接与MQTT连接标识之间的映射关系,键为设备连接标识符(devconnBid),值为MQTT连接标识(connectionId)。替代了原本在Redis中通过键值对保存MQTT连接ID的方式。
+     */
+    private final Map<String, String> mqttConnectIdMap = new ConcurrentHashMap<>();
+
+    /**
+     * 用于存储MQTT连接标识与一组设备连接标识符之间的映射关系。键为MQTT连接标识(connectionId),值为包含相关设备连接标识符(devconnBid)的集合。这个Map追踪哪些设备连接与特定的MQTT连接有关联,以便于管理和删除操作。
+     */
+    private final Map<String, Set<String>> mqttDeviceConnBidListMap = new ConcurrentHashMap<>();
+
+    /**
+     * 用于存储HTTP通用连接信息,键为设备类型代码(devtypeCode),值为与此类型相关的设备连接标识符(devconnBid)的集合。
+     * 此Map模拟了原系统中基于设备类型代码管理HTTP连接的方式,允许快速查找和管理属于同一类型的设备连接。
+     */
+    private final Map<String, Set<String>> httpDeviceConnBidListMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private BusinessCoreService businessCoreService;
+
+    /**
+     * 将设备连接信息保存到内存中。
+     *
+     * @param iotDeviceconn 设备连接对象
+     */
+    public void setCache(IotDeviceconn iotDeviceconn) {
+        log.info("【设备连接缓存】保存设备连接信息缓存,设备链接信息标识:{}", iotDeviceconn.getDevconnBid());
+        deviceConnMap.put(iotDeviceconn.getDevconnBid(), iotDeviceconn);
+    }
+
+    /**
+     * 从内存中删除指定的设备连接信息。
+     *
+     * @param iotDeviceconn 设备连接对象
+     */
+    public void deleteCache(IotDeviceconn iotDeviceconn) {
+        log.info("【设备连接缓存】删除设备连接信息缓存,设备链接信息标识:{}", iotDeviceconn.getDevconnBid());
+        deviceConnMap.remove(iotDeviceconn.getDevconnBid());
+    }
+
+    /**
+     * 根据设备连接标识符获取设备连接信息。如果缓存中不存在,则尝试从数据库中查找并更新缓存。
+     *
+     * @param devconnBid 设备连接标识符
+     * @return IotDeviceconn 设备连接对象
+     */
+    public IotDeviceconn getIotDeviceConnByDevconnBid(String devconnBid) {
+        log.info("【设备连接缓存】查询设备连接信息,设备连接标识为:{}", devconnBid);
+        IotDeviceconn iotDeviceconn = deviceConnMap.get(devconnBid);
+        if (iotDeviceconn == null) {
+            log.error("【设备连接缓存】查询设备连接信息失败,设备连接标识为:{}", devconnBid);
+            iotDeviceconn = businessCoreService.selectDevConnByConnId(devconnBid);
+            if (iotDeviceconn == null) {
+                throw new IotBizException(IotErrorCode.INVALID_DEVICE_CONN_BID);
+            }
+            setCache(iotDeviceconn);
+        }
+        return iotDeviceconn;
+    }
+
+    /**
+     * 根据IotDevice对象获取设备连接信息。
+     *
+     * @param iotDevice 设备对象
+     * @return IotDeviceconn 设备连接对象
+     */
+    public IotDeviceconn getIotDeviceConnByIotDevice(IotDevice iotDevice) {
+        return this.getIotDeviceConnByDevconnBid(iotDevice.getDevconnBid());
+    }
+
+    /**
+     * 【mqtt】通过设备连接标识获取mqtt链接标识
+     */
+    public String getMqttConnectIdByDeviceConnBid(String devconnBid) {
+        log.error("【设备连接缓存】查询mqtt链接标识,设备连接标识为:{}", devconnBid);
+        String connectId = mqttConnectIdMap.get(devconnBid);
+        if (connectId == null) {
+            log.error("【设备连接缓存】查询mqtt链接标识失败,设备连接标识为:{}", devconnBid);
+            throw new IotBizException(IotErrorCode.INVALID_DEVICE_CONN_BID);
+        }
+        return connectId;
+    }
+
+    /**
+     * 【mqtt】关联设备连接标识和mqtt链接标识
+     */
+    public void setMqttConnectionIdByConnBid(String devconnBid, String connectionId) {
+        mqttConnectIdMap.put(devconnBid, connectionId);
+        mqttDeviceConnBidListMap.computeIfAbsent(connectionId, k -> ConcurrentHashMap.newKeySet()).add(devconnBid);
+    }
+
+    /**
+     * 【mqtt】删除mqtt链接标识
+     */
+    public void deleteMqttConnectionId(String devconnBid) {
+        String connectionId = mqttConnectIdMap.remove(devconnBid);
+        if (connectionId != null && mqttDeviceConnBidListMap.containsKey(connectionId)) {
+            mqttDeviceConnBidListMap.get(connectionId).remove(devconnBid);
+        }
+    }
+
+    /**
+     * 【mqtt】判定mqtt链接标识是否还有绑定的设备连接标识
+     */
+    public boolean mqttConnectionIdHasLink(String connectionId) {
+        return mqttDeviceConnBidListMap.containsKey(connectionId) && !mqttDeviceConnBidListMap.get(connectionId).isEmpty();
+    }
+
+    /**
+     * 【http】保存租户的http通用连接信息
+     */
+    public void setHttpCommonConnectionByDevtypeCode(String devconnBid, String devtypeCode) {
+        httpDeviceConnBidListMap.computeIfAbsent(devtypeCode, k -> ConcurrentHashMap.newKeySet()).add(devconnBid);
+    }
+
+    /**
+     * 【http】删除租户的http通用连接信息
+     */
+    public void deleteHttpCommonConnectionByDevtypeCode(String devconnBid, String devtypeCode) {
+        if (httpDeviceConnBidListMap.containsKey(devtypeCode)) {
+            httpDeviceConnBidListMap.get(devtypeCode).remove(devconnBid);
+        }
+    }
+
+    /**
+     * 【http】判定http通用连接是否还有绑定的设备连接
+     */
+    public boolean httpConnectionBidHasLink(String devtypeCode) {
+        return httpDeviceConnBidListMap.containsKey(devtypeCode) && !httpDeviceConnBidListMap.get(devtypeCode).isEmpty();
+    }
+
+    public void cleanCache() {
+        deviceConnMap.clear();
+        mqttConnectIdMap.clear();
+        mqttDeviceConnBidListMap.clear();
+        httpDeviceConnBidListMap.clear();
+    }
+}

+ 108 - 0
src/main/java/com/yunfeiyun/agmp/iots/cache/TypeCacheService.java

@@ -0,0 +1,108 @@
+package com.yunfeiyun.agmp.iots.cache;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.common.enums.RedisCacheKey;
+import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
+import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * 统一维护type(lv2)的缓存,增删改查。
+ */
+@Service
+public class TypeCacheService {
+
+    // RedisTemplate用于操作Redis缓存
+    @Resource
+    private RedisTemplate redisTemplate;
+
+    // BusinessCoreService用于数据库查询等业务逻辑处理
+    @Autowired
+    private BusinessCoreService businessCoreService;
+
+    /**
+     * 将设备类型信息保存到缓存中。
+     * @param tosDevicetype 设备类型对象
+     */
+    public void setCache(TosDevicetype tosDevicetype) {
+        redisTemplate.opsForValue().set(getKey(RedisCacheKey.IOT_DEVICE_TYPE) + tosDevicetype.getDevtypeBid(), tosDevicetype);
+    }
+
+    /**
+     * 从缓存中删除指定设备类型的记录。
+     * @param tosDevicetype 设备类型对象
+     */
+    public void deleteCache(TosDevicetype tosDevicetype) {
+        redisTemplate.delete(getKey(RedisCacheKey.IOT_DEVICE_TYPE) + tosDevicetype.getDevtypeBid());
+    }
+
+    /**
+     * 根据设备类型编码获取缓存中的设备类型对象。如果缓存中不存在,则尝试从数据库中查找并更新缓存。
+     * @param devtypeCode 设备类型编码
+     * @return TosDevicetype 设备类型对象
+     */
+    public TosDevicetype getCacheObjectByDevTypeCode(String devtypeCode) {
+        Object tosDevicetype = redisTemplate.opsForValue().get(getKey(RedisCacheKey.IOT_DEVICE_TYPE) + devtypeCode);
+        if (null == tosDevicetype) {
+            // 如果缓存中找不到,尝试从数据库中获取
+            tosDevicetype = businessCoreService.selectTosDevicetypeByDevtypeCode(devtypeCode);
+            if (tosDevicetype == null) {
+                throw new IotBizException(IotErrorCode.CACHE_NOT_FOUNT);
+            }
+            setCache((TosDevicetype) tosDevicetype);
+        }
+
+        // 返回转换后的设备类型对象
+        return JSONObject.parseObject(JSONObject.toJSONString(tosDevicetype), TosDevicetype.class);
+    }
+
+    /**
+     * 根据设备类型标识符获取缓存中的设备类型对象,实际上调用的是getCacheObjectByDevTypeCode方法。
+     * @param devtypeBid 设备类型标识符
+     * @return TosDevicetype 设备类型对象
+     */
+    public TosDevicetype getCacheObjectByDevTypeBid(String devtypeBid) {
+        return getCacheObjectByDevTypeCode(devtypeBid);
+    }
+
+    /**
+     * 根据设备类型编码获取对应的服务名称。
+     * @param devtypeCode 设备类型编码
+     * @return String 服务名称
+     */
+    public String getServiceNameByDevTypeCode(String devtypeCode) {
+        if (null == devtypeCode) {
+            throw new IotBizException(IotErrorCode.INVALID_DEVICE_TYPE);
+        }
+        IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(devtypeCode);
+        if (null == iotDeviceDictEnum) {
+            throw new IotBizException(IotErrorCode.INVALID_DEVICE_TYPE);
+        }
+        return iotDeviceDictEnum.getServiceName();
+    }
+
+    /**
+     * 根据设备类型标识符获取对应的服务名称,实际上调用的是getServiceNameByDevTypeCode方法。
+     * @param devtypeBid 设备类型标识符
+     * @return String 服务名称
+     */
+    public String getServiceNameByDevTypeBid(String devtypeBid) {
+        return getServiceNameByDevTypeCode(devtypeBid);
+    }
+
+    /**
+     * 获取Redis缓存键值。
+     * @param redisCacheKey 缓存键枚举
+     * @return String 完整的缓存键
+     */
+    private String getKey(RedisCacheKey redisCacheKey) {
+        return redisCacheKey.getPrefix() + ":" + redisCacheKey.getModuleType() + ":" + redisCacheKey.getName() + ":";
+    }
+}

+ 34 - 0
src/main/java/com/yunfeiyun/agmp/iots/cache/TypeCoreService.java

@@ -0,0 +1,34 @@
+package com.yunfeiyun.agmp.iots.cache;
+
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
+import com.yunfeiyun.agmp.iots.service.BusinessCoreService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * 负责型号相关操作,用来替换之前的枚举写死的name获取相关
+ * 如果其他的,自行处理
+ */
+@Service
+public class TypeCoreService {
+
+    @Autowired
+    private TypeCacheService typeCacheService;
+
+
+
+    /**
+     * 获取型号(二级)的名称根据他的code
+     *
+     * @return
+     */
+    public String getTypeLv2NameByTypeCode(String typeCode) {
+        // 从缓存取出来
+        return getLv2ModelByLv2Code(typeCode).getDevtypeName();
+    }
+
+    public TosDevicetype getLv2ModelByLv2Code(String typeCode) {
+        return typeCacheService.getCacheObjectByDevTypeCode(typeCode);
+    }
+
+}

+ 3 - 4
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/task/CmdTaskService.java

@@ -16,20 +16,19 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.task.TaskResult;
-import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
+import com.yunfeiyun.agmp.iots.cache.TypeCacheService;
 import com.yunfeiyun.agmp.iots.config.TestConst;
 import com.yunfeiyun.agmp.iots.core.cmd.core.serial.SerialTaskModel;
 import com.yunfeiyun.agmp.iots.core.cmd.model.CmdExecModel;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
-import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 
 import com.yunfeiyun.agmp.iots.service.*;
 import com.yunfeiyun.agmp.iots.core.cmd.checker.CmdResultCheckService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-
+import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
@@ -61,7 +60,7 @@ public class CmdTaskService {
     private IIotCmdlogService iIotCmdlogService;
 
     @Autowired
-    private TypeCacheService  typeCacheService;
+    private TypeCacheService typeCacheService;
 
 
 

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -8,7 +8,7 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
-import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iots.cache.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/manager/HttpManager.java

@@ -6,8 +6,8 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
-import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
-import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
+import com.yunfeiyun.agmp.iots.cache.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iots.cache.TypeCacheService;
 import com.yunfeiyun.agmp.iots.common.annotate.HttpCore;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.http.*;

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -10,7 +10,7 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
-import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
+import com.yunfeiyun.agmp.iots.cache.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
 import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/BusinessCoreMapper.java

@@ -1,5 +1,7 @@
 package com.yunfeiyun.agmp.iots.device.mapper;
 
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 
 import java.util.List;
@@ -13,4 +15,8 @@ public interface BusinessCoreMapper {
     List<IotDeviceconnResVo> selectDevConnResVoList();
 
     List<IotDeviceconnResVo> selectDevConnResVoListByDevTypeBid(String devTypeBid);
+
+    TosDevicetype selectTosDevicetypeByDevtypeCode(String devtypeCode);
+
+    IotDeviceconn selectDevConnByConnId(String devconnBid);
 }

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfXycbIIIDeviceImpl.java

@@ -35,7 +35,7 @@ import java.util.Objects;
 @Component(ServiceNameConst.SERVICE_YF_XYCB_III)
 @Slf4j
 public class YfXycbIIIDeviceImpl extends DeviceAbstractImpl implements IYfXycbIIIDevice {
-    private final String tag = IotDeviceDictEnum.TYPE_YF_XYCB_III.getName();
+    private final String tag = "性诱 III";
 
     @Autowired
     private MqttManager mqttManager;

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -9,7 +9,7 @@ import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
 import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
-import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
+import com.yunfeiyun.agmp.iots.cache.TypeCacheService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.manager.ConnectionManager;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;

+ 12 - 2
src/main/java/com/yunfeiyun/agmp/iots/service/BusinessCoreService.java

@@ -1,5 +1,7 @@
 package com.yunfeiyun.agmp.iots.service;
 
+import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo;
 import com.yunfeiyun.agmp.iots.device.mapper.BusinessCoreMapper;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -24,11 +26,19 @@ public class BusinessCoreService {
     public List<IotDeviceconnResVo> selectDevConnResVoList() {
         return businessCoreMapper.selectDevConnResVoList();
     }
+
     /**
      * 获取设备型号的信息
-     * */
-    public List<IotDeviceconnResVo> selectDevConnResVoListByDevTypeBid(String devTypeBid){
+     */
+    public List<IotDeviceconnResVo> selectDevConnResVoListByDevTypeBid(String devTypeBid) {
         return businessCoreMapper.selectDevConnResVoListByDevTypeBid(devTypeBid);
     }
 
+    public TosDevicetype selectTosDevicetypeByDevtypeCode(String devtypeCode){
+        return businessCoreMapper.selectTosDevicetypeByDevtypeCode(devtypeCode);
+    };
+
+    public IotDeviceconn selectDevConnByConnId(String devconnBid) {
+        return businessCoreMapper.selectDevConnByConnId(devconnBid);
+    }
 }

+ 8 - 0
src/main/resources/mapper/BusinessCoreMapper.xml

@@ -35,6 +35,14 @@
         LEFT JOIN TosFirm ifd on ifd.firmBid=it.firmBid
         where ic.devtypeBid = #{devtypeBid}
     </select>
+    <select id="selectTosDevicetypeByDevtypeCode"
+            resultType="com.yunfeiyun.agmp.iot.common.domain.TosDevicetype">
+               select * from TosDevicetype where devtypeCode=#{devtypeCode}
+
+    </select>
+    <select id="selectDevConnByConnId" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn">
+        select * from IotDeviceconn where  devconnBid=#{devconnBid}
+    </select>
 
 
 </mapper>