Prechádzať zdrojové kódy

新增 新普惠气象墒情物联网设备功能对接

zhaiyifei 1 rok pred
rodič
commit
54c2b1b779

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java

@@ -88,7 +88,7 @@ public class DeviceTopicService {
             case ServiceNameConst.SERVICE_BIG_DATA_MONITOR: {
                 return getHKBatchSubTopic();
             }
-            case ServiceNameConst.SERVICE_XPH:
+            case ServiceNameConst.SERVICE_XPH_YF_QXZ:
                 return getXphDeviceBatchSubTopic(deviceId);
             case ServiceNameConst.SERVICE_HPF_ZNKG:
                 return getHpfZnkgDeviceBatchSubTopic(deviceId);

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/service/IXphYfQxzDevice.java

@@ -0,0 +1,6 @@
+package com.yunfeiyun.agmp.iots.device.service;
+
+import com.yunfeiyun.agmp.iots.device.common.Device;
+
+/** 新普惠云飞气象站 */
+public interface IXphYfQxzDevice extends Device {}

+ 393 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/XphYfQxzDeviceImpl.java

@@ -0,0 +1,393 @@
+package com.yunfeiyun.agmp.iots.device.serviceImp;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.yunfeiyun.agmp.common.utils.DateUtils;
+import com.yunfeiyun.agmp.common.utils.JSONUtils;
+import com.yunfeiyun.agmp.common.utils.StringUtils;
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
+import com.yunfeiyun.agmp.iot.common.constant.mqtt.IotMqttConstant;
+import com.yunfeiyun.agmp.iot.common.domain.*;
+import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
+import com.yunfeiyun.agmp.iot.common.service.IotAddressService;
+import com.yunfeiyun.agmp.iot.common.service.MongoService;
+import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import com.yunfeiyun.agmp.iots.device.common.DeviceAbstractImpl;
+import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzMsg;
+import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzReqMsg;
+import com.yunfeiyun.agmp.iots.device.service.IXphYfQxzDevice;
+import com.yunfeiyun.agmp.iots.service.*;
+import com.yunfeiyun.agmp.iots.service.impl.WarnService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.util.TextUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.Objects;
+
+
+/**
+ * 新普惠云飞气象站
+ *(墒情站也共用气象站的deviceimpl)
+ *
+ * @author mzq
+ */
+@Component(ServiceNameConst.SERVICE_XPH_YF_QXZ)
+@Slf4j
+public class XphYfQxzDeviceImpl extends DeviceAbstractImpl implements IXphYfQxzDevice {
+
+    private static final String TAG = "【新普惠云飞气象站/墒情站】";
+
+    private static final String SERVICE_NAME = ServiceNameConst.SERVICE_XPH_YF_QXZ;
+
+    @Autowired
+    private MqttManager mqttManager;
+
+    @Autowired
+    private IIotDeviceconfigService iIotDeviceconfigService;
+
+    @Autowired
+    private IIotDevicelasteddataService iIotDevicelasteddataService;
+
+    @Autowired
+    private MongoService<IotBaseEntity> mongoService;
+
+    @Autowired
+    private IotAddressService iotAddressService;
+
+    @Autowired
+    private IIotDeviceService iIotDeviceService;
+
+    @Autowired
+    private IIotCmdlogService iIotCmdlogService;
+
+    @Autowired
+    private WarnService warnService;
+
+    @Autowired
+    private IotDeviceAddressService iotDeviceAddressService;
+
+
+
+    @Override
+    public Object sendCmd(CmdModel cmdModel) throws Exception {
+        log.info("新普惠云飞气象墒情不支持设备操控");
+        //返回值待定
+        return true;
+
+    }
+
+    @Override
+    public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
+        if (TextUtils.isEmpty(topic)) {
+            log.error("topic: empty");
+            return false;
+        }
+
+        if (dataJson==null) {
+            log.error("dataJson: null");
+            return false;
+        }
+
+        log.info(TAG+"【收到 MQTT消息】  {}, topic:{}", dataJson,topic);
+        // 接收设备上报数据后的处理逻辑
+        String cmd = dataJson.getString("cmd");
+        if (TextUtils.isEmpty(cmd)) {
+            log.error("未取到cmd");
+            return false;
+        }
+        JSONObject ext = dataJson.getJSONObject("ext");
+        if (ext == null) {
+            log.error("未取到ext");
+            return false;
+        }
+
+        String devCode = getDevcode(topic);
+        if (TextUtils.isEmpty(devCode)) {
+            log.error("未取到 devCode");
+            return false;
+        }
+        String devUpdateddate = dataJson.getString("devUpdateddate");
+        if(StringUtils.isEmpty(devUpdateddate)){
+            devUpdateddate = DateUtils.dateTimeNow();
+        }
+
+        IotDevice iotDevice = findIotDevice(topic, dataJson, connectionId);
+        if (iotDevice == null) {
+            log.error("未取到 iotDevice");
+            return false;
+        }
+
+        if (cmd.equalsIgnoreCase("terminalData") ) {
+            this.processData(iotDevice, ext, devUpdateddate);
+        } else if (cmd.equalsIgnoreCase("status") ) {
+            this.processStatus(iotDevice, ext, devUpdateddate);
+        } else if (cmd.equalsIgnoreCase("config") ) {
+            this.processConfig(iotDevice, ext, devUpdateddate);
+        } else if (cmd.equalsIgnoreCase("online") || cmd.equalsIgnoreCase("offline")) {
+            this.processOffline(iotDevice, dataJson, devUpdateddate);
+        }
+        return true;
+    }
+
+
+    public String bzyData(JSONObject ext, String cId, IotDevice iotDevice) {
+        // 更新设备数据信息到数据库 mongodb
+        IotBzydata iotBzydata = new IotBzydata();
+        iotBzydata.setCId(cId);
+        iotBzydata.setBzydataBid(iotBzydata.getUUId());
+        iotBzydata.setDevBid(iotDevice.getDevBid());
+        iotBzydata.setBzydataCreatedDate(iotDevice.getDevUpdateddate());
+        iotBzydata.setBzydataContent(ext);
+
+        mongoService.saveOne(iotBzydata);
+
+        String[] keyArrays = {
+                "shake_sec",
+                "cul_time",
+                "set_temp",
+                "dat_f",
+                "coll_time",
+                "ds",
+                "cold_sw",
+        };
+
+
+        JSONObject extConf = new JSONObject();
+        for (String k : keyArrays) {
+            String v = "0";
+            if (ext.containsKey(k)) {
+                v = ext.getString(k);
+            }
+            extConf.put(k, v);
+        }
+        return JSONUtils.toJSONString(extConf);
+
+    }
+
+
+    /**
+     * 处理 采集数据 消息
+     * @param ext
+     * @throws Exception
+     */
+    public void processData(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
+        log.info(TAG+"数据解析 processData {}", ext.toString());
+        JSONArray jarrData = ext.getJSONArray("data");
+
+        iotDeviceOld.setDevUpdateddate(devUpdateddate);
+        iotDeviceOld.setDevStatus("1");//在线
+        // 更新设备基础信息数据库 mysql
+        iIotDeviceService.updateIotDevice(iotDeviceOld);
+
+        Date reportDt = DateUtils.dateTime(DateUtils.YYYY_MM_DD_HH_MM_SS, devUpdateddate);
+        //String reportTime = ext.getString("MonitorTime");
+        //不采信设备提供的  采集时间
+        /*if(!TextUtils.isEmpty(reportTime)){
+            reportDt = DateUtils.parseDate(reportTime);
+        }*/
+
+        // 更新设备数据信息到数据库 mongodb
+        String cId = iotDeviceOld.getTid();
+
+        for (Object item: jarrData) {
+            JSONObject jobj = (JSONObject) item;
+            IotXphYfqxzdata qxzdata = new IotXphYfqxzdata();
+            qxzdata.setCId(cId);
+            qxzdata.setDevBid(iotDeviceOld.getDevBid());
+            qxzdata.setENum(jobj.getString("eNum"));
+            qxzdata.setEName(jobj.getString("eName"));
+            qxzdata.setEKey(jobj.getString("eKey"));
+            qxzdata.setEValue(jobj.getString("eValue"));
+
+            qxzdata.setTime(reportDt);
+
+            mongoService.saveOne(qxzdata);
+
+            //把“上报时间”放里面
+            jobj.put("time",DateUtils.parseDateToStr("yyyy-MM-dd'T'HH:mm:ss.SSSX",reportDt));
+            jobj.put("devBid",qxzdata.getDevBid());
+            jobj.put("cId",qxzdata.getCId());
+        }
+        iIotDevicelasteddataService.updateDeviceLastedData(iotDeviceOld,jarrData.toString(), devUpdateddate);
+
+        //预警
+        //云飞 气象站 和 墒情站  都从这里进数据
+/*        if( IotDeviceDictConst.TYPE_YF_QXZ.equals(iotDeviceOld.getDevtypeBid()) ){
+            warnService.checkQxzData("0","",iotDeviceOld.getDevBid());
+        }else if( IotDeviceDictConst.TYPE_YF_SQZ.equals(iotDeviceOld.getDevtypeBid()) ){
+            warnService.checkGssqData("0","",iotDeviceOld.getDevBid());
+        }else if( IotDeviceDictConst.TYPE_YF_GXZW.equals(iotDeviceOld.getDevtypeBid()) ){
+            warnService.checkGssqData("0","",iotDeviceOld.getDevBid());
+        }*/
+//        EnumWarnType warnType = EnumWarnType.findEnumByDevbid(iotDeviceOld.getDevtypeBid());
+//        if(warnType != null){
+//            warnService.checkSensData(warnType.getCode(),"0","",iotDeviceOld.getDevBid());
+//        }
+//
+//        if("3".equals(warnService.getWarnVer())){
+//            //要求预警检查 V3
+//            IotWarncheck warncheck = new IotWarncheck();
+//            warncheck.setDevBid(iotDeviceOld.getDevBid());
+//            // TODO
+//            //  iotsMqService.sendMsg(IotMqConstant.TOPIC_WARNCHECK, IotMqConstant.TOPIC_WARNCHECK, warncheck);
+//        }else{
+//            //预警检查 V2
+//            warnService.checkSensData("0","",iotDeviceOld.getDevBid());
+//        }
+    }
+
+
+    /**
+     * 处理 状态消息
+     * @param ext
+     * @throws Exception
+     */
+    public void processStatus(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
+        log.info(TAG+"数据解析 processStatus {}", ext.toString());
+
+        JSONObject jobjStatus = ext.getJSONObject("terminalStatus");
+
+        String devCode = iotDeviceOld.getDevCode();
+
+        IotDevice iotDevice = new IotDevice();
+        iotDevice.setDevtypeBid(iotDeviceOld.getDevtypeBid());
+        iotDevice.setFirmBid(iotDeviceOld.getFirmBid());
+        iotDevice.setDevUpdateddate(devUpdateddate);
+        iotDevice.setDevStatus("1");
+
+        iotDevice.setDevCode(devCode);
+        iotDevice.setDevBid(iotDeviceOld.getDevBid());
+        iotDevice.setDevVersion(jobjStatus.getString("Version"));
+
+        String lng = jobjStatus.getString("longitude");
+        String lat = jobjStatus.getString("latitude");
+
+        //如果存在经纬度
+        if(!TextUtils.isEmpty(lng) &&! TextUtils.isEmpty(lat)){
+            // 自动的可以更新数据库   0 手动
+            if (!Objects.equals(iotDeviceOld.getDevPositionstatus(), "0")) {
+                iotDeviceAddressService.setDeviceAddress(iotDevice, lng, lat);
+            }
+        }
+
+        // 更新设备基础信息数据库 mysql
+        iIotDeviceService.updateIotDevice(iotDevice);
+    }
+
+    /**
+     * 处理收到的“配置信息”消息
+     *
+     * 目前只处理一个配置项 interval 上报间隔时间
+     *
+     * (系统发送“设置配置项消息”后,设备回发此消息)
+     *
+     * @param ext
+     * @throws Exception
+     */
+    private void processConfig(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
+        log.info(TAG+"数据解析 processConfig {}", ext);
+        String devConfig = ext.toString();
+        // 创建或更新设备配置信息
+        if (StringUtils.isNotEmpty(devConfig)) {
+            iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceOld, devConfig, devUpdateddate);
+        }
+    }
+
+    public void processOffline(IotDevice iotDeviceOld, JSONObject dataJson, String devUpdateddate) {
+        log.info(TAG+"处理 上线/离线 消息 {}", dataJson.toString());
+        String cmd = dataJson.getString("cmd");
+        JSONObject ext = dataJson.getJSONObject("ext");
+
+        String devCode = ext.getString("imei");
+        if (TextUtils.isEmpty(devCode)) {
+            log.error("未取到 devCode");
+            return;
+        }
+
+        // 设备不存在 就不再处理
+        IotDevice newIotDevice = new IotDevice();
+        newIotDevice.setDevBid(iotDeviceOld.getDevBid());
+
+        if("offline".equalsIgnoreCase(cmd)){
+            newIotDevice.setDevStatus("0");
+            newIotDevice.setDevOfflinedate(devUpdateddate);
+        }else if("online".equalsIgnoreCase(cmd)){
+            newIotDevice.setDevStatus("1");
+
+        }
+        newIotDevice.setDevUpdateddate(devUpdateddate);
+
+        iIotDeviceService.updateIotDevice(newIotDevice);
+        // 主动进行查询,以确保不是误判离线
+        if("offline".equalsIgnoreCase(cmd)){
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(2*1000);
+                    }catch (InterruptedException e){
+                        e.printStackTrace();
+                    }
+                    //主动进行查询,以确保不是误判离线
+                    log.info("【YFQXZ】主动进行查询,以确保不是误判离线" + iotDeviceOld);
+                    YfQxzReqMsg reqMsg = new YfQxzReqMsg();
+                    JSONObject jobjParams = new JSONObject();
+                    jobjParams.put("type","data");
+                    reqMsg.setExt(jobjParams);
+
+                    publishMsg(iotDeviceOld, reqMsg);
+                }
+            }).start();
+        }
+    }
+
+    @Override
+    public boolean isDeviceProps(JSONObject jobjMsg) {
+
+        if ("data".equals(jobjMsg.getString("cmd"))) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
+        String devId = mqttManager.getDevIdByTopic(connectionId,topic);
+        return iIotDeviceService.selectIotDeviceByDevBid(devId);
+    }
+
+
+    /**
+     * 从topic中提取
+     * @param topic
+     * @return
+     */
+    private String getDevcode(String topic){
+
+        if (TextUtils.isEmpty(topic)){
+            return "";
+        }
+        return topic.substring(topic.lastIndexOf("/")+1);
+    }
+
+    /**
+     * 发MQTT消息
+     * @param devCode
+     * @param reqMsg
+     */
+    private void publishMsg(IotDevice iotDevice, YfQxzMsg reqMsg) {
+        String devCode = iotDevice.getDevCode();
+        String mqttMsgContent = JSON.toJSONString(reqMsg);
+        String topic = IotMqttConstant.YFQxzTopic.TOPIC_QXZ_CMD_PREFIX + devCode;
+
+        try{
+            mqttManager.publishMsg(iotDevice.getDevconnBid(), topic, mqttMsgContent);
+            log.info("【YFQXZ】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
+        }catch (Exception e){
+            log.error("【YFQXZ】发送指令异常",e);
+        }
+    }
+}