Pārlūkot izejas kodu

新增 气象站墒情站设备对接

zhaiyifei 1 gadu atpakaļ
vecāks
revīzija
d8a0078730

+ 51 - 51
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/YfQxzDeviceImpl.java

@@ -13,13 +13,10 @@ import com.yunfeiyun.agmp.iot.common.domain.IotBaseEntity;
 import com.yunfeiyun.agmp.iot.common.domain.IotBzydata;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotYfqxzdata;
-import com.yunfeiyun.agmp.iot.common.enums.EnumWarnType;
-import com.yunfeiyun.agmp.iot.common.model.IotWarncheck;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
 import com.yunfeiyun.agmp.iot.common.service.IotAddressService;
 import com.yunfeiyun.agmp.iot.common.service.MongoService;
 import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
-import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.DeviceAbstractImpl;
 import com.yunfeiyun.agmp.iots.device.domain.bzy.YfBzyTakephotoMsg;
 import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzConfigMsg;
@@ -31,7 +28,6 @@ import com.yunfeiyun.agmp.iots.service.*;
 import com.yunfeiyun.agmp.iots.service.impl.WarnService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -140,11 +136,14 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
             }
         }
 
-        MqttPublisher mqttPublisher = mqttManager.getPublisherByService(SERVICE_NAME);
-        mqttPublisher.publish(IotMqttConstant.YFQxzTopic.TOPIC_QXZ_CMD_PREFIX + cmdModel.getIotDevice().getDevCode(), mqttMsgContent);
-        log.info(TAG+"发送指令完毕!");
+        String topic = IotMqttConstant.YFQxzTopic.TOPIC_QXZ_CMD_PREFIX + cmdModel.getIotDevice().getDevCode();
+        IotDevice iotDevice= iIotDeviceService.selectIotDeviceByDevBid(cmdModel.getIotDevice().getDevBid());
+        mqttManager.publishMsg(iotDevice.getDevconnBid(), topic, mqttMsgContent);
+        log.info("【YFQXZ】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
+
         cmdModel.setClogSendresult(clogSendresult);
         cmdModel.setClogDesc(mqttMsgContent);
+
         iIotCmdlogService.insertSuccessCmdlog(cmdModel);
 
         //返回值待定
@@ -152,7 +151,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
 
     }
 
-
     @Override
     public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
         if (TextUtils.isEmpty(topic)) {
@@ -296,21 +294,21 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
         }else if( IotDeviceDictConst.TYPE_YF_GXZW.equals(iotDeviceOld.getDevtypeBid()) ){
             warnService.checkGssqData("0","",iotDeviceOld.getDevBid());
         }*/
-        EnumWarnType warnType = EnumWarnType.findEnumByDevbid(iotDeviceOld.getDevtypeBid());
-        if(warnType != null){
-            warnService.checkSensData(warnType.getCode(),"0","",iotDeviceOld.getDevBid());
-        }
-
-        if("3".equals(warnService.getWarnVer())){
-            //要求预警检查 V3
-            IotWarncheck warncheck = new IotWarncheck();
-            warncheck.setDevBid(iotDeviceOld.getDevBid());
-            // TODO
-            //  iotsMqService.sendMsg(IotMqConstant.TOPIC_WARNCHECK, IotMqConstant.TOPIC_WARNCHECK, warncheck);
-        }else{
-            //预警检查 V2
-            warnService.checkSensData("0","",iotDeviceOld.getDevBid());
-        }
+//        EnumWarnType warnType = EnumWarnType.findEnumByDevbid(iotDeviceOld.getDevtypeBid());
+//        if(warnType != null){
+//            warnService.checkSensData(warnType.getCode(),"0","",iotDeviceOld.getDevBid());
+//        }
+//
+//        if("3".equals(warnService.getWarnVer())){
+//            //要求预警检查 V3
+//            IotWarncheck warncheck = new IotWarncheck();
+//            warncheck.setDevBid(iotDeviceOld.getDevBid());
+//            // TODO
+//            //  iotsMqService.sendMsg(IotMqConstant.TOPIC_WARNCHECK, IotMqConstant.TOPIC_WARNCHECK, warncheck);
+//        }else{
+//            //预警检查 V2
+//            warnService.checkSensData("0","",iotDeviceOld.getDevBid());
+//        }
     }
 
 
@@ -396,25 +394,28 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
 
         iIotDeviceService.updateIotDevice(newIotDevice);
         // 主动进行查询,以确保不是误判离线
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-
-                //主动进行查询,以确保不是误判离线
-                YfQxzReqMsg reqMsg = new YfQxzReqMsg();
-                JSONObject jobjParams = new JSONObject();
-                jobjParams.put("type","data");
-                reqMsg.setExt(jobjParams);
-
-                publishMsg(devCode,reqMsg);
-
-            }
-        }).start();
+        if("offline".equalsIgnoreCase(cmd)){
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(2*1000);
+                    }catch (InterruptedException e){
+                        e.printStackTrace();
+                    }
+                    //主动进行查询,以确保不是误判离线
+                    log.info("【YFQXZ】主动进行查询,以确保不是误判离线" + iotDeviceOld);
+                    YfQxzReqMsg reqMsg = new YfQxzReqMsg();
+                    JSONObject jobjParams = new JSONObject();
+                    jobjParams.put("type","data");
+                    reqMsg.setExt(jobjParams);
+
+                    publishMsg(iotDeviceOld, reqMsg);
+                }
+            }).start();
+        }
     }
 
-
-
     @Override
     public boolean isDeviceProps(JSONObject jobjMsg) {
 
@@ -449,17 +450,16 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
      * @param devCode
      * @param reqMsg
      */
-    private void publishMsg(String devCode, YfQxzMsg reqMsg){
-
-        MqttPublisher mqttPublisher = mqttManager.getPublisherByService(SERVICE_NAME);
-        if(mqttPublisher!=null){
-            try {
-                mqttPublisher.publish(IotMqttConstant.YFQxzTopic.TOPIC_QXZ_CMD_PREFIX + devCode, JSON.toJSON(reqMsg).toString());
-            } catch (MqttException e) {
-                log.error("publishMsg",e);
-            }
-        }else{
-            log.error("mqttPublisher is null");
+    private void publishMsg(IotDevice iotDevice, YfQxzMsg reqMsg) {
+        String devCode = iotDevice.getDevCode();
+        String mqttMsgContent = JSON.toJSONString(reqMsg);
+        String topic = IotMqttConstant.YFQxzTopic.TOPIC_QXZ_CMD_PREFIX + devCode;
+
+        try{
+            mqttManager.publishMsg(iotDevice.getDevconnBid(), topic, mqttMsgContent);
+            log.info("【YFQXZ】发送指令完毕!connectionId:{},topic :{} mqttMsgContent: {}",iotDevice.getDevconnBid(),topic, mqttMsgContent);
+        }catch (Exception e){
+            log.error("【YFQXZ】发送指令异常",e);
         }
     }
 }