|
|
@@ -6,22 +6,18 @@ import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.yunfeiyun.agmp.common.utils.DateUtils;
|
|
|
import com.yunfeiyun.agmp.common.utils.JSONUtils;
|
|
|
import com.yunfeiyun.agmp.common.utils.StringUtils;
|
|
|
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
|
|
|
-import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
|
|
|
import com.yunfeiyun.agmp.iot.common.constant.cmd.CmdDef;
|
|
|
-
|
|
|
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
|
|
|
import com.yunfeiyun.agmp.iot.common.constant.mqtt.IotMqttConstant;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotBaseEntity;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotBzydata;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotYfqxzdata;
|
|
|
import com.yunfeiyun.agmp.iot.common.enums.EnumWarnType;
|
|
|
-import com.yunfeiyun.agmp.iot.common.exception.IotBaseException;
|
|
|
import com.yunfeiyun.agmp.iot.common.model.IotWarncheck;
|
|
|
import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
|
|
|
import com.yunfeiyun.agmp.iot.common.service.IotAddressService;
|
|
|
import com.yunfeiyun.agmp.iot.common.service.MongoService;
|
|
|
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
|
|
|
import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
|
|
|
import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
|
|
|
import com.yunfeiyun.agmp.iots.device.common.DeviceAbstractImpl;
|
|
|
@@ -30,17 +26,17 @@ import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzConfigMsg;
|
|
|
import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzLedMsg;
|
|
|
import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzMsg;
|
|
|
import com.yunfeiyun.agmp.iots.device.domain.yfqxz.YfQxzReqMsg;
|
|
|
-import com.yunfeiyun.agmp.iots.device.service.*;
|
|
|
-import com.yunfeiyun.agmp.iots.service.impl.WarnService;
|
|
|
-
|
|
|
+import com.yunfeiyun.agmp.iots.device.service.IYfQxzDevice;
|
|
|
import com.yunfeiyun.agmp.iots.service.*;
|
|
|
+import com.yunfeiyun.agmp.iots.service.impl.WarnService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.http.util.TextUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.*;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
|
|
|
/**
|
|
|
@@ -72,8 +68,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
@Autowired
|
|
|
private IotAddressService iotAddressService;
|
|
|
|
|
|
-
|
|
|
-
|
|
|
@Autowired
|
|
|
private IIotDeviceService iIotDeviceService;
|
|
|
|
|
|
@@ -161,8 +155,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
|
|
|
@Override
|
|
|
public Object receiveData(String topic, JSONObject dataJson,String connectionId) throws Exception {
|
|
|
-
|
|
|
-
|
|
|
if (TextUtils.isEmpty(topic)) {
|
|
|
log.error("topic: empty");
|
|
|
return false;
|
|
|
@@ -193,19 +185,23 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
}
|
|
|
String devUpdateddate = dataJson.getString("devUpdateddate");
|
|
|
if(StringUtils.isEmpty(devUpdateddate)){
|
|
|
- devUpdateddate= DateUtils.dateTimeNow();
|
|
|
+ devUpdateddate = DateUtils.dateTimeNow();
|
|
|
+ }
|
|
|
+
|
|
|
+ IotDevice iotDevice = findIotDevice(topic, dataJson, connectionId);
|
|
|
+ if (iotDevice == null) {
|
|
|
+ log.error("未取到 iotDevice");
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
if (cmd.equalsIgnoreCase("terminalData") ) {
|
|
|
- this.processData(devCode,ext, devUpdateddate);
|
|
|
- }else if (cmd.equalsIgnoreCase("status") ) {
|
|
|
- this.processStatus(ext, devUpdateddate);
|
|
|
- }else if (cmd.equalsIgnoreCase("config") ) {
|
|
|
- this.processConfig(devCode, ext);
|
|
|
- }else if (cmd.equalsIgnoreCase("online")
|
|
|
- || cmd.equalsIgnoreCase("offline")
|
|
|
- ) {
|
|
|
- this.processOffline(dataJson);
|
|
|
+ this.processData(iotDevice, ext, devUpdateddate);
|
|
|
+ } else if (cmd.equalsIgnoreCase("status") ) {
|
|
|
+ this.processStatus(iotDevice, ext, devUpdateddate);
|
|
|
+ } else if (cmd.equalsIgnoreCase("config") ) {
|
|
|
+ this.processConfig(iotDevice, ext, devUpdateddate);
|
|
|
+ } else if (cmd.equalsIgnoreCase("online") || cmd.equalsIgnoreCase("offline")) {
|
|
|
+ this.processOffline(iotDevice, dataJson, devUpdateddate);
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
@@ -251,12 +247,10 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
* @param ext
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public void processData(String devCode, JSONObject ext, String devUpdateddate) throws Exception {
|
|
|
+ public void processData(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
|
|
|
log.info(TAG+"数据解析 processData {}", ext.toString());
|
|
|
JSONArray jarrData = ext.getJSONArray("data");
|
|
|
|
|
|
- IotDevice iotDeviceOld = findQxzsqzDevice(devCode);
|
|
|
-
|
|
|
iotDeviceOld.setDevUpdateddate(devUpdateddate);
|
|
|
iotDeviceOld.setDevStatus("1");//在线
|
|
|
// 更新设备基础信息数据库 mysql
|
|
|
@@ -272,9 +266,7 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
// 更新设备数据信息到数据库 mongodb
|
|
|
String cId = iotDeviceOld.getTid();
|
|
|
|
|
|
- for (Object item:
|
|
|
- jarrData) {
|
|
|
-
|
|
|
+ for (Object item: jarrData) {
|
|
|
JSONObject jobj = (JSONObject) item;
|
|
|
IotYfqxzdata iotYfqxzdata = new IotYfqxzdata();
|
|
|
iotYfqxzdata.setCId(cId);
|
|
|
@@ -292,16 +284,8 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
jobj.put("time",DateUtils.parseDateToStr("yyyy-MM-dd'T'HH:mm:ss.SSSX",reportDt));
|
|
|
jobj.put("devBid",iotYfqxzdata.getDevBid());
|
|
|
jobj.put("cId",iotYfqxzdata.getCId());
|
|
|
-
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- // 保存 设备最新数据 到redis
|
|
|
- /*iIotDevicelasteddataService.createOrUpdateDeviceLastedData(
|
|
|
- ext, iotDeviceOld, reportTime, 60 * 60 * 24L);*/
|
|
|
-
|
|
|
- iIotDevicelasteddataService.updateDeviceLastedData(
|
|
|
- iotDeviceOld,jarrData.toString(), devUpdateddate);
|
|
|
+ iIotDevicelasteddataService.updateDeviceLastedData(iotDeviceOld,jarrData.toString(), devUpdateddate);
|
|
|
|
|
|
//预警
|
|
|
//云飞 气象站 和 墒情站 都从这里进数据
|
|
|
@@ -327,7 +311,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
//预警检查 V2
|
|
|
warnService.checkSensData("0","",iotDeviceOld.getDevBid());
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -336,14 +319,12 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
* @param ext
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public void processStatus(JSONObject ext, String devUpdateddate) throws Exception {
|
|
|
+ public void processStatus(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
|
|
|
log.info(TAG+"数据解析 processStatus {}", ext.toString());
|
|
|
|
|
|
JSONObject jobjStatus = ext.getJSONObject("terminalStatus");
|
|
|
|
|
|
- String devCode = ext.getString("StationID");
|
|
|
-
|
|
|
- IotDevice iotDeviceOld = findQxzsqzDevice(devCode);
|
|
|
+ String devCode = iotDeviceOld.getDevCode();
|
|
|
|
|
|
IotDevice iotDevice = new IotDevice();
|
|
|
iotDevice.setDevtypeBid(iotDeviceOld.getDevtypeBid());
|
|
|
@@ -359,19 +340,15 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
String lat = jobjStatus.getString("latitude");
|
|
|
|
|
|
//如果存在经纬度
|
|
|
- if(!TextUtils.isEmpty(lng)&&!TextUtils.isEmpty(lat)){
|
|
|
-
|
|
|
+ if(!TextUtils.isEmpty(lng) &&! TextUtils.isEmpty(lat)){
|
|
|
// 自动的可以更新数据库 0 手动
|
|
|
if (!Objects.equals(iotDeviceOld.getDevPositionstatus(), "0")) {
|
|
|
iotDeviceAddressService.setDeviceAddress(iotDevice, lng, lat);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
// 更新设备基础信息数据库 mysql
|
|
|
iIotDeviceService.updateIotDevice(iotDevice);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -384,24 +361,16 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
* @param ext
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- private void processConfig(String devCode, JSONObject ext) throws Exception {
|
|
|
+ private void processConfig(IotDevice iotDeviceOld, JSONObject ext, String devUpdateddate) throws Exception {
|
|
|
log.info(TAG+"数据解析 processConfig {}", ext);
|
|
|
-
|
|
|
- IotDevice iotDeviceOld = findQxzsqzDevice(devCode);
|
|
|
-
|
|
|
String devConfig = ext.toString();
|
|
|
-
|
|
|
// 创建或更新设备配置信息
|
|
|
if (StringUtils.isNotEmpty(devConfig)) {
|
|
|
- String dtNow = DateUtils.dateTimeNow();
|
|
|
- iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceOld, devConfig, dtNow);
|
|
|
+ iIotDeviceconfigService.createOrUpdateDevConfig(iotDeviceOld, devConfig, devUpdateddate);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
- public void processOffline(JSONObject dataJson) {
|
|
|
+ public void processOffline(IotDevice iotDeviceOld, JSONObject dataJson, String devUpdateddate) {
|
|
|
log.info(TAG+"处理 上线/离线 消息 {}", dataJson.toString());
|
|
|
String cmd = dataJson.getString("cmd");
|
|
|
JSONObject ext = dataJson.getJSONObject("ext");
|
|
|
@@ -413,24 +382,20 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
}
|
|
|
|
|
|
// 设备不存在 就不再处理
|
|
|
- IotDevice iotDeviceOld = findQxzsqzDevice(devCode);
|
|
|
-
|
|
|
IotDevice newIotDevice = new IotDevice();
|
|
|
newIotDevice.setDevBid(iotDeviceOld.getDevBid());
|
|
|
|
|
|
- String dtNow = DateUtils.dateTimeNow();
|
|
|
-
|
|
|
if("offline".equalsIgnoreCase(cmd)){
|
|
|
newIotDevice.setDevStatus("0");
|
|
|
- newIotDevice.setDevOfflinedate(dtNow);
|
|
|
+ newIotDevice.setDevOfflinedate(devUpdateddate);
|
|
|
}else if("online".equalsIgnoreCase(cmd)){
|
|
|
newIotDevice.setDevStatus("1");
|
|
|
|
|
|
}
|
|
|
-
|
|
|
- newIotDevice.setDevUpdateddate(dtNow);
|
|
|
+ newIotDevice.setDevUpdateddate(devUpdateddate);
|
|
|
|
|
|
iIotDeviceService.updateIotDevice(newIotDevice);
|
|
|
+ // 主动进行查询,以确保不是误判离线
|
|
|
|
|
|
new Thread(new Runnable() {
|
|
|
@Override
|
|
|
@@ -446,9 +411,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
|
|
|
}
|
|
|
}).start();
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -464,11 +426,8 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
|
|
|
@Override
|
|
|
public IotDevice findIotDevice(String topic, JSONObject jobjMsg,String connectionId) {
|
|
|
- String devCode = topic.substring(topic.lastIndexOf("/") + 1);
|
|
|
- //查询
|
|
|
- IotDevice ret = findQxzsqzDevice(devCode);
|
|
|
- //log.info("查到了一个iotdevice {}", ret);
|
|
|
- return ret;
|
|
|
+ String devId = mqttManager.getDevIdByTopic(connectionId,topic);
|
|
|
+ return iIotDeviceService.selectIotDeviceByDevBid(devId);
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -482,7 +441,6 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
if (TextUtils.isEmpty(topic)){
|
|
|
return "";
|
|
|
}
|
|
|
-
|
|
|
return topic.substring(topic.lastIndexOf("/")+1);
|
|
|
}
|
|
|
|
|
|
@@ -503,24 +461,5 @@ public class YfQxzDeviceImpl extends DeviceAbstractImpl implements IYfQxzDevice
|
|
|
}else{
|
|
|
log.error("mqttPublisher is null");
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- private IotDevice findQxzsqzDevice(String devCode){
|
|
|
- String firmBid = mqttManager.getFirmBizId(SERVICE_NAME);
|
|
|
-
|
|
|
- //尝试查找 气象站 里面有没有该设备;然后再查找 墒情站里有没有该设备
|
|
|
- List<String> devtypeBids = new ArrayList<>();
|
|
|
- devtypeBids.add(IotDeviceDictConst.TYPE_YF_QXZ);
|
|
|
- devtypeBids.add(IotDeviceDictConst.TYPE_YF_SQZ);
|
|
|
- devtypeBids.add(IotDeviceDictConst.TYPE_YF_GXZW);
|
|
|
-
|
|
|
- IotDevice iotDeviceOld = iIotDeviceService.getDeviceListByDevtypeBidsAndDevCode(firmBid, devCode, devtypeBids);
|
|
|
- if (iotDeviceOld == null) {
|
|
|
- log.error("找不到 【云飞 墒情站】,厂商{}设备类型{}设备编号{}",firmBid, devtypeBids, devCode);
|
|
|
- throw new IotBaseException(IotErrorCode.DB_NOT_FOUND);
|
|
|
- }
|
|
|
- return iotDeviceOld;
|
|
|
}
|
|
|
}
|