Explorar o código

Merge remote-tracking branch 'origin/master'

liuyaowen hai 1 ano
pai
achega
a9fcef085b

+ 5 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/http/HttpClient.java

@@ -65,6 +65,11 @@ public class HttpClient {
     public void deviceCreateHandle(IotDevice iotDevice) {
 
     }
+
+    public JSONObject getClientConfig() {
+        return clientConfig;
+    }
+
     /**
      * 弃用
      */

+ 21 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/service/IotYbqEnvDataService.java

@@ -0,0 +1,21 @@
+package com.yunfeiyun.agmp.iots.device.service;
+
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqEnvData;
+
+/**
+ * @author zhangn
+ */
+public interface IotYbqEnvDataService {
+    /**
+     * 插入预测数据
+     */
+    void insertData(IotYbqEnvData iotYbqEnvData);
+    /**
+     * 删除预测数据
+     *
+     * @param id
+     */
+    void deleteData(String id);
+
+
+}

+ 39 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/service/IotYbqPredictDataService.java

@@ -0,0 +1,39 @@
+package com.yunfeiyun.agmp.iots.device.service;
+
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqPredictData;
+
+/**
+ * @author zhangn
+ */
+public interface IotYbqPredictDataService {
+    /**
+     * 插入预测数据
+     *
+     * @param iotYbqPredictData
+     */
+    void insertData(IotYbqPredictData iotYbqPredictData);
+
+    /**
+     * 插入预测数据
+     *
+     * @param iotYbqPredictData
+     */
+    void updateData(IotYbqPredictData iotYbqPredictData);
+
+    /**
+     * 删除预测数据
+     *
+     * @param id
+     */
+    void deleteData(String id);
+
+    /**
+     * 根据设备获取当天的预测记录
+     *
+     * @param devBid
+     * @return
+     */
+    IotYbqPredictData getTodayData(String devBid);
+
+}
+

+ 27 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotYbqEnvDataServiceImpl.java

@@ -0,0 +1,27 @@
+package com.yunfeiyun.agmp.iots.device.serviceImp;
+
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqEnvData;
+import com.yunfeiyun.agmp.iot.common.service.MongoService;
+import com.yunfeiyun.agmp.iots.device.service.IotYbqEnvDataService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author zhangn
+ */
+@Service
+public class IotYbqEnvDataServiceImpl implements IotYbqEnvDataService {
+
+    @Autowired
+    private MongoService<IotYbqEnvData> mongoService;
+
+    @Override
+    public void insertData(IotYbqEnvData iotYbqEnvData) {
+        mongoService.saveOne(iotYbqEnvData);
+    }
+
+    @Override
+    public void deleteData(String id) {
+        mongoService.removeAllByParam("id",id,"IotYbqEnvData");
+    }
+}

+ 52 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotYbqPredictDataServiceImpl.java

@@ -0,0 +1,52 @@
+package com.yunfeiyun.agmp.iots.device.serviceImp;
+
+import com.yunfeiyun.agmp.common.utils.DateUtils;
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqPredictData;
+import com.yunfeiyun.agmp.iot.common.service.MongoService;
+import com.yunfeiyun.agmp.iots.device.service.IotYbqPredictDataService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class IotYbqPredictDataServiceImpl implements IotYbqPredictDataService {
+
+    @Autowired
+    private MongoService<IotYbqPredictData> mongoService;
+
+    @Override
+    public void insertData(IotYbqPredictData iotYbqPredictData) {
+        mongoService.saveOne(iotYbqPredictData);
+    }
+
+    @Override
+    public void updateData(IotYbqPredictData iotYbqPredictData) {
+        Map param = new HashMap<>();
+        param.put("id", iotYbqPredictData.getDevBid());
+        param.put("computeDate", iotYbqPredictData.getComputeDate());
+        param.put("ybqdataModifiedDate", DateUtils.dateTimeNow());
+        param.put("value", iotYbqPredictData.getValue());
+        mongoService.update(IotYbqPredictData.class, param);
+    }
+
+    @Override
+    public void deleteData(String id) {
+        mongoService.removeAllByParam("id", id, "IotYbqPredictData");
+    }
+
+    @Override
+    public IotYbqPredictData getTodayData(String devBid) {
+        String start = DateUtils.getDate();
+        Map param = new HashMap();
+        param.put("devBid", devBid);
+        param.put("date_ybqdataCreatedDate", start + " 00:00" + "," + start + " 23:59:59");
+        List<IotYbqPredictData> iotYbqPredictDataList = mongoService.findAll(IotYbqPredictData.class, param);
+        if (iotYbqPredictDataList == null || iotYbqPredictDataList.size() == 0) {
+            return null;
+        }
+        return iotYbqPredictDataList.get(0);
+    }
+}

+ 147 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YbqCmbService.java

@@ -0,0 +1,147 @@
+package com.yunfeiyun.agmp.iots.device.serviceImp;
+
+import cn.hutool.http.HttpUtil;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.common.constant.ErrorCode;
+import com.yunfeiyun.agmp.common.log.LogCore;
+import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.common.utils.spring.SpringUtils;
+import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
+import com.yunfeiyun.agmp.iots.common.annotate.HttpCore;
+import com.yunfeiyun.agmp.iots.core.http.HttpClient;
+import com.yunfeiyun.agmp.iots.device.domain.hsyjjc.YjjcResponse;
+import com.yunfeiyun.agmp.iots.task.YbqScheduler;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author zhangn
+ */
+@Slf4j
+@HttpCore(serviceName = ServiceNameConst.SERVICE_HS_YBQ)
+public class YbqCmbService extends HttpClient {
+
+    /**
+     * 获取该设备对应的调用api的host
+     * https://www.cebaowang.com:8089
+     *
+     * @return
+     */
+    String getHostUrl() {
+        JSONObject jsonObject = getClientConfig();
+        String hostUrl = jsonObject.getString("hostUrl");
+        log.info("获取到的hostUrl: {},jsonObject:{}", hostUrl, jsonObject);
+        if (StringUtils.isEmpty(hostUrl)) {
+            throw new IotBizException(ErrorCode.FAILURE.getCode(), "黄氏生物配置错误:没有hostUrl");
+        }
+        return hostUrl;
+    }
+
+    /**
+     * 获取token
+     * 黄氏正式:e8f2becfeacc4664abe2f611b50ea1ee(小板桥)
+     * 云飞临时:4c1a2defc2a84c548f59b541af0c4170
+     */
+    String getToken() {
+        JSONObject jsonObject = getClientConfig();
+        String token = jsonObject.getString("token");
+        return token;
+    }
+
+    /**
+     * 获取预测数据
+     *
+     * @param devId     YBQ00000xxx 设备号
+     * @param startDate 20210303 开始日期
+     * @param endDate   20210505 结束日期
+     * @param token     xxxx不可修改
+     * @param type      1:赤霉病,2:小麦白粉病,3:小麦白粉病,条锈病
+     */
+    public JSONArray getPredictedData(String devId, String startDate, String endDate, String type) {
+        StringBuffer url = new StringBuffer();
+        url.append(getHostUrl());
+        url.append("/");
+        url.append("external/data/predict?");
+        url.append("deviceId=").append(devId);
+        url.append("&startDate=").append(startDate);
+        url.append("&endDate=").append(endDate);
+        url.append("&token=").append(getToken());
+        url.append("&type=").append(type);
+        log.info("【数据同步:{}】", url.toString());
+        String response = HttpUtil.createGet(url.toString()).execute().body();
+        YjjcResponse yjjcResponse = resolveData(response);
+        if (yjjcResponse.isSuccess()) {
+            return JSONArray.parseArray(yjjcResponse.getData());
+        } else {
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), yjjcResponse.getMessage());
+        }
+    }
+
+    /**
+     * 获取上报数据
+     *
+     * @param devId
+     * @param startDate
+     * @param endDate
+     * @param token
+     * @return
+     */
+    public JSONArray getReportData(String devId, String startDate, String endDate) {
+        StringBuffer url = new StringBuffer();
+        url.append(getHostUrl());
+        url.append("/");
+        url.append("external/data/yunfei/upload?");
+        url.append("deviceId=").append(devId);
+        url.append("&startDate=").append(startDate);
+        url.append("&endDate=").append(endDate);
+        url.append("&token=").append(getToken());
+        log.info("【数据同步:{}】", url.toString());
+        String response = HttpUtil.createGet(url.toString()).execute().body();
+        YjjcResponse yjjcResponse = resolveData(response);
+        if (yjjcResponse.isSuccess()) {
+            return JSONArray.parseArray(yjjcResponse.getData());
+        } else {
+            throw new IotBizException(IotErrorCode.FAILURE.getCode(), yjjcResponse.getMessage());
+        }
+    }
+
+
+    /**
+     * 解析响应的数据
+     *
+     * @param response
+     * @return
+     */
+    private YjjcResponse resolveData(String response) {
+        log.info("【{}】【黄氏预警监测】【请求URL结果】{}", LogCore.getSeq(), StringEscapeUtils.unescapeJava(response));
+        JSONObject result = JSONObject.parseObject(response);
+        String message = result.getString("message");
+        String code = result.getString("code");
+        if ("0".equals(code)) {
+            String data = result.getString("data");
+            if (data == null) {
+                // 有些成功后,没有data信息,比如删除,直接返回0 代表成功
+                return new YjjcResponse(true, null, message);
+            }
+            return new YjjcResponse(true, data, message);
+        } else {
+            return new YjjcResponse(false, null, message);
+        }
+    }
+
+    /**
+     * 单独获取设备的上报数据,用于第一次添加后激活
+     * 查询130分钟内,只要有数据就认为在线
+     *
+     * @param iotDevice 设备
+     */
+    @Override
+    public void deviceCreateHandle(IotDevice iotDevice) {
+        SpringUtils.getBean(YbqScheduler.class).deviceCreateHandle(iotDevice);
+    }
+}

+ 60 - 150
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfScdDeviceImpl.java

@@ -6,16 +6,15 @@ import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
 import com.yunfeiyun.agmp.common.utils.reflect.ReflectUtils;
 import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
 import com.yunfeiyun.agmp.iot.common.constant.mqtt.IotMqttConstant;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
-import com.yunfeiyun.agmp.iot.common.service.IotAddressService;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
-import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.DeviceAbstractImpl;
-import com.yunfeiyun.agmp.iots.device.service.*;
+import com.yunfeiyun.agmp.iots.device.service.IIotYfScddataService;
+import com.yunfeiyun.agmp.iots.device.service.IYfScdDevice;
 import com.yunfeiyun.agmp.iots.service.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
@@ -23,9 +22,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 
 /** 云飞杀虫灯 */
 @Component(ServiceNameConst.SERVICE_YF_SCD)
@@ -39,9 +36,6 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
     private IIotDeviceService iIotDeviceService;
 
     @Autowired
-    private IotAddressService iotAddressService;
-
-    @Autowired
     private IIotDeviceconfigService iIotDeviceconfigService;
 
     @Autowired
@@ -56,7 +50,24 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
     private IotDeviceAddressService iotDeviceAddressService;
 
 
-    private static final String SERVICE_NAME = ServiceNameConst.SERVICE_YF_SCD;
+    private void publish(IotDevice iotDevice, String mqttMsgContent) {
+        String devCode = iotDevice.getDevCode();
+        List<String> topicList = new ArrayList<>();
+        topicList.add(IotMqttConstant.YFScdTopic.TOPIC_SCD_CMD_PREFIX + devCode);
+        topicList.add(IotMqttConstant.YFScdTopic.TOPIC_SCD_2_CMD_PREFIX + devCode);
+        if(Objects.equals(iotDevice.getDevtypeBid(), IotDeviceDictConst.TYPE_YF_FXSSCD)){
+            //新款风吸式杀虫灯
+            topicList.add(IotMqttConstant.YFScdTopic.TOPIC_FXSSCD_CMD_PREFIX + devCode);
+        }
+        for(String topic:topicList){
+            try{
+                mqttManager.publishMsg(iotDevice.getDevconnBid(), topic, mqttMsgContent);
+                log.info("【YFSCD】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
+            }catch (Exception e){
+                log.error("【YFSCD】发送指令失败!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
+            }
+        }
+    }
 
     /**
      * 下发指令
@@ -92,19 +103,12 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
                 break;
             }
         }
-        IotDevice iotDevice = cmdModel.getIotDevice();
-        String devCode = iotDevice.getDevCode();
-        MqttPublisher mqttPublisher = mqttManager.getPublisherByService(SERVICE_NAME);
-        mqttPublisher.publish(IotMqttConstant.YFScdTopic.TOPIC_SCD_CMD_PREFIX + devCode, mqttMsgContent);
-        mqttPublisher.publish(IotMqttConstant.YFScdTopic.TOPIC_SCD_2_CMD_PREFIX + devCode, mqttMsgContent);
 
-        if("3".equals(iotDevice.getDevTag())){
-            //新款风吸式杀虫灯
-            mqttPublisher.publish(IotMqttConstant.YFScdTopic.TOPIC_FXSSCD_CMD_PREFIX + devCode, mqttMsgContent);
+        if(StringUtils.isNotEmpty(mqttMsgContent)){
+            IotDevice iotDevice= iIotDeviceService.selectIotDeviceByDevBid(cmdModel.getIotDevice().getDevBid());
+            publish(iotDevice, mqttMsgContent);
         }
 
-        log.info("【杀虫灯】发送指令完毕!");
-
         cmdModel.setClogSendresult(clogSendresult);
         cmdModel.setClogDesc(mqttMsgContent);
 
@@ -112,50 +116,19 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
         return null;
     }
 
-    public void cmdData(JSONObject ext, String topic, String devUpdateddate) throws Exception {
-        log.info("杀虫灯数据解析 {},topic:{}", ext.toString(),topic);
+    public Object cmdData(JSONObject dataJson, String topic, String connectionId, String devUpdateddate) throws Exception {
+        log.info("杀虫灯数据解析 {},topic:{}", dataJson.toString(),topic);
 
-        //区分 普通杀虫灯、风吸式杀虫灯:尝试查找,找到设备就继续处理
-
-        //String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
-
-
-        //尝试查找设备
-
-        String devCode = topic.substring(topic.lastIndexOf("/") + 1);
-        String devtypeBid = IotDeviceDictEnum.TYPE_YF_SCD.getCode();
-
-        IotDevice iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_FXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_JGFXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        // 设备不存在 就不再处理
-        if (iotDeviceOld == null) {
-            log.error("查不到设备,devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            return;
+        IotDevice oldIotDevice = findIotDevice(topic, dataJson, connectionId);
+        if (oldIotDevice == null) {
+            log.error("未取到 iotDevice");
+            return null;
         }
 
         IotDevice iotDevice = new IotDevice();
-        iotDevice.setDevtypeBid(devtypeBid);
-        iotDevice.setFirmBid(firmBid);
+        iotDevice.setDevBid(oldIotDevice.getDevBid());
         iotDevice.setDevUpdateddate(devUpdateddate);
         iotDevice.setDevStatus("1");//设备上线
-        if(topic.startsWith(IotMqttConstant.YFScdTopic.TOPIC_SCD_2_REPORT_PREFIX)){
-            iotDevice.setDevTag("2");
-        }else if(topic.startsWith(IotMqttConstant.YFScdTopic.TOPIC_FXSSCD_REPORT_PREFIX)){//新风吸式
-            iotDevice.setDevTag("3");
-        }
 
         HashMap<String, String> keyMaps = new HashMap<>();
         keyMaps.put("dver", "devVersion");
@@ -164,16 +137,15 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
 
         for (Map.Entry<String, String> entry : keyMaps.entrySet()) {
             String k = entry.getValue();
-            String v = ext.getString(entry.getKey());
+            String v = dataJson.getString(entry.getKey());
             if (StringUtils.isNotEmpty(v)) {
                 ReflectUtils.invokeSetter(iotDevice, k, v);
             }
         }
-
-        iotDevice.setDevBid(iotDeviceOld.getDevBid());
-
-        if (!Objects.equals(iotDeviceOld.getDevPositionstatus(), "0")) {
-            iotDeviceAddressService.setDeviceAddress(iotDevice, ext.getString("lng"), ext.getString("lat"));
+        String lng = dataJson.getString("lng");
+        String lat = dataJson.getString("lat");
+        if (!Objects.equals(oldIotDevice.getDevPositionstatus(), "0") && StringUtils.isNotEmpty(lng) && StringUtils.isNotEmpty(lat)) {
+            iotDeviceAddressService.setDeviceAddress(iotDevice, lng, lat);
         }
 
         String[] keyArrays = {
@@ -190,8 +162,8 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
         JSONObject extConf = new JSONObject();
         for (String k : keyArrays) {
             String v = "0";
-            if (ext.containsKey(k)) {
-                v = ext.getString(k);
+            if (dataJson.containsKey(k)) {
+                v = dataJson.getString(k);
             }
             extConf.put(k, v);
         }
@@ -202,61 +174,31 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
         iIotDeviceService.updateIotDevice(iotDevice);
         // 创建或更新设备配置信息
         if (StringUtils.isNotEmpty(devConfig)) {
-            iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceOld, devConfig, iotDevice.getDevUpdateddate());
+            iIotDeviceconfigService.createOrUpdateDevConfig(oldIotDevice, devConfig, iotDevice.getDevUpdateddate());
         }
 
         // 更新设备数据到mongodb
-        iIotYfScddataService.insertScddata(iotDevice, ext);
-        log.info("杀虫灯上报数据-》mongodb");
+        iIotYfScddataService.insertScddata(iotDevice, dataJson);
 
         // 保存 设备最新数据 到redis
-        iIotDevicelasteddataService.createOrUpdateDeviceLastedData(
-                ext, iotDeviceOld, iotDevice.getDevUpdateddate(), 60 * 60 * 24L);
-        log.info("杀虫灯上报数据-》redis");
-
+        iIotDevicelasteddataService.updateDeviceLastedData(oldIotDevice, String.valueOf(dataJson), devUpdateddate);
+        return null;
     }
 
-    public void cmdOffline(JSONObject ext) throws MqttException {
-        log.debug("杀虫灯离线数据 {}", ext.toString());
-        //String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
-        String devCode = ext.getString("imei");
-
-        String devtypeBid = IotDeviceDictEnum.TYPE_YF_SCD.getCode();
-
-        IotDevice iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_FXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_JGFXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        // 设备不存在 就不再处理
-        if (iotDeviceOld == null) {
-            log.error("查不到设备,devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
+    public void cmdOffline(JSONObject dataJson, String topic, String connectionId) throws MqttException {
+        log.debug("杀虫灯离线数据 {}", dataJson.toString());
+        IotDevice oldIotDevice = findIotDevice(topic, dataJson, connectionId);
+        if (oldIotDevice == null) {
+            log.error("未取到 iotDevice");
             return;
         }
 
         IotDevice newIotDevice = new IotDevice();
-        newIotDevice.setDevBid(iotDeviceOld.getDevBid());
+        newIotDevice.setDevBid(oldIotDevice.getDevBid());
         newIotDevice.setDevStatus("0");
         newIotDevice.setDevOfflinedate(DateUtils.dateTimeNow());
         iIotDeviceService.updateIotDevice(newIotDevice);
 
-        String topic = IotMqttConstant.YFScdTopic.TOPIC_SCD_CMD_PREFIX;
-        if(Objects.equals(iotDeviceOld.getDevTag(), "2")){
-            topic = IotMqttConstant.YFScdTopic.TOPIC_SCD_2_CMD_PREFIX;
-        }else if(Objects.equals(iotDeviceOld.getDevTag(), "3")){
-            topic = IotMqttConstant.YFScdTopic.TOPIC_FXSSCD_CMD_PREFIX;
-        }
-
         /**
          * 下发刷新指令,检测设备是否真离线
          */
@@ -264,11 +206,9 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
         payload.put("cmd", "read");
         payload.put("ext", "data");
         String mqttMsgContent = JSONUtils.toJSONString(payload);
+        publish(oldIotDevice, mqttMsgContent);
 
-        MqttPublisher mqttPublisher = mqttManager.getPublisherByService(SERVICE_NAME);
-        mqttPublisher.publish(topic + devCode, mqttMsgContent);
-
-        log.info("[杀虫灯] 下发刷新指令,检测设备是否真离线: " + devCode);
+        log.info("[杀虫灯] 下发刷新指令,检测设备是否真离线: " + oldIotDevice.getDevCode());
     }
 
     /**
@@ -297,21 +237,17 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
             return false;
         }
 
-        if (cmd.equals("data")) {
-            this.cmdData(ext, topic, devUpdateddate);
-        } else if (cmd.equals("offline")) {
-            this.cmdOffline(ext);
+        if ("data".equals(cmd)) {
+            this.cmdData(ext, topic, connectionId, devUpdateddate);
+        } else if ("offline".equals(cmd)) {
+            this.cmdOffline(ext, topic, connectionId);
         }
         return null;
     }
 
-    /**
-     * @param cmdJson
-     * @return
-     */
     @Override
-    public boolean isDeviceProps(JSONObject cmdJson) {
-        return false;
+    public boolean isDeviceProps(JSONObject jobjMsg) {
+        return "data".equals(jobjMsg.getString("cmd"));
     }
 
     /**
@@ -323,33 +259,7 @@ public class YfScdDeviceImpl extends DeviceAbstractImpl implements IYfScdDevice
      */
     @Override
     public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
-        String devCode = topic.substring(topic.lastIndexOf("/") + 1);
-        //String devtypeBid = mqttManager.getDeviceTypeBizId(SERVICE_NAME);// 对于实现类复用的情况,目前这个写法会失效
-        String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
-
-
-        String devtypeBid = IotDeviceDictEnum.TYPE_YF_SCD.getCode();
-
-        IotDevice iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_FXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        if(iotDeviceOld==null){
-            log.info("查不到设备(继续查找),devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            devtypeBid = IotDeviceDictEnum.TYPE_YF_JGFXSSCD.getCode();
-            iotDeviceOld = iIotDeviceService.selectIotDeviceByTypeFirmCode(devtypeBid, firmBid, devCode);
-        }
-
-        // 设备不存在 就不再处理
-        if (iotDeviceOld == null) {
-            log.error("查不到设备,devtypeid[{}],firmid[{}],devcode[{}],SERVICE_NAME[{}]",devtypeBid, firmBid, devCode,SERVICE_NAME);
-            return iotDeviceOld;
-        }
-
-        return iotDeviceOld;
+        String devId = mqttManager.getDevIdByTopic(connectionId,topic);
+        return iIotDeviceService.selectIotDeviceByDevBid(devId);
     }
 }

+ 322 - 0
src/main/java/com/yunfeiyun/agmp/iots/task/YbqScheduler.java

@@ -0,0 +1,322 @@
+package com.yunfeiyun.agmp.iots.task;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.common.utils.DateUtils;
+import com.yunfeiyun.agmp.common.utils.uuid.IdUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqEnvData;
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqPredictData;
+import com.yunfeiyun.agmp.iot.common.enums.ybq.YbqTypeConst;
+import com.yunfeiyun.agmp.iots.core.manager.HttpManager;
+import com.yunfeiyun.agmp.iots.device.service.IotYbqEnvDataService;
+import com.yunfeiyun.agmp.iots.device.service.IotYbqPredictDataService;
+import com.yunfeiyun.agmp.iots.device.serviceImp.YbqCmbService;
+import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
+import com.yunfeiyun.agmp.iots.service.IIotDevicelasteddataService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 作物xx虫子检测设备调度
+ * 比如:小麦赤霉病、玉米大斑病、小麦条锈病、小麦白粉病
+ *
+ * @author zhangn
+ * @see YbqTypeConst
+ */
+@Slf4j
+@Component
+public class YbqScheduler {
+
+    @Autowired
+    private HttpManager httpManager;
+
+    @Autowired
+    private IIotDeviceService iIotDeviceService;
+
+    @Autowired
+    private IotYbqPredictDataService iotYbqPredictDataService;
+
+    @Autowired
+    private IotYbqEnvDataService iotYbqEnvDataService;
+
+    @Autowired
+    private IIotDevicelasteddataService iIotDevicelasteddataService;
+
+
+    /**
+     * 需要同步的类型(已经对接的)
+     */
+    YbqTypeConst[] ybqTypeConsts = {
+            YbqTypeConst.YBQ_XM_CMB,
+            YbqTypeConst.YBQ_SD_DWB,
+            YbqTypeConst.YBQ_YM_DBB,
+    };
+
+
+
+
+    /**
+     * 同步预测数据
+     * 步骤:
+     * 1. 遍历类型查询厂家设备
+     * 2. 根据设备查询预测数据,入库
+     * 每十分钟:0 0/10 0/1 * * ?
+     * 每1小时:0 0 0/1 * * ?
+     * 每天12点:0 0 8 * * ?
+     * 每天8点:10分 0 10 8 * * ?
+     * 对方:每天8点更新
+     * 我们:每天八点半更新
+     */
+    @Scheduled(cron = "0 30 8 * * ?")
+    public void synPredictedData() {
+        String startDate = DateUtils.dateTime();
+        log.info("【开始执行】同步预测数据任务,当前时间: {}", startDate);
+        for (YbqTypeConst ybqTypeConst : ybqTypeConsts) {
+            try {
+                log.debug("正在处理设备类型: {}", ybqTypeConst.getMessage());
+                // 同步赤霉病设备预测数据
+                List<IotDevice> iotDevices = getDevListByServiceName(ybqTypeConst.getServiceName());
+                for (IotDevice iotDevice : iotDevices) {
+                    delaYbqPredictedData(iotDevice, ybqTypeConst, startDate, startDate);
+                }
+            } catch (Exception e) {
+                log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
+            }
+
+        }
+        log.info("【完成】同步预测数据任务执行完毕");
+    }
+
+    /**
+     * 根据设备处理预测数据
+     *
+     * @param iotDevice
+     * @param ybqTypeConst
+     * @param startDate
+     * @param endTime
+     */
+    public void delaYbqPredictedData(IotDevice iotDevice, YbqTypeConst ybqTypeConst, String startDate, String endTime) {
+        try {
+            String code = iotDevice.getDevCode();
+            String devBid = iotDevice.getDevBid();
+            JSONArray jsonArrayDevs = ((YbqCmbService)httpManager.getHttpClientByDevice(iotDevice)).getPredictedData( code, startDate, endTime,ybqTypeConst.getCode());
+            if (jsonArrayDevs == null || jsonArrayDevs.size() == 0) {
+                log.error("【预测数据同步】【{}}】暂无设备数据,无需导入", ybqTypeConst.getMessage());
+                return;
+            }
+            int count = jsonArrayDevs.size();
+            for (int i = 0; i < count; i++) {
+                try {
+                    JSONObject jobj = jsonArrayDevs.getJSONObject(i);
+                    if (jobj != null) {
+                        // 得到第三方返回的设备的预测数据
+                        String computeDate = jobj.getString("computeDate");
+                        String deviceId = jobj.getString("deviceId");
+                        String value = jobj.getString("value");
+                        //保存预测记录、更新预测最新数据
+                        // 查查当天的有没有,有的话更新,没有的话添加
+                        IotYbqPredictData iotYbqPredictDataToday = iotYbqPredictDataService.getTodayData(devBid);
+                        if (iotYbqPredictDataToday == null) {
+                            IotYbqPredictData iotYbqPredictData = new IotYbqPredictData();
+                            iotYbqPredictData.setId(IdUtils.fastUUID());
+                            iotYbqPredictData.setYbqdataBid(iotYbqPredictData.getId());
+                            iotYbqPredictData.setTid(iotDevice.getTid());
+                            iotYbqPredictData.setComputeDate(computeDate);
+                            iotYbqPredictData.setDateDevType(ybqTypeConst.getCode());
+                            iotYbqPredictData.setValue(value);
+                            iotYbqPredictData.setYbqdataContent(jobj);
+                            iotYbqPredictData.setDeviceId(deviceId);
+                            iotYbqPredictData.setDevBid(devBid);
+                            iotYbqPredictData.setDevTypeBid(iotDevice.getDevtypeBid());
+                            iotYbqPredictData.setYbqdataModifiedDate(DateUtils.dateTimeNow());
+                            iotYbqPredictData.setYbqdataCreatedDate(DateUtils.dateTimeNow());
+                            iotYbqPredictDataService.insertData(iotYbqPredictData);
+                        } else {
+                            iotYbqPredictDataToday.setComputeDate(computeDate);
+                            iotYbqPredictDataToday.setValue(value);
+                            iotYbqPredictDataToday.setYbqdataModifiedDate(DateUtils.dateTimeNow());
+                            // 只更新特定的字段
+                            iotYbqPredictDataService.updateData(iotYbqPredictDataToday);
+                        }
+                        // 同步更新设备信息
+                        String extInfo = iotDevice.getExtInfo() == null ? "{}" : iotDevice.getExtInfo();
+                        JSONObject jsonObject = JSONObject.parseObject(extInfo);
+                        //预测时间
+                        jsonObject.put("computeDate", DateUtils.dateTimeNow());
+                        //预测value
+                        jsonObject.put("computeValue", value);
+                        iIotDeviceService.updateIotDeviceExtInfo(devBid, jsonObject.toString());
+                        log.debug("设备ID={} 的预测数据处理成功,computeDate={}, value={}", iotDevice.getDevBid(), computeDate, value);
+                    }
+                } catch (Exception e) {
+                    log.error("处理设备ID={} 的预测数据时出错: ", iotDevice.getDevBid(), e);
+                }
+            }
+        } catch (Exception e) {
+            log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
+        }
+
+    }
+
+
+    /**
+     * 同步上报数据
+     * 测试:每五分钟:0 0/5 0/1 * * ?
+     * 对方:每2个小时 13、15
+     * 2024-06-05 12:10:00
+     * 2024-06-05 13:10:00
+     * 2024-06-05 14:10:00
+     * 2024-06-05 15:10:00
+     * 2024-06-05 16:10:00
+     * 延迟10分钟再拉取
+     * 我们:从10分开始,每一个小时拉取一次
+     */
+    @Scheduled(cron = "0 10/59 0/1 * * ? ")
+    public void synReportData() {
+        String startTime = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, DateUtils.addMinutes(new Date(), -20));
+        String endTime = DateUtils.dateTimeNow();
+        log.info("【开始】同步上报数据任务执行于 {}", endTime);
+        for (YbqTypeConst ybqTypeConst : ybqTypeConsts) {
+            try {
+                log.debug("正在处理设备类型: {}", ybqTypeConst.getMessage());
+                // 同步各类设备预测数据
+                List<IotDevice> iotDevices = getDevListByServiceName(ybqTypeConst.getServiceName());
+                for (IotDevice iotDevice : iotDevices) {
+                    dealEnvDataByDevice(iotDevice, ybqTypeConst, startTime, endTime);
+                }
+            } catch (Exception e) {
+                log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
+
+            }
+        }
+    }
+
+    /**
+     * 单独处理设备数据,为了与主动获取更新复用代码
+     *
+     * @param iotDevice
+     * @param ybqTypeConst
+     * @param startTime
+     * @param endTime
+     */
+    void dealEnvDataByDevice(IotDevice iotDevice, YbqTypeConst ybqTypeConst, String startTime, String endTime) {
+        try {
+            String code = iotDevice.getDevCode();
+            String devBid = iotDevice.getDevBid();
+            // 临时时间写成过去的,把数据拉起过来
+            JSONArray jsonArrayDevs = ((YbqCmbService)httpManager.getHttpClientByDevice(iotDevice)).getReportData( code, startTime, endTime);
+            if (jsonArrayDevs == null || jsonArrayDevs.size() == 0) {
+                log.error("【上报数据同步】【{}}】暂无设备数据,无需导入", ybqTypeConst.getMessage());
+                return;
+            }
+            int count = jsonArrayDevs.size();
+            for (int i = 0; i < count; i++) {
+                try {
+                    JSONObject jobj = jsonArrayDevs.getJSONObject(i);
+                    if (jobj != null) {
+                        // 得到第三方返回的设备的预测数据
+                        IotYbqEnvData iotYbqEnvData = jobj.to(IotYbqEnvData.class);
+                        String collectTime = jobj.getString("collectTime");
+                        iotYbqEnvData.setId(IdUtils.fastUUID());
+                        iotYbqEnvData.setTid(iotDevice.getTid());
+                        iotYbqEnvData.setYbqdataBid(iotYbqEnvData.getId());
+                        iotYbqEnvData.setDevBid(devBid);
+                        iotYbqEnvData.setDateDevType(ybqTypeConst.getCode());
+                        iotYbqEnvData.setYbqdataContent(jobj);
+                        //iotYbqEnvData.setYbqdataCreatedDate(DateUtils.dateTimeNow());
+                        iotYbqEnvData.setYbqdataCreatedDate(collectTime);
+                        iotYbqEnvDataService.insertData(iotYbqEnvData);
+                        // 同步更新设备信息
+                        String extInfo = iotDevice.getExtInfo() == null ? "{}" : iotDevice.getExtInfo();
+                        JSONObject jsonObject = JSONObject.parseObject(extInfo);
+                        //上报时间
+                        jsonObject.put("collectTime", DateUtils.dateTimeNow());
+                        iIotDeviceService.updateIotDeviceExtInfo(devBid, jsonObject.toString());
+                        log.info("设备ID: {}, 收集时间: {} 的数据已成功插入数据库", devBid, collectTime);
+                        // 保存 设备最新数据 到redis
+                        iIotDevicelasteddataService.createOrUpdateDeviceLastedData(jobj, iotDevice, DateUtils.dateTimeNow(), 60 * 60 * 24L);
+                    }
+                } catch (Exception e) {
+                    log.error("{}", e);
+                }
+            }
+        } catch (Exception e) {
+            log.error("处理类型 {} 的设备上报数据时发生异常: ", ybqTypeConst.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * 单独获取设备的上报数据,用于第一次添加后激活
+     * 查询130分钟内,只要有数据就认为在线
+     *
+     * @param iotDevice 设备
+     */
+    public void deviceCreateHandle(IotDevice iotDevice) {
+        if (iotDevice == null) {
+            log.warn("设备信息为空,未找到对应设备信息,无法执行激活流程");
+            return;
+        }
+        String devId = iotDevice.getDevBid();
+
+        log.info("【开始】单独获取设备ID [{}] 的上报数据,用于首次激活", devId);
+        try {
+
+
+            String startTime = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, DateUtils.addMinutes(new Date(), -600 * 20));
+            String endTime = DateUtils.dateTimeNow();
+
+            //预测数据的时间,20200101格式
+            String startDate = DateUtils.dateTime();
+
+            // 处理赤霉病
+            if (IotDeviceDictConst.TYPE_HS_YBQ_CMB.equals(iotDevice.getDevtypeBid())) {
+                log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_CMB);
+                // 更新监测环境数据
+                dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_XM_CMB, startTime, endTime);
+                // 更新预测数据
+                delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_XM_CMB, startDate, startDate);
+            }
+            // 处理稻瘟病
+            else if (IotDeviceDictConst.TYPE_HS_YBQ_DWB.equals(iotDevice.getDevtypeBid())) {
+                log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_DWB);
+                // 更新检测环境数据
+                dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_SD_DWB, startTime, endTime);
+                // 更新预测数据
+                delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_SD_DWB, startDate, startDate);
+            }  // 处理玉米大斑病
+            else if (IotDeviceDictConst.TYPE_HS_YBQ_DBB.equals(iotDevice.getDevtypeBid())) {
+                log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_DBB);
+                // 更新检测环境数据
+                dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_YM_DBB, startTime, endTime);
+                // 更新预测数据
+                delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_YM_DBB, startDate, startDate);
+            } else {
+                log.warn("设备ID [{}] 的设备类型未知,无法识别处理", devId);
+            }
+        } catch (Exception e) {
+            log.error("处理设备ID [{}] 上报数据时发生异常", devId, e);
+        }
+        log.info("【完成】设备ID [{}] 的上报数据处理完毕", devId);
+    }
+
+    /**
+     * 根据服务类查询设备
+     *
+     * @param serviceName 服务名称
+     * @return 设备列表
+     */
+    public List<IotDevice> getDevListByServiceName(String serviceName) {
+        log.debug("开始根据服务名称 [{}] 查询设备", serviceName);
+        List<IotDevice> iotDevices = iIotDeviceService.selectAllDeviceByDeviceServiceName(serviceName);
+        log.debug("根据服务名称 [{}] 查询到 {} 台设备", serviceName, iotDevices.size());
+        return iotDevices;
+    }
+}