Просмотр исходного кода

性能测试模拟添加数据的代码

yf_zn 11 месяцев назад
Родитель
Сommit
9239acf504

+ 36 - 32
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -47,16 +47,13 @@ public class MqttManager {
 
     @Autowired
     IIotDeviceService iIotDeviceService;
+
     @Autowired
     private DeviceconnCacheService deviceconnCacheService;
 
-    @Resource
-    private RedisTemplate redisTemplate;
-
     @Autowired
     private IotMqttTopicCacheService iotMqttTopicCacheService;
 
-
     /**
      * 实现类名称-->mqtt
      */
@@ -88,12 +85,12 @@ public class MqttManager {
      * @param jsonConfig
      * @throws MqttException
      */
-    public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig){
+    public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) {
         try {
             // 构建配置
             MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
             // 生成Mqtt连接标识并写入缓存
-            String connectionId = generateMqttConnectionId(iotDeviceconnResVo,jsonConfig);
+            String connectionId = generateMqttConnectionId(iotDeviceconnResVo, jsonConfig);
             log.info("【开始构建MQTT连接】 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
             // 构建topic
             List<IotDevice> devices = deviceTopicService.getDevicesByDevConnBid(iotDeviceconnResVo.getDevconnBid());
@@ -119,12 +116,11 @@ public class MqttManager {
                 }
             }
             MqttCore mqttCore = mqttCoreMap.get(connectionId);
-            if(null != mqttCore){
+            if (null != mqttCore) {
                 // MqttCore已经存在
                 mqttCore.bindTopicToDeviceId(mqttTopicValues);
-                mqttCore.
-                        subscribe(topics,cfgYf);
-            }else {
+                mqttCore.subscribe(topics, cfgYf);
+            } else {
                 // 创建新的mqttCore
                 mqttCore = new MqttCore();
                 mqttCore.setConnectionId(connectionId);
@@ -144,12 +140,11 @@ public class MqttManager {
     }
 
 
-
     /**
      * 【链接管理-删除】
      * 删除mqtt链接,底层统一调这个
      *
-     * @param connectionId
+     * @param iotDeviceconnResVo
      * @throws MqttException
      */
     public void deleteMqttConnection(IotDeviceconnResVo iotDeviceconnResVo) throws MqttException {
@@ -159,7 +154,7 @@ public class MqttManager {
         deviceconnCacheService.deleteMqttConnectionId(iotDeviceconnResVo.getDevconnBid());
         log.info("【删除MQTT连接信息】 获取连接标识 connectionId: {}", connectionId);
         // 运行到这一步的时候,设备的连接信息都已解绑,相关订阅都已取消
-        if(deviceconnCacheService.mqttConnectionIdHasLink(connectionId)){
+        if (deviceconnCacheService.mqttConnectionIdHasLink(connectionId)) {
             // 连接标识下仍有其他连接再使用该MqttCore
             log.info("【删除MQTT连接信息】 连接仍在使用,停止释放连接");
             return;
@@ -415,10 +410,11 @@ public class MqttManager {
         try {
             for (int i = 0; i < topics.length; i++) {
                 topics[i] = mqttTopicValues.get(i).getTopic();
-                mqttCore.getClient().subscribe(topics[i],0);
+                mqttCore.getClient().subscribe(topics[i], 0);
             }
             mqttCore.bindTopicToDeviceId(mqttTopicValues);
-            log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
+            //log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
+            log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, serviceType: {}", connectionId, serviceName, mqttCore.getServiceType());
         } catch (MqttException e) {
             log.error("【批量订阅失败】 connectionId: {}, serviceName: {}, topics: {}, 异常信息: {}", connectionId, serviceName, Arrays.toString(topics), e.getMessage(), e);
             throw e;
@@ -517,47 +513,51 @@ public class MqttManager {
 
     /**
      * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     *
      * @param iotDeviceconn 连接信息
-     * @param jsonConfig 连接信息配置
+     * @param jsonConfig    连接信息配置
      **/
-    public String getMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
-        if(null == iotDeviceconn){
-            throw new IotBizException(ErrorCode.INVALID_PARAMETER.getCode(),"连接信息为空");
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn, JSONObject jsonConfig) {
+        if (null == iotDeviceconn) {
+            throw new IotBizException(ErrorCode.INVALID_PARAMETER.getCode(), "连接信息为空");
         }
         return deviceconnCacheService.getMqttConnectIdByDeviceConnBid(iotDeviceconn.getDevconnBid());
     }
+
     /**
      *
-     * */
-    public String generateMqttConnectionId(IotDeviceconn iotDeviceconn,JSONObject jsonConfig){
+     */
+    public String generateMqttConnectionId(IotDeviceconn iotDeviceconn, JSONObject jsonConfig) {
         // 使用mqtt连接的ip+port+user
         String ip = jsonConfig.getString("ip");
-        if(null != ip){
+        if (null != ip) {
             String[] ipItem = ip.split("\\.");
             StringBuilder ipFormat = new StringBuilder();
-            for(String str : ipItem){
-                ipFormat.append(String.format("%03d",Long.parseLong(str)));
+            for (String str : ipItem) {
+                ipFormat.append(String.format("%03d", Long.parseLong(str)));
             }
             ip = ipFormat.toString();
         }
-        String connectionId = ip+jsonConfig.getString("port")+jsonConfig.getString("username");
-        if(!IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())){
+        String connectionId = ip + jsonConfig.getString("port") + jsonConfig.getString("username");
+        if (!IotDeviceconnTypeEnum.COMMON.getCode().equals(iotDeviceconn.getDevconnType())) {
             // 非通用连接
             // 非通用连接信息会拼装租户标识
             connectionId = connectionId + iotDeviceconn.getTid();
         }
-        deviceconnCacheService.setMqttConnectionIdByConnBid(iotDeviceconn.getDevconnBid(),connectionId);
+        deviceconnCacheService.setMqttConnectionIdByConnBid(iotDeviceconn.getDevconnBid(), connectionId);
         return connectionId;
     }
+
     /**
      * 获取连接信息,如果缓存中不存在的话,会根据jsonConfig进行解析
+     *
      * @param iotDeviceconn 连接信息
      **/
-    public String getMqttConnectionId(IotDeviceconn iotDeviceconn){
+    public String getMqttConnectionId(IotDeviceconn iotDeviceconn) {
         JSONArray jsonArray = JSONArray.parseArray(iotDeviceconn.getDevconnConfig());
-        for(int i = 0;i<jsonArray.size();i++){
+        for (int i = 0; i < jsonArray.size(); i++) {
             JSONObject configObject = jsonArray.getJSONObject(i);
-            if("mqtt".equals(configObject.getString("type"))){
+            if ("mqtt".equals(configObject.getString("type"))) {
                 return getMqttConnectionId(iotDeviceconn, configObject);
             }
         }
@@ -570,9 +570,13 @@ public class MqttManager {
 
     }
 
-    private void refreshTopicCache(){
+    private void refreshTopicCache() {
+        if (true) {
+            //数据量太大,先关闭校验,待优化时候打开
+            return;
+        }
         Set<String> topicSet = new HashSet<>();
-        for(Map.Entry<String, MqttCore> entry : mqttCoreMap.entrySet()){
+        for (Map.Entry<String, MqttCore> entry : mqttCoreMap.entrySet()) {
             MqttCore mqttCore = entry.getValue();
             Map<String, String> topicToDevId = mqttCore.getTopicToDevId();
             topicSet.addAll(topicToDevId.keySet());

+ 2 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -16,6 +16,7 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Stream;
 
 /**
@@ -155,7 +156,7 @@ public class MqttCore {
     private void buildClient() throws MqttException {
         //return new MqttClient("tcp://47.96.123.180:1883", UUID.randomUUID().toString().replace("-",""));
         String url = "tcp://" + mqttConfig.getIp() + ":" + mqttConfig.getPort();
-        this.mqttClient = new MqttClient(url, IpUtils.getHostIp() + ":SAAS:" + connectionId, new MemoryPersistence());
+        this.mqttClient = new MqttClient(url, "SAAS:" + UUID.randomUUID().toString().replace("-", "") + ":" + connectionId, new MemoryPersistence());
         log.info("【初始化】构建 MQTT clientId {}", this.mqttClient.getClientId());
     }
 

+ 507 - 5
src/main/java/com/yunfeiyun/agmp/iots/device/controller/TestController.java

@@ -1,18 +1,35 @@
 package com.yunfeiyun.agmp.iots.device.controller;
 
+import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.core.domain.AjaxResult;
+import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.JSONUtils;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+import com.yunfeiyun.agmp.common.utils.uuid.IdUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
+import com.yunfeiyun.agmp.iot.common.domain.*;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
+import com.yunfeiyun.agmp.iot.common.service.MongoService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import com.yunfeiyun.agmp.iots.device.mapper.IotDeviceMapper;
+import com.yunfeiyun.agmp.iots.device.service.IIotYfScddataService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.web.bind.annotation.*;
 
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
 
 /**
- * 设备相关接口
+ * 测试接口
  */
 @RestController
 @RequestMapping("test")
@@ -20,13 +37,31 @@ import org.springframework.web.bind.annotation.*;
 public class TestController {
 
 
-    @Autowired
-    private MqttManager mqttManager;
-
+    @Resource(name = "threadPoolTaskExecutor")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
     @Autowired
+    private MongoService<IotYfScddata> mongoService;
+    @Autowired
+    private MongoService<IotCbddata> cbdMongo;
+    @Autowired
+    private MongoService<IotCbdimg> cbdimgMongoService;
+    @Autowired
+    private MongoService<IotPestrecog> iotPestrecogMongoService;
+    @Autowired
+    private MongoTemplate mongoTemplate;
+    @Autowired
     private CmdDispatcherService cmdDispatcherService;
 
+    @Autowired
+    private IIotYfScddataService iIotYfScddataService;
+    @Autowired
+    private IotDeviceMapper iotDeviceMapper;
+
+    private static final Random random = new Random();
+    private static final DateTimeFormatter stampFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+
     /**
      * 该方法模拟接收到mq的消息解析发送
      *
@@ -85,4 +120,471 @@ public class TestController {
         //mqttManager.getDeviceHandler(ServiceNameConst.SERVICE_YF_CBD).sendCmd(null);
         return AjaxResult.success();
     }
+
+    // 定义日期格式
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    @GetMapping("/inster/scd/data")
+    public AjaxResult insertScdData3Month() throws InterruptedException {
+        List<String> devID = new ArrayList<>();
+        List<String> deals = new ArrayList<>();
+        List<String> devBidList = new ArrayList<>();
+        devBidList.add(IotDeviceDictConst.TYPE_YF_SCD);
+        //devBidList.add(IotDeviceDictConst.TYPE_YF_FXSSCD);
+        //devBidList.add(IotDeviceDictConst.TYPE_YF_JGFXSSCD);
+        List<IotDevice> iotDevices = iotDeviceMapper.selectIotDeviceByDevTypeBidList(devBidList);
+        for (IotDevice iotDevice : iotDevices) {
+//            if(!(iotDevice.getId()>5092&&iotDevice.getId()<5127)){
+//                continue;
+//            }
+
+            if (iotDevice.getId() < 6662) {
+                continue;
+            }
+            try {
+                //查出来最新一条数据
+                IotYfScddata iotYfScddata = mongoService.findOne("devBid", iotDevice.getDevBid(), IotYfScddata.class);
+                if (iotYfScddata == null) {
+                    devID.add(iotDevice.getDevCode());
+                } else {
+                    //循环插入
+                    reBuildData(iotYfScddata, iotYfScddata.getScddataCreatedDate(), iotDevice.getId());
+                }
+
+            } catch (Exception e) {
+                log.error("失败", e.getMessage());
+            }
+
+            deals.add(iotDevice.getDevCode());
+        }
+
+        Map map = new HashMap();
+        map.put("skip", devID);
+        map.put("deals", deals);
+        return AjaxResult.success(map);
+    }
+
+
+    void reBuildData(IotYfScddata iotYfScddata, String endTime, long id) {
+
+
+        // 解析结束时间字符串到LocalDateTime对象
+        LocalDateTime endDateTime = LocalDateTime.parse(endTime, formatter);
+
+        // 计算开始时间(结束时间往前推三个月)
+        LocalDateTime startDateTime = endDateTime.minusMonths(3);
+
+        // 创建一个可变的时间指针,用于迭代
+        LocalDateTime currentTime = endDateTime;
+        int i = 0;
+
+        // 循环直到我们到达开始时间
+        while (!currentTime.isBefore(startDateTime)) {
+            i += 1;
+
+            // 保存新创建的数据到MongoDB
+            int finalI = i;
+            LocalDateTime finalCurrentTime = currentTime;
+            CompletableFuture.runAsync(() -> {
+                try {
+                    // 创建一个新的IotYfScddata实例,避免重复引用同一个对象
+                    IotYfScddata newData = new IotYfScddata();
+                    // 设置新的唯一标识符
+                    newData.setScddataBid(IdUtils.fastSimpleUUID());
+                    newData.setDevBid(iotYfScddata.getDevBid());
+                    newData.setId(IdUtils.fastSimpleUUID());
+                    newData.setCId(iotYfScddata.getCId());
+
+                    // 设置创建日期为当前时间指针所指向的时间
+                    newData.setScddataCreatedDate(finalCurrentTime.format(formatter));
+                    newData.setScddataContent(reBuildJsonData(iotYfScddata.getScddataContent()));
+                    mongoService.saveOne(newData);
+                    log.info(newData.getDevBid() + " 第:{} 条:ID:{}", finalI, id);
+                } catch (Exception e) {
+                    log.error("发MQTT消息失败", e);
+                }
+            }, threadPoolTaskExecutor);
+
+            // 将时间指针向前移动20分钟
+            currentTime = currentTime.minus(Duration.ofMinutes(20));
+
+        }
+    }
+
+    // 预定义静态值列表(简化示例)
+    private static final String[] DS_VALUES = {"0", "1"};
+    private static final String STAMP_FORMATTER = "yyyyMMddHHmmss";
+    private static final String DVER_VALUE = "4.0.02-1-4G-uart";
+    private static final String ET_VALUE = "2";
+    private static final String[] CSQ_VALUES = {"0", "5", "10", "15", "20", "25", "30"}; // 信号强度假设在0到30之间
+    private static final String CT_VALUE = "5"; // 连接类型固定
+    private static final String DTYPE_VALUE = "2";
+    private static final String LNG_VALUE = "113.7250000";
+    private static final String LAT_VALUE = "35.0250000";
+    private static final String DAT_F_VALUE = "20";
+    private static final String[] AH_VALUES = {"0", "25", "50", "75", "100"}; // 湿度假设在0-100之间
+    private static final String RCNT_VALUE = "500"; // 计数器值固定
+    private static final String[] CLT_T_VALUES = {"-30", "-15", "0", "15", "30"}; // 温度假设在-30到30度之间
+    private static final String[] GPS_VALUES = {"0", "1", "2"}; // GPS状态,假设0,1,2
+    private static final String TS_VALUE = "0";
+    private static final String WS_VALUE = "0";
+    private static final String CV_VALUE = "0.500";
+    private static final String BV_VALUE = "12.0";
+    private static final String DPS_VALUE = "0";
+    private static final String ST_VALUE = "50";
+    private static final String TT_VALUE = "3";
+    private static final String AT_VALUE = "30.0";
+    private static final String BT_VALUE = "27.5";
+    private static final String TPS_VALUE = "50";
+    private static final String RPS_VALUE = "50";
+    private static final String LPS_VALUE = "50";
+
+    JSONObject reBuildJsonData(JSONObject originalJson) {
+        // 解析原始JSON对象
+        JSONObject jsonData = new JSONObject();
+
+//        // 动态生成数据
+//        jsonData.put("ds", String.valueOf(random.nextInt(2))); // 假设"ds"是0或1
+//        jsonData.put("stamp", LocalDateTime.now().format(stampFormatter)); // 使用当前时间作为时间戳
+//        jsonData.put("dver", "4.0.02-1-4G-uart"); // 版本号假设固定
+//        jsonData.put("et", "2"); // 假设事件类型固定
+//        jsonData.put("csq", String.valueOf(random.nextInt(31))); // 信号强度假设在0到30之间
+//        jsonData.put("ct", random.nextInt(50));
+//        JSONObject jsonData = new JSONObject();
+        DateTimeFormatter stampFormatter = DateTimeFormatter.ofPattern(STAMP_FORMATTER);
+        LocalDateTime now = LocalDateTime.now();
+        int timestampLastDigit = Integer.parseInt(now.format(DateTimeFormatter.ofPattern("s")).substring(0, 1));
+
+        // 使用时间戳最后一位作为索引构建JSON对象
+        jsonData.put("ds", DS_VALUES[timestampLastDigit % DS_VALUES.length]);
+        jsonData.put("stamp", now.format(stampFormatter));
+        jsonData.put("dver", DVER_VALUE);
+        jsonData.put("et", ET_VALUE);
+        jsonData.put("csq", CSQ_VALUES[timestampLastDigit % CSQ_VALUES.length]);
+        jsonData.put("ct", CSQ_VALUES[timestampLastDigit % CSQ_VALUES.length]);
+        jsonData.put("dtype", DTYPE_VALUE);
+        jsonData.put("lng", LNG_VALUE);
+        jsonData.put("lat", LAT_VALUE);
+        jsonData.put("dat_f", DAT_F_VALUE);
+        jsonData.put("ah", AH_VALUES[timestampLastDigit % AH_VALUES.length]);
+        jsonData.put("rcnt", RCNT_VALUE);
+        jsonData.put("clt_t", CLT_T_VALUES[timestampLastDigit % CLT_T_VALUES.length]);
+        jsonData.put("gps", GPS_VALUES[timestampLastDigit % GPS_VALUES.length]);
+        jsonData.put("ts", TS_VALUE);
+        jsonData.put("ws", WS_VALUE);
+        jsonData.put("cv", CV_VALUE);
+        jsonData.put("bv", BV_VALUE);
+        jsonData.put("dps", DPS_VALUE);
+        jsonData.put("st", ST_VALUE);
+        jsonData.put("tt", TT_VALUE);
+        jsonData.put("at", AT_VALUE);
+        jsonData.put("bt", BT_VALUE);
+        jsonData.put("tps", TPS_VALUE);
+        jsonData.put("rps", RPS_VALUE);
+        jsonData.put("stm8vs", originalJson.get("stm8vs"));
+        jsonData.put("lps", LPS_VALUE);
+        jsonData.put("imei", originalJson.get("imei"));
+        jsonData.put("iccid", originalJson.get("iccid"));
+
+        // 对于其他原需随机生成的值,我们同样创建静态值列表并用索引选取
+        jsonData.put("lng", String.format("%.7f", 113.7 + (timestampLastDigit / 10.0 * 0.05))); // 经度范围内的值
+        jsonData.put("lat", String.format("%.7f", 35.0 + (timestampLastDigit / 10.0 * 0.05))); // 纬度范围内的值
+        jsonData.put("cv", String.format("%.3f", timestampLastDigit / 10.0)); // 控制电压范围内的值
+
+        return jsonData;
+    }
+
+    //模拟测报灯数据
+    @GetMapping("/inster/cbd/data")
+    public AjaxResult insertCBDdData3Month() {
+        List<String> devID = new ArrayList<>();
+        List<String> deals = new ArrayList<>();
+        List<String> devBidList = new ArrayList<>();
+        devBidList.add(IotDeviceDictConst.TYPE_YF_CBD);
+        List<IotDevice> iotDevices = iotDeviceMapper.selectIotDeviceByDevTypeBidList(devBidList);
+        for (IotDevice iotDevice : iotDevices) {
+            if (iotDevice.getId() < 7178) {
+                continue;
+            }
+            //查出来最新一条数据
+            IotCbddata cbddata = cbdMongo.findOne("devBid", iotDevice.getDevBid(), IotCbddata.class);
+            if (cbddata == null) {
+                cbddata = new IotCbddata();
+                cbddata.setTid(iotDevice.getTid());
+                cbddata.setCbddataCreatedDate(DateUtils.dateTimeNow());
+                cbddata.setDevBid(iotDevice.getDevBid());
+                cbddata.setCdbdataContent(reBuildCBDJsonData(null));
+            }
+            //循环插入
+            reBuildCBDData(cbddata, cbddata.getCbddataCreatedDate(), iotDevice.getId());
+            deals.add(iotDevice.getDevCode());
+        }
+        Map map = new HashMap();
+        map.put("skip", devID);
+        map.put("deals", deals);
+        return AjaxResult.success(map);
+    }
+
+
+    public void reBuildCBDData(IotCbddata iotCbddata, String endTime, long id) {
+        // 解析结束时间字符串到LocalDateTime对象
+        LocalDateTime endDateTime = LocalDateTime.parse(endTime, formatter);
+        // 计算开始时间(结束时间往前推三个月)
+        LocalDateTime startDateTime = endDateTime.minusMonths(3);
+        // 创建一个可变的时间指针,用于迭代
+        LocalDateTime currentTime = endDateTime;
+        int i = 0;
+        // 循环直到我们到达开始时间
+        while (!currentTime.isBefore(startDateTime)) {
+            i += 1;
+            LocalDateTime finalCurrentTime = currentTime;
+            int finalI = i;
+            CompletableFuture.runAsync(() -> {
+                try {
+                    // 检查时间是否在20:00-06:00之间
+                    boolean isWithinTimeRange = isTimeInRange(finalCurrentTime);
+                    IotCbddata newData = new IotCbddata();
+                    newData.setCdbdataBid(IdUtils.fastSimpleUUID());
+                    newData.setDevBid(iotCbddata.getDevBid());
+                    newData.setId(newData.getCdbdataBid());
+                    newData.setTid(iotCbddata.getTid());
+                    newData.setCbddataCreatedDate(finalCurrentTime.format(formatter));
+                    newData.setCdbdataContent(reBuildCBDJsonData(iotCbddata.getCdbdataContent()));
+
+                    // 保存监测数据
+                    cbdMongo.saveOne(newData);
+
+                    if (isWithinTimeRange) {
+                        // 保存图片
+                        IotCbdimg iotCbdimg = createCBDImgObject(newData);
+                        cbdimgMongoService.saveOne(iotCbdimg);
+
+                        // 保存识别记录
+                        List<IotCbdPestrecog> iotCbdPestrecogs = iotCbdimg.getCbdrecog().get("2").getPestrecog().get("0");
+                        for (IotCbdPestrecog iotCbdPestrecog : iotCbdPestrecogs) {
+                            IotPestrecog iotPestrecog = new IotPestrecog();
+
+                            // 将IotCbdPestrecog的字段值映射到IotPestrecog
+                            iotPestrecog.setId(iotCbdPestrecog.getId());
+                            iotPestrecog.setPestrecogBid(iotCbdPestrecog.getPestrecogBid());
+                            iotPestrecog.setTid(iotCbdPestrecog.getTid());
+                            iotPestrecog.setDevBid(iotCbdPestrecog.getDevBid());
+                            iotPestrecog.setDevtypeBid(iotCbdPestrecog.getDevtypeBid());
+                            iotPestrecog.setRecogBid(iotCbdPestrecog.getRecogBid());
+                            iotPestrecog.setImgBid(iotCbdPestrecog.getImgBid());
+                            iotPestrecog.setPestrecogMarktype(iotCbdPestrecog.getPestrecogMarktype());
+                            iotPestrecog.setCbdrecogType(iotCbdPestrecog.getCbdrecogType());
+                            iotPestrecog.setPestrecogAt(iotCbdPestrecog.getPestrecogAt());
+                            iotPestrecog.setPestrecogAh(iotCbdPestrecog.getPestrecogAh());
+                            iotPestrecog.setPestrecogLng(iotCbdPestrecog.getPestrecogLng());
+                            iotPestrecog.setPestrecogLat(iotCbdPestrecog.getPestrecogLat());
+                            iotPestrecog.setPestrecogProvince(iotCbdPestrecog.getPestrecogProvince());
+                            iotPestrecog.setPestrecogCity(iotCbdPestrecog.getPestrecogCity());
+                            iotPestrecog.setPestrecogDistrict(iotCbdPestrecog.getPestrecogDistrict());
+                            iotPestrecog.setPestBusid(iotCbdPestrecog.getPestBusid());
+                            iotPestrecog.setPestName(iotCbdPestrecog.getPestName());
+                            iotPestrecog.setPestrecogNum(iotCbdPestrecog.getPestrecogNum());
+                            iotPestrecog.setPestrecogCreatedDate(iotCbdPestrecog.getPestrecogCreatedDate());
+
+                            // 保存识别记录
+                            iotPestrecogMongoService.saveOne(iotPestrecog);
+                        }
+
+                        List<IotCbdPestrecog> aiiotCbdPestrecogs = iotCbdimg.getCbdrecog().get("2").getPestrecog().get("1");
+                        for (IotCbdPestrecog iotCbdPestrecog : aiiotCbdPestrecogs) {
+                            IotPestrecog iotPestrecog = new IotPestrecog();
+
+                            // 将IotCbdPestrecog的字段值映射到IotPestrecog
+                            iotPestrecog.setId(iotCbdPestrecog.getId());
+                            iotPestrecog.setPestrecogBid(iotCbdPestrecog.getPestrecogBid());
+                            iotPestrecog.setTid(iotCbdPestrecog.getTid());
+                            iotPestrecog.setDevBid(iotCbdPestrecog.getDevBid());
+                            iotPestrecog.setDevtypeBid(iotCbdPestrecog.getDevtypeBid());
+                            iotPestrecog.setRecogBid(iotCbdPestrecog.getRecogBid());
+                            iotPestrecog.setImgBid(iotCbdPestrecog.getImgBid());
+                            iotPestrecog.setPestrecogMarktype(iotCbdPestrecog.getPestrecogMarktype());
+                            iotPestrecog.setCbdrecogType(iotCbdPestrecog.getCbdrecogType());
+                            iotPestrecog.setPestrecogAt(iotCbdPestrecog.getPestrecogAt());
+                            iotPestrecog.setPestrecogAh(iotCbdPestrecog.getPestrecogAh());
+                            iotPestrecog.setPestrecogLng(iotCbdPestrecog.getPestrecogLng());
+                            iotPestrecog.setPestrecogLat(iotCbdPestrecog.getPestrecogLat());
+                            iotPestrecog.setPestrecogProvince(iotCbdPestrecog.getPestrecogProvince());
+                            iotPestrecog.setPestrecogCity(iotCbdPestrecog.getPestrecogCity());
+                            iotPestrecog.setPestrecogDistrict(iotCbdPestrecog.getPestrecogDistrict());
+                            iotPestrecog.setPestBusid(iotCbdPestrecog.getPestBusid());
+                            iotPestrecog.setPestName(iotCbdPestrecog.getPestName());
+                            iotPestrecog.setPestrecogNum(iotCbdPestrecog.getPestrecogNum());
+                            iotPestrecog.setPestrecogCreatedDate(iotCbdPestrecog.getPestrecogCreatedDate());
+
+                            // 保存识别记录
+                            iotPestrecogMongoService.saveOne(iotPestrecog);
+                        }
+
+                    }
+                    log.info("第{}条:ID: {}结束", finalI, id);
+                } catch (Exception e) {
+                    log.error("发MQTT消息失败", e);
+                }
+            }, threadPoolTaskExecutor);
+
+            // 将时间指针向前移动20分钟
+            currentTime = currentTime.minus(Duration.ofMinutes(20));
+        }
+    }
+
+    private boolean isTimeInRange(LocalDateTime time) {
+        // 获取当前时间的小时和分钟
+        int hour = time.getHour();
+        int minute = time.getMinute();
+        // 判断是否在20:00-06:00之间
+        return (hour >= 20 || hour < 6) && !(hour == 20 && minute < 0) && !(hour == 6 && minute > 0);
+    }
+
+    JSONObject reBuildCBDJsonData(JSONObject originalJson) {
+        JSONObject jsonData = new JSONObject();
+        LocalDateTime now = LocalDateTime.now();
+        // 固定数据或不需要更改的数据
+        jsonData.put("dver", "1.48.4(2000W)-2.7.1"); // 固定版本号
+        jsonData.put("proj", "A7_RTU_V3"); // 项目名称固定
+        jsonData.put("dtype", "3"); // 设备类型固定为3
+        jsonData.put("vtype", "4"); // 版本类型固定为4
+        jsonData.put("tt", "2"); // 测试类型固定为2
+
+        // 随机数据
+        jsonData.put("ts", String.valueOf(random.nextInt(2))); // 传输状态随机0或1
+        jsonData.put("tps", String.valueOf(random.nextInt(101))); // 转速随机0-100
+        jsonData.put("lps", String.valueOf(random.nextInt(101))); // 负载百分比随机0-100
+        jsonData.put("rps", String.valueOf(random.nextInt(51))); // 轮速随机0-50
+        jsonData.put("batStatus", String.valueOf(random.nextInt(4))); // 电池状态随机0-3
+        jsonData.put("lamp", String.valueOf(random.nextInt(2))); // 灯光状态随机0或1
+        jsonData.put("upds", String.valueOf(random.nextInt(2))); // 更新状态随机0或1
+        jsonData.put("dnds", String.valueOf(random.nextInt(2))); // 下载状态随机0或1
+        jsonData.put("hrt", String.valueOf(random.nextInt(61) + 10)); // 心跳间隔随机10-70
+        jsonData.put("hs", String.valueOf(random.nextInt(2))); // 健康状态随机0或1
+        jsonData.put("collt", String.valueOf(random.nextInt(61))); // 收集时间随机0-60
+        jsonData.put("htim", String.valueOf(random.nextInt(61))); // 持续时间随机0-60
+        jsonData.put("hst", String.valueOf(random.nextInt(181) + 120)); // 历史状态随机120-300
+        jsonData.put("tph", String.valueOf(random.nextInt(51) + 10)); // 最高温度随机10-60
+        jsonData.put("tpl", String.valueOf(random.nextInt(24) - 30)); // 最低温度随机-30至-6
+        jsonData.put("ws", String.valueOf(random.nextInt(5))); // 工作状态随机0-4
+        jsonData.put("fuse_voltage", String.format("%.2f", 20.0 + (28.0 - 20.0) * random.nextDouble())); // 熔断器电压随机20.00-28.00
+        jsonData.put("st", String.valueOf(random.nextInt(2))); // 状态随机0或1
+        jsonData.put("et", String.valueOf(random.nextInt(3) + 1)); // 事件类型随机1-3
+        jsonData.put("shake", String.valueOf(random.nextInt(2))); // 震动检测随机0或1
+        jsonData.put("shake_sec", String.valueOf(random.nextInt(6) + 1)); // 震动持续时间随机1-6秒
+        jsonData.put("gps", String.valueOf(random.nextInt(2))); // GPS状态随机0或1
+        jsonData.put("ds", String.valueOf(random.nextInt(2))); // 数据状态随机0或1
+        jsonData.put("ah", String.valueOf(random.nextInt(100) + 1)); // 剩余电量随机1-100
+        jsonData.put("at", String.valueOf(random.nextInt(31) - 10)); // 温度随机-10至20
+        jsonData.put("imei", originalJson == null ? "-" : originalJson.get("imei")); // IMEI假设固定
+        jsonData.put("iccid", originalJson == null ? "-" : originalJson.get("iccid")); // ICCID假设固定
+        jsonData.put("stamp", now.format(stampFormatter)); // 时间戳使用当前时间
+        jsonData.put("lng", String.format("%.6f", 106.476933 + (-0.005 + random.nextDouble() * 0.01))); // 经度在一定范围内随机
+        jsonData.put("lat", String.format("%.6f", 30.960897 + (-0.005 + random.nextDouble() * 0.01))); // 纬度在一定范围内随机
+        jsonData.put("rcnt", String.valueOf(random.nextInt(1000) + 1)); // 记录数量随机1-1000
+        jsonData.put("current", String.format("%.2f", 300.0 + (320.0 - 300.0) * random.nextDouble())); // 电流随机300.00-320.00
+        jsonData.put("vbat", String.format("%.2f", 24.0 + (25.0 - 24.0) * random.nextDouble())); // 电池电压随机24.00-25.00
+        jsonData.put("dat_f", String.valueOf(random.nextInt(121))); // 数据频率随机0-120
+        jsonData.put("csq", String.valueOf(random.nextInt(31) + 1)); // 信号质量随机1-31
+        jsonData.put("datt", String.valueOf(random.nextInt(121))); // 数据传输时间随机0-120
+
+        return jsonData;
+    }
+
+    public static IotCbdimg createCBDImgObject(IotCbddata newData) {
+        IotCbdimg iotCbdimg = new IotCbdimg();
+        // IMG-设置IotCbdimg的属性
+        iotCbdimg.setId(IdUtils.fastUUID());
+        iotCbdimg.setCbdimgBid(iotCbdimg.getId());
+        iotCbdimg.setTid(newData.getTid());
+        iotCbdimg.setDevBid(newData.getDevBid());
+        iotCbdimg.setCbdimgTotalnum(24L);
+        iotCbdimg.setCbdimgAddr("https://web.hnyfwlw.com:58003/Basics/cbd/860048072600412/2025/1/6/860048072600412-20250106-032238-80000.000000.jpg");
+        iotCbdimg.setCbdimgModifieddate(newData.getCbddataCreatedDate());
+        iotCbdimg.setCbdimgCreatedDate(newData.getCbddataCreatedDate());
+        iotCbdimg.setCbdimgDelstatus("0");
+
+        // IMG-构建IotCbdrecog对象
+        IotCbdrecog cbdrecog = new IotCbdrecog();
+        cbdrecog.setId(IdUtils.fastUUID());
+        cbdrecog.setCbdrecogBid(cbdrecog.getId());
+        cbdrecog.setTid(newData.getTid());
+        cbdrecog.setCbdimgBid(iotCbdimg.getCbdimgBid());
+        cbdrecog.setDevBid(newData.getDevBid());
+        cbdrecog.setCbdrecogType("2");
+        cbdrecog.setCbdrecogMarktype("1");
+        cbdrecog.setCbdrecogManualaddr("https://yunfei-agm.oss-cn-hangzhou.aliyuncs.com/agmp/cbdRecognitionMarkersAi/20250108/baf7a08b-a896-4b66-9ffe-35dae517dd60_13e9f10c03db463da3518f07ed135bb2.jpg");
+        cbdrecog.setCbdrecogManualmark("[{\"小虫子\":[2524,2055,2668,2247]}]");
+        //ai
+        cbdrecog.setCbdrecogMachinemark("[{'26': [2488, 1808, 3040, 2470, 0.46]}]");
+        cbdrecog.setCbdrecogAddr("https://yunfei-agm.oss-cn-hangzhou.aliyuncs.com/agmp/cbdRecognitionMarkersAi/20250105/2ba3e8f2-0194-1000-e000-b8617f000101_c054a2e5fd7c4328a620c6e990643d6f.jpg");
+        cbdrecog.setCbdrecogTotalnumManual(1L);
+        cbdrecog.setCbdrecogPestnum(1L);
+        cbdrecog.setCbdrecogTypenum(1L);
+        cbdrecog.setCbdrecogPestnumManual(1L);
+        cbdrecog.setCbdrecogTypenumManual(1L);
+        //cbdrecog.setCbdrecogCreator("2b9d939c-0194-1000-e000-88097f000101");
+        //cbdrecog.setCbdrecogModifier("2b9d939c-0194-1000-e000-88097f000101");
+        cbdrecog.setCbdrecogModifieddate(newData.getCbddataCreatedDate());
+        cbdrecog.setCbdrecogCreatedDate(newData.getCbddataCreatedDate());
+        cbdrecog.setCbdrecogDelstatus("0");
+
+        // IMG-构建IotCbdPestrecog对象并添加到cbdrecog中
+        IotCbdPestrecog pestrecog = new IotCbdPestrecog();
+        pestrecog.setPestrecogBid(IdUtils.fastUUID());
+        pestrecog.setTid(cbdrecog.getId());
+        pestrecog.setDevBid(newData.getDevBid());
+        pestrecog.setDevtypeBid("CQCBD00001");
+        pestrecog.setRecogBid(cbdrecog.getCbdrecogBid());
+        pestrecog.setImgBid(iotCbdimg.getCbdimgBid());
+        pestrecog.setPestrecogMarktype("0");
+        pestrecog.setCbdrecogType("2");
+        pestrecog.setPestrecogLng(new BigDecimal("106.293450"));
+        pestrecog.setPestrecogLat(new BigDecimal("29.291224"));
+        pestrecog.setPestrecogProvince("重庆市");
+        pestrecog.setPestrecogCity("九龙坡区");
+        pestrecog.setPestrecogDistrict("九龙坡区");
+        pestrecog.setPestBusid("43fab5402067aeb941357186a6a085ef");
+        pestrecog.setPestName("小虫子");
+        pestrecog.setPestrecogNum(1L);
+        pestrecog.setPestrecogCreatedDate(newData.getCbddataCreatedDate());
+
+        IotCbdPestrecog pestrecog2 = new IotCbdPestrecog();
+        pestrecog2.setPestrecogBid(IdUtils.fastUUID());
+        pestrecog2.setTid(cbdrecog.getId());
+        pestrecog2.setDevBid(newData.getDevBid());
+        pestrecog2.setDevtypeBid("CQCBD00001");
+        pestrecog2.setRecogBid(cbdrecog.getCbdrecogBid());
+        pestrecog2.setImgBid(iotCbdimg.getCbdimgBid());
+        pestrecog2.setPestrecogMarktype("1");
+        pestrecog2.setCbdrecogType("2");
+        pestrecog2.setPestrecogLng(new BigDecimal("106.293450"));
+        pestrecog2.setPestrecogLat(new BigDecimal("29.291224"));
+        pestrecog2.setPestrecogProvince("重庆市");
+        pestrecog2.setPestrecogCity("九龙坡区");
+        pestrecog2.setPestrecogDistrict("九龙坡区");
+        pestrecog2.setPestBusid("f265b662-018c-1000-e000-0132c0a8015f");
+        pestrecog2.setPestName("小地老虎");
+        pestrecog2.setPestrecogNum(1L);
+        pestrecog2.setPestrecogCreatedDate(newData.getCbddataCreatedDate());
+
+        // 将IotCbdPestrecog对象放入list并设置到cbdrecog中
+        List<IotCbdPestrecog> pestList = new ArrayList<>();
+        List<IotCbdPestrecog> aipestList = new ArrayList<>();
+        pestList.add(pestrecog);
+        aipestList.add(pestrecog2);
+        Map<String, List<IotCbdPestrecog>> pestMap = new HashMap<>();
+        pestMap.put("0", pestList);
+        pestMap.put("1", aipestList);
+        cbdrecog.setPestrecog(pestMap);
+
+        // 将cbdrecog对象放入map并设置到iotCbdimg中
+        Map<String, IotCbdrecog> recogMap = new HashMap<>();
+        recogMap.put("2", cbdrecog);
+        iotCbdimg.setCbdrecog(recogMap);
+
+        return iotCbdimg;
+    }
+
+
 }

+ 1 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/IotDeviceMapper.java

@@ -41,6 +41,7 @@ public interface IotDeviceMapper {
     IotDevice selectIotDeviceByDevBid(String devBid);
 
     List<IotDevice> selectIotDeviceByDevBidList(List<String> devBidList);
+    List<IotDevice> selectIotDeviceByDevTypeBidList(List<String> devTypeBidList);
 
     List<IotDevice> getDeviceIdByFirmBizId(@Param("firmBizId") String firmBizId, @Param("deviceTypeBizId") String deviceTypeBizId);
 

+ 1 - 10
src/main/java/com/yunfeiyun/agmp/iots/task/IotStatusService.java

@@ -37,9 +37,6 @@ public class IotStatusService {
     @Autowired
     IIotDeviceconfigService iIotDeviceconfigService;
 
-    @Autowired
-    private MqttManager mqttManager;
-
     @PostConstruct
     void init() {
         // 目前基于已经实现将最新数据放到DeviceConfig的设备,没有的将进行实现
@@ -75,9 +72,8 @@ public class IotStatusService {
      * 定期根据类型查设备最新设备数据,是否长时间不上报。
      */
     //@Scheduled(cron = "0 0 */1 * * ?")
-    @Scheduled(cron = "0 0/2 0/1 * * ?")
+    @Scheduled(cron = "0 0/20 * * * ? ")
     public void validateStatusByDevType() {
-        printMqttStatus();
         Iterator<String> iterator = validateDeviceType.iterator();
         while (iterator.hasNext()) {
             String type = iterator.next();
@@ -177,9 +173,4 @@ public class IotStatusService {
         }
         return false;
     }
-
-    void printMqttStatus() {
-        // 显示所有的加载厂家配置
-        mqttManager.showConfig();
-    }
 }

+ 7 - 0
src/main/resources/mapper/IotDeviceMapper.xml

@@ -532,6 +532,13 @@
             </foreach>
         </where>
     </select>
+    <select id="selectIotDeviceByDevTypeBidList" resultType="com.yunfeiyun.agmp.iot.common.domain.IotDevice">
+        <include refid="selectIotDeviceVo"/>
+        where devTypeBid in
+        <foreach item="devTypeBid" collection="list" open="(" separator="," close=")">
+            #{devTypeBid}
+        </foreach>
+    </select>
 </mapper>