|
|
@@ -0,0 +1,322 @@
|
|
|
+package com.yunfeiyun.agmp.iots.task;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSONArray;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.yunfeiyun.agmp.common.utils.DateUtils;
|
|
|
+import com.yunfeiyun.agmp.common.utils.uuid.IdUtils;
|
|
|
+import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
|
|
|
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
|
|
|
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqEnvData;
|
|
|
+import com.yunfeiyun.agmp.iot.common.domain.IotYbqPredictData;
|
|
|
+import com.yunfeiyun.agmp.iot.common.enums.ybq.YbqTypeConst;
|
|
|
+import com.yunfeiyun.agmp.iots.core.manager.HttpManager;
|
|
|
+import com.yunfeiyun.agmp.iots.device.service.IotYbqEnvDataService;
|
|
|
+import com.yunfeiyun.agmp.iots.device.service.IotYbqPredictDataService;
|
|
|
+import com.yunfeiyun.agmp.iots.device.serviceImp.YbqCmbService;
|
|
|
+import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
|
|
|
+import com.yunfeiyun.agmp.iots.service.IIotDevicelasteddataService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 作物xx虫子检测设备调度
|
|
|
+ * 比如:小麦赤霉病、玉米大斑病、小麦条锈病、小麦白粉病
|
|
|
+ *
|
|
|
+ * @author zhangn
|
|
|
+ * @see YbqTypeConst
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class YbqScheduler {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private HttpManager httpManager;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IIotDeviceService iIotDeviceService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IotYbqPredictDataService iotYbqPredictDataService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IotYbqEnvDataService iotYbqEnvDataService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IIotDevicelasteddataService iIotDevicelasteddataService;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 需要同步的类型(已经对接的)
|
|
|
+ */
|
|
|
+ YbqTypeConst[] ybqTypeConsts = {
|
|
|
+ YbqTypeConst.YBQ_XM_CMB,
|
|
|
+ YbqTypeConst.YBQ_SD_DWB,
|
|
|
+ YbqTypeConst.YBQ_YM_DBB,
|
|
|
+ };
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步预测数据
|
|
|
+ * 步骤:
|
|
|
+ * 1. 遍历类型查询厂家设备
|
|
|
+ * 2. 根据设备查询预测数据,入库
|
|
|
+ * 每十分钟:0 0/10 0/1 * * ?
|
|
|
+ * 每1小时:0 0 0/1 * * ?
|
|
|
+ * 每天12点:0 0 8 * * ?
|
|
|
+ * 每天8点:10分 0 10 8 * * ?
|
|
|
+ * 对方:每天8点更新
|
|
|
+ * 我们:每天八点半更新
|
|
|
+ */
|
|
|
+ @Scheduled(cron = "0 30 8 * * ?")
|
|
|
+ public void synPredictedData() {
|
|
|
+ String startDate = DateUtils.dateTime();
|
|
|
+ log.info("【开始执行】同步预测数据任务,当前时间: {}", startDate);
|
|
|
+ for (YbqTypeConst ybqTypeConst : ybqTypeConsts) {
|
|
|
+ try {
|
|
|
+ log.debug("正在处理设备类型: {}", ybqTypeConst.getMessage());
|
|
|
+ // 同步赤霉病设备预测数据
|
|
|
+ List<IotDevice> iotDevices = getDevListByServiceName(ybqTypeConst.getServiceName());
|
|
|
+ for (IotDevice iotDevice : iotDevices) {
|
|
|
+ delaYbqPredictedData(iotDevice, ybqTypeConst, startDate, startDate);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ log.info("【完成】同步预测数据任务执行完毕");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据设备处理预测数据
|
|
|
+ *
|
|
|
+ * @param iotDevice
|
|
|
+ * @param ybqTypeConst
|
|
|
+ * @param startDate
|
|
|
+ * @param endTime
|
|
|
+ */
|
|
|
+ public void delaYbqPredictedData(IotDevice iotDevice, YbqTypeConst ybqTypeConst, String startDate, String endTime) {
|
|
|
+ try {
|
|
|
+ String code = iotDevice.getDevCode();
|
|
|
+ String devBid = iotDevice.getDevBid();
|
|
|
+ JSONArray jsonArrayDevs = ((YbqCmbService)httpManager.getHttpClientByDevice(iotDevice)).getPredictedData( code, startDate, endTime,ybqTypeConst.getCode());
|
|
|
+ if (jsonArrayDevs == null || jsonArrayDevs.size() == 0) {
|
|
|
+ log.error("【预测数据同步】【{}}】暂无设备数据,无需导入", ybqTypeConst.getMessage());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int count = jsonArrayDevs.size();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ try {
|
|
|
+ JSONObject jobj = jsonArrayDevs.getJSONObject(i);
|
|
|
+ if (jobj != null) {
|
|
|
+ // 得到第三方返回的设备的预测数据
|
|
|
+ String computeDate = jobj.getString("computeDate");
|
|
|
+ String deviceId = jobj.getString("deviceId");
|
|
|
+ String value = jobj.getString("value");
|
|
|
+ //保存预测记录、更新预测最新数据
|
|
|
+ // 查查当天的有没有,有的话更新,没有的话添加
|
|
|
+ IotYbqPredictData iotYbqPredictDataToday = iotYbqPredictDataService.getTodayData(devBid);
|
|
|
+ if (iotYbqPredictDataToday == null) {
|
|
|
+ IotYbqPredictData iotYbqPredictData = new IotYbqPredictData();
|
|
|
+ iotYbqPredictData.setId(IdUtils.fastUUID());
|
|
|
+ iotYbqPredictData.setYbqdataBid(iotYbqPredictData.getId());
|
|
|
+ iotYbqPredictData.setTid(iotDevice.getTid());
|
|
|
+ iotYbqPredictData.setComputeDate(computeDate);
|
|
|
+ iotYbqPredictData.setDateDevType(ybqTypeConst.getCode());
|
|
|
+ iotYbqPredictData.setValue(value);
|
|
|
+ iotYbqPredictData.setYbqdataContent(jobj);
|
|
|
+ iotYbqPredictData.setDeviceId(deviceId);
|
|
|
+ iotYbqPredictData.setDevBid(devBid);
|
|
|
+ iotYbqPredictData.setDevTypeBid(iotDevice.getDevtypeBid());
|
|
|
+ iotYbqPredictData.setYbqdataModifiedDate(DateUtils.dateTimeNow());
|
|
|
+ iotYbqPredictData.setYbqdataCreatedDate(DateUtils.dateTimeNow());
|
|
|
+ iotYbqPredictDataService.insertData(iotYbqPredictData);
|
|
|
+ } else {
|
|
|
+ iotYbqPredictDataToday.setComputeDate(computeDate);
|
|
|
+ iotYbqPredictDataToday.setValue(value);
|
|
|
+ iotYbqPredictDataToday.setYbqdataModifiedDate(DateUtils.dateTimeNow());
|
|
|
+ // 只更新特定的字段
|
|
|
+ iotYbqPredictDataService.updateData(iotYbqPredictDataToday);
|
|
|
+ }
|
|
|
+ // 同步更新设备信息
|
|
|
+ String extInfo = iotDevice.getExtInfo() == null ? "{}" : iotDevice.getExtInfo();
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(extInfo);
|
|
|
+ //预测时间
|
|
|
+ jsonObject.put("computeDate", DateUtils.dateTimeNow());
|
|
|
+ //预测value
|
|
|
+ jsonObject.put("computeValue", value);
|
|
|
+ iIotDeviceService.updateIotDeviceExtInfo(devBid, jsonObject.toString());
|
|
|
+ log.debug("设备ID={} 的预测数据处理成功,computeDate={}, value={}", iotDevice.getDevBid(), computeDate, value);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理设备ID={} 的预测数据时出错: ", iotDevice.getDevBid(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步上报数据
|
|
|
+ * 测试:每五分钟:0 0/5 0/1 * * ?
|
|
|
+ * 对方:每2个小时 13、15
|
|
|
+ * 2024-06-05 12:10:00
|
|
|
+ * 2024-06-05 13:10:00
|
|
|
+ * 2024-06-05 14:10:00
|
|
|
+ * 2024-06-05 15:10:00
|
|
|
+ * 2024-06-05 16:10:00
|
|
|
+ * 延迟10分钟再拉取
|
|
|
+ * 我们:从10分开始,每一个小时拉取一次
|
|
|
+ */
|
|
|
+ @Scheduled(cron = "0 10/59 0/1 * * ? ")
|
|
|
+ public void synReportData() {
|
|
|
+ String startTime = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, DateUtils.addMinutes(new Date(), -20));
|
|
|
+ String endTime = DateUtils.dateTimeNow();
|
|
|
+ log.info("【开始】同步上报数据任务执行于 {}", endTime);
|
|
|
+ for (YbqTypeConst ybqTypeConst : ybqTypeConsts) {
|
|
|
+ try {
|
|
|
+ log.debug("正在处理设备类型: {}", ybqTypeConst.getMessage());
|
|
|
+ // 同步各类设备预测数据
|
|
|
+ List<IotDevice> iotDevices = getDevListByServiceName(ybqTypeConst.getServiceName());
|
|
|
+ for (IotDevice iotDevice : iotDevices) {
|
|
|
+ dealEnvDataByDevice(iotDevice, ybqTypeConst, startTime, endTime);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理类型 {} 的设备预测数据时发生异常: ", ybqTypeConst.getMessage(), e);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单独处理设备数据,为了与主动获取更新复用代码
|
|
|
+ *
|
|
|
+ * @param iotDevice
|
|
|
+ * @param ybqTypeConst
|
|
|
+ * @param startTime
|
|
|
+ * @param endTime
|
|
|
+ */
|
|
|
+ void dealEnvDataByDevice(IotDevice iotDevice, YbqTypeConst ybqTypeConst, String startTime, String endTime) {
|
|
|
+ try {
|
|
|
+ String code = iotDevice.getDevCode();
|
|
|
+ String devBid = iotDevice.getDevBid();
|
|
|
+ // 临时时间写成过去的,把数据拉起过来
|
|
|
+ JSONArray jsonArrayDevs = ((YbqCmbService)httpManager.getHttpClientByDevice(iotDevice)).getReportData( code, startTime, endTime);
|
|
|
+ if (jsonArrayDevs == null || jsonArrayDevs.size() == 0) {
|
|
|
+ log.error("【上报数据同步】【{}}】暂无设备数据,无需导入", ybqTypeConst.getMessage());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int count = jsonArrayDevs.size();
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
+ try {
|
|
|
+ JSONObject jobj = jsonArrayDevs.getJSONObject(i);
|
|
|
+ if (jobj != null) {
|
|
|
+ // 得到第三方返回的设备的预测数据
|
|
|
+ IotYbqEnvData iotYbqEnvData = jobj.to(IotYbqEnvData.class);
|
|
|
+ String collectTime = jobj.getString("collectTime");
|
|
|
+ iotYbqEnvData.setId(IdUtils.fastUUID());
|
|
|
+ iotYbqEnvData.setTid(iotDevice.getTid());
|
|
|
+ iotYbqEnvData.setYbqdataBid(iotYbqEnvData.getId());
|
|
|
+ iotYbqEnvData.setDevBid(devBid);
|
|
|
+ iotYbqEnvData.setDateDevType(ybqTypeConst.getCode());
|
|
|
+ iotYbqEnvData.setYbqdataContent(jobj);
|
|
|
+ //iotYbqEnvData.setYbqdataCreatedDate(DateUtils.dateTimeNow());
|
|
|
+ iotYbqEnvData.setYbqdataCreatedDate(collectTime);
|
|
|
+ iotYbqEnvDataService.insertData(iotYbqEnvData);
|
|
|
+ // 同步更新设备信息
|
|
|
+ String extInfo = iotDevice.getExtInfo() == null ? "{}" : iotDevice.getExtInfo();
|
|
|
+ JSONObject jsonObject = JSONObject.parseObject(extInfo);
|
|
|
+ //上报时间
|
|
|
+ jsonObject.put("collectTime", DateUtils.dateTimeNow());
|
|
|
+ iIotDeviceService.updateIotDeviceExtInfo(devBid, jsonObject.toString());
|
|
|
+ log.info("设备ID: {}, 收集时间: {} 的数据已成功插入数据库", devBid, collectTime);
|
|
|
+ // 保存 设备最新数据 到redis
|
|
|
+ iIotDevicelasteddataService.createOrUpdateDeviceLastedData(jobj, iotDevice, DateUtils.dateTimeNow(), 60 * 60 * 24L);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{}", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理类型 {} 的设备上报数据时发生异常: ", ybqTypeConst.getMessage(), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单独获取设备的上报数据,用于第一次添加后激活
|
|
|
+ * 查询130分钟内,只要有数据就认为在线
|
|
|
+ *
|
|
|
+ * @param iotDevice 设备
|
|
|
+ */
|
|
|
+ public void deviceCreateHandle(IotDevice iotDevice) {
|
|
|
+ if (iotDevice == null) {
|
|
|
+ log.warn("设备信息为空,未找到对应设备信息,无法执行激活流程");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String devId = iotDevice.getDevBid();
|
|
|
+
|
|
|
+ log.info("【开始】单独获取设备ID [{}] 的上报数据,用于首次激活", devId);
|
|
|
+ try {
|
|
|
+
|
|
|
+
|
|
|
+ String startTime = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, DateUtils.addMinutes(new Date(), -600 * 20));
|
|
|
+ String endTime = DateUtils.dateTimeNow();
|
|
|
+
|
|
|
+ //预测数据的时间,20200101格式
|
|
|
+ String startDate = DateUtils.dateTime();
|
|
|
+
|
|
|
+ // 处理赤霉病
|
|
|
+ if (IotDeviceDictConst.TYPE_HS_YBQ_CMB.equals(iotDevice.getDevtypeBid())) {
|
|
|
+ log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_CMB);
|
|
|
+ // 更新监测环境数据
|
|
|
+ dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_XM_CMB, startTime, endTime);
|
|
|
+ // 更新预测数据
|
|
|
+ delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_XM_CMB, startDate, startDate);
|
|
|
+ }
|
|
|
+ // 处理稻瘟病
|
|
|
+ else if (IotDeviceDictConst.TYPE_HS_YBQ_DWB.equals(iotDevice.getDevtypeBid())) {
|
|
|
+ log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_DWB);
|
|
|
+ // 更新检测环境数据
|
|
|
+ dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_SD_DWB, startTime, endTime);
|
|
|
+ // 更新预测数据
|
|
|
+ delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_SD_DWB, startDate, startDate);
|
|
|
+ } // 处理玉米大斑病
|
|
|
+ else if (IotDeviceDictConst.TYPE_HS_YBQ_DBB.equals(iotDevice.getDevtypeBid())) {
|
|
|
+ log.info("处理设备ID [{}] 类型 [{}] 的监测环境数据和预测数据更新", devId, IotDeviceDictConst.TYPE_HS_YBQ_DBB);
|
|
|
+ // 更新检测环境数据
|
|
|
+ dealEnvDataByDevice(iotDevice, YbqTypeConst.YBQ_YM_DBB, startTime, endTime);
|
|
|
+ // 更新预测数据
|
|
|
+ delaYbqPredictedData(iotDevice, YbqTypeConst.YBQ_YM_DBB, startDate, startDate);
|
|
|
+ } else {
|
|
|
+ log.warn("设备ID [{}] 的设备类型未知,无法识别处理", devId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理设备ID [{}] 上报数据时发生异常", devId, e);
|
|
|
+ }
|
|
|
+ log.info("【完成】设备ID [{}] 的上报数据处理完毕", devId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据服务类查询设备
|
|
|
+ *
|
|
|
+ * @param serviceName 服务名称
|
|
|
+ * @return 设备列表
|
|
|
+ */
|
|
|
+ public List<IotDevice> getDevListByServiceName(String serviceName) {
|
|
|
+ log.debug("开始根据服务名称 [{}] 查询设备", serviceName);
|
|
|
+ List<IotDevice> iotDevices = iIotDeviceService.selectAllDeviceByDeviceServiceName(serviceName);
|
|
|
+ log.debug("根据服务名称 [{}] 查询到 {} 台设备", serviceName, iotDevices.size());
|
|
|
+ return iotDevices;
|
|
|
+ }
|
|
|
+}
|