||
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from django.conf import settings
- from django.core.paginator import Paginator
- from django.forms.models import model_to_dict
- import time
- import os
- import datetime
- import uuid
- import json
- import logging
- import requests
- from smartfarming.utils import get_addr_by_lag_lng
- from smartfarming.serializers.device_serializers import DeviceSerializers
- from smartfarming.models.device import MongoDevice, MongoCBDData, MongoSCDData, MongoXYCBData
- from smartfarming.models.worm_forecast import MongoCBDphoto
- from smartfarming.models.weather import (
- MongoQXZ_Base_Info,
- MongoQXZ_Alarm_Log_New,
- MongoQXZ_Conf,
- QXZdata_New,
- MongoQXZ_Alarm,
- QXZThresholdWarning
- )
- from django.db.models import Q
- from smartfarming.qxz import data_deal
- from collections import Counter, defaultdict
- from smartfarming.models.device import MongoDevice, DevicePestWarning, MongoDeviceType
- from kedong.decoration import kedong_deco, PortError
- from django.core.paginator import Paginator
- logger = logging.getLogger("data_ingestion")
- config_dict = settings.CONFIG
- device_type_en = config_dict.get("device_type_en")
- device_type_zh = config_dict.get("device_type_zh")
- class CbdScdXyDeviceSaveAPIView(APIView):
- permission_classes = []
- authentication_classes = []
- def post(self, request):
- # 测报灯 杀虫灯 性诱 设备及数据入库
- try:
- request_data = request.body
- data = json.loads(request_data)
- topic = data.get("topic")
- payload = data.get("payload")
- cmd = payload.get("cmd")
- topic_msg = topic.split("/")
- now = int(time.time())
- logger.warning(topic_msg)
- if topic_msg and len(topic_msg) == 5 and cmd:
- device_id = topic_msg[-1]
- try:
- device_type = topic_msg[2]
- device_type_id = device_type_en.get(device_type)
- if device_type_id == 2:
- model = MongoSCDData
- logger.warning(f"杀虫灯数据入库原数据: {data}")
- elif device_type_id == 3:
- model = MongoCBDData
- logger.warning(f"测报灯数据入库原数据: {data}")
- elif device_type_id == 4:
- model = MongoXYCBData
- logger.warning(f"性诱数据入库原数据: {data}")
- # 在设备信息表中查找是否有数据,如果没有数据则增加
- device_name = device_type_zh.get(str(device_type_id))
- device, is_created = MongoDevice.objects.get_or_create(
- device_id = device_id,
- defaults={
- "device_id": device_id,
- "device_type_id": device_type_id,
- "addtime": now
- }
- )
- if is_created:
- device.device_name = device_name
- device.save()
- logger.warning(f"新设备:{device_type} {device_id} 入库成功")
- # 获取数据并更新设备
- if cmd == "data":
- ext = payload.get("ext")
- if ext:
- # 获取设备上报的时间,同步设备
- uptime = 0
- try:
- stamp = ext.get("stamp")
- uptime = int((datetime.datetime.strptime(stamp, "%Y%m%d%H%M%S")).timestamp())
- except Exception as e:
- logger.error(f"同步设备时间失败:{device_id} {e}")
- # 增加设备数据
- model.objects.create(
- device_id = device.id,
- device_data = str(ext),
- addtime = uptime if uptime else now
- )
- lng = ext.get("lng")
- lat = ext.get("lat")
- dver_num = ext.get("dver")
- device.device_status = 1
- device.uptime = uptime if uptime else now
- if dver_num:
- device.dver_num = dver_num
- if lng and lat and dver_num and device.gps != 0:
- device.lng = lng
- device.lat = lat
- # 根据经纬度获取省市级
- is_success, province, city, district = get_addr_by_lag_lng(lat, lng)
- if is_success and device.add_position == 0:
- # 更新地理位置坐标
- device.province = province
- device.city = city
- device.district = district
- device.save()
- elif cmd == "offline":
- ext = payload.get("ext")
- if ext:
- # 增加设备数据
- model.objects.create(
- device_id=device_id,
- device_data = str(ext),
- addtime=now
- )
- # 更新设备状态
- device = MongoDevice.objects.filter(device_id=device_id).first()
- device.device_status = 0
- device.uptime = now
- device.save()
- return Response({"code": 0, "msg": "设备离线"})
- return Response({"code": 0, "msg": "success"})
- except Exception as e:
- logger.error(f"测报灯设备 {device_id} 处理上报数据或增加设备失败,错误原因:{e.args}")
- return Response({"code": 2, "msg": f"处理测报灯上报数据失败 {device_id}"})
- else:
- return Response({"code": 2, "msg": "请核对数据结构"})
- except Exception as e:
- logger.error(f"测报灯、杀虫灯、性诱设备 {e.args}")
- return Response({"code": 2, "msg": "failer"})
-
- class CbdPhotoAPIView(APIView):
- permission_classes = []
- authentication_classes = []
- def post(self, request):
- try:
- request_data = request.body
- request_data = json.loads(request_data)
- logger.warning(f"测报灯图片数据入库原数据: {request_data}")
- device_id = request_data.get("imei")
- device = MongoDevice.objects.filter(device_id=device_id)
- media = config_dict.get("media")
- img_content_url = f'{config_dict.get("image_url").get("image")}/media'
- if device:
- device = device.first()
- d_id = device.id
- # 把原图下载到本地
- result_image = request_data.get("Image")
- addr = config_dict.get("oss") + result_image if not result_image.startswith("http") else result_image
- inden_path = f"{media}/cbd/{device.device_id}"
- os.makedirs(inden_path) if not os.path.exists(inden_path) else None
- stamp = int(datetime.datetime.now().timestamp())
- unique_id = uuid.uuid4()
- combined_id = str(stamp) + "-" + str(unique_id)
- addr_org = os.path.join(inden_path, f"{combined_id}.jpg")
- remote_content = requests.get(addr).content
- with open(addr_org, "wb") as f:
- f.write(remote_content)
- # 把识别后的图片下载到本地
- result_temp = request_data.get("Result_image")
- indentify = config_dict.get("oss") + result_temp if not result_temp.startswith("http") else result_temp
- inden_path_tp = f"{media}/result/cbd/{device.device_id}"
- os.makedirs(inden_path_tp) if not os.path.exists(inden_path_tp) else None
- stamp = int(datetime.datetime.now().timestamp())
- unique_id = uuid.uuid4()
- combined_id = str(stamp) + "-" + str(unique_id)
- indentify_photo = os.path.join(inden_path_tp, f"{combined_id}.jpg")
- indentify_remote_content = requests.get(indentify).content
- with open(indentify_photo, "wb") as f:
- f.write(indentify_remote_content)
- photo_time = request_data.get("photo_time")
- data = {
- "device_id": d_id,
- "addr": addr_org.replace(media, "/media"),
- "indentify_photo":indentify_photo.replace(media, img_content_url),
- "indentify_result": request_data.get("Result"),
- "label": request_data.get("Result_code"),
- "photo_status": 1,
- "uptime": int(photo_time),
- "addtime": int(photo_time)
- }
- photo = MongoCBDphoto(**data)
- photo.save()
- return Response({"code": 0, "msg": "success"})
- else:
- return Response({"code": 2, "msg": "该项目不存在此设备"})
- except Exception as e:
- logger.error(f"测报灯图片入库失败 {e.args}")
- return Response({"code": 2, "msg": "failer"})
-
-
- class QxzDeviceAddAPIViw(APIView):
- permission_classes = []
- authentication_classes = []
- def post(self, request):
- # 气象站上传数据
- try:
- request_data = request.body
- request_data = json.loads(request_data)
- logger.error(f"气象站源数据:{request_data}")
- device_id = request_data.get("StationID", "")
- uptime = request_data.get("MonitorTime")
- data = request_data.get("data")
- terminalStatus = request_data.get("terminalStatus")
- cmd = request_data.get("cmd")
- if uptime:
- up = uptime.replace(" ", "")
- uptime_tp = int((datetime.datetime.strptime(up, "%Y-%m-%d%H:%M:%S")).timestamp())
- else:
- uptime_tp = int(time.time())
- if device_id:
- # 获取该设备的预警配置数据
- alarm = MongoQXZ_Alarm.objects.filter(device_id=device_id)
- qxz_e_conf = MongoQXZ_Conf.objects.filter(device_id=device_id)
- qxz_e_conf = qxz_e_conf.first() if qxz_e_conf else None
- mongo_device = MongoDevice.objects.filter(device_id=device_id)
- if mongo_device:
- mongo_device = mongo_device.first()
- if not qxz_e_conf:
- logger.error(f"该设备未配置预警阀值: {mongo_device.device_id}")
- return Response({"code": 2, "msg": "该设备未配置预警阀值"})
- if data:
- qxz_e_conf = model_to_dict(qxz_e_conf)
- qx_ek = {}
- result_tp_fin = ""
- for i in data:
- tp_value = i.get("eValue")
- if tp_value:
- ek = i.get("eKey")
- qx_ek[ek] = f"{tp_value}#{i.get('eNum')}#{ek}"
- if alarm:
- # 存在预警配置文件, 先查看预警配置文件中是否有配置 -- "0#6" 表示大于6则报警 "1#5" 表示小于5报警 "0#" 表示不配置
- alarms = alarm.first()
- alarm_config = eval(alarms.conf)
- dat = alarm_config.get("dat")
- for m, n in dat.items():
- n_sp = n.split("#")
- if n_sp[1]:
- if ek == m:
- # 查询具体含义
- zh = qxz_e_conf.get(m)
- zh_k = zh.split("#")
- result = ""
- if n_sp[0] == "1":
- if float(tp_value) > float(n_sp[1]):
- # 组织预警信息
- result = f"为{tp_value},大于{n_sp[1]}"
- elif n_sp[0] == "0":
- if float(tp_value) < float(n_sp[1]):
- result = f"为{tp_value},小于{n_sp[1]}"
- if result:
- QXZThresholdWarning.objects.create(
- device_id=device_id,
- warning_content= "大于预警" if n_sp[0] == "1" else "小于预警",
- ekey=zh,
- set_value=n_sp[1],
- current_value=tp_value,
- upltime=uptime_tp
- )
- result_tp = f"{zh_k[0]}{result}{zh_k[1]},"
- result_tp_fin += result_tp
- if result_tp_fin:
- alarm_new = MongoQXZ_Alarm_Log_New()
- alarm_new.warning_content = result_tp_fin
- alarm_new.upl_time = uptime_tp
- alarm_new.device_id = mongo_device.id
- alarm_new.warning_name = str(mongo_device.device_type_id) # 设备类型ID
- alarm_new.save()
- logger.error(f"产生预警:{device_id}")
- # 30分钟上报一次的数据
- qx_ek["device_id"] = device_id
- qx_ek["uptime"] = uptime_tp
- qxz_data = QXZdata_New(**qx_ek)
- qxz_data.save()
- mongo_device.uptime=uptime_tp
- mongo_device.device_status=1
- mongo_device.save()
- return Response({"code": 0, "msg": "success"})
- if terminalStatus:
- base_info_obj, is_created = MongoQXZ_Base_Info.objects.update_or_create(
- device_id=device_id,
- defaults={
- "volt": terminalStatus.get("VOLT"),
- "rssi": terminalStatus.get("RSSI"),
- "uptime": uptime_tp
- }
- )
- iccid = terminalStatus.get("ICCID")
- lng = terminalStatus.get("longitude")
- lat = terminalStatus.get("latitude")
- led = terminalStatus.get("Dotled")
- dver = terminalStatus.get("Version")
- device, is_created = MongoDevice.objects.get_or_create(device_id=device_id)
- if iccid:
- base_info_obj.iccid = iccid
- if lng:
- base_info_obj.lng = lng
- device.lng = lng
- if lat:
- base_info_obj.lat = lat
- device.lat = lat
- if led:
- base_info_obj.led = led
- if dver:
- base_info_obj.dver = dver
- # 如果经纬度均存在
- if lat and lng:
- is_success, province, city, district = get_addr_by_lag_lng(lat, lng)
- if is_success:
- device.province = province
- device.city = city
- device.district = district
- base_info_obj.save()
- device.uptime = uptime_tp
- device.save()
- return Response({"code": 0, "msg": "success"})
- if cmd:
- ext = request_data.get("ext")
- imei = ext.get("imei")
- device_info = MongoDevice.objects.get(device_id=imei)
- if cmd == "online":
- device_info.device_status = 1
- if cmd == "offline":
- device_info.device_status = 0
- device_info.uptime = uptime_tp
- device_info.save()
- except Exception as e:
- logger.error(f"气象站设备 {device_id} 处理上报数据或增加设备失败,错误原因:{e.args}")
- return Response({"code": 2, "msg": "failer"})
- class DeviceListAPIView(APIView):
- def post(self, request):
- # 设备列表
- request_data = request.data
- device_id = request_data.get("device_id")
- device_status = request_data.get("device_status")
- search = request_data.get("search")
- page_num = int(request_data.get("pagenum")) if request_data.get("pagenum") else 1
- page_size = int(request_data.get("pagesize")) if request_data.get("pagesize") else 10
- if device_id:
- queryset = MongoDevice.objects.filter(device_id=device_id).order_by("-uptime")
- elif device_status:
- queryset = MongoDevice.objects.filter(device_status=device_status).order_by("-uptime")
- elif search:
- queryset = MongoDevice.objects.filter(Q(device_name__icontains=search) | Q(device_id__icontains=search))
- else:
- queryset = MongoDevice.objects.all().order_by("-uptime")
- total_obj = queryset.count()
- paginator = Paginator(queryset, page_size)
- page_obj = paginator.get_page(page_num)
- serializers = DeviceSerializers(page_obj, many=True)
- return Response({"code": 0, "msg": "success", "data": serializers.data, "count": total_obj})
- class DeviceChangeAPIView(APIView):
- def post(self, request):
- # 修改设备信息
- request_data = request.data
- device_name = request_data.get("device_name")
- device_id = request_data.get("device_id")
- lng = request_data.get("lng")
- lat = request_data.get("lat")
- device = MongoDevice.objects.get(device_id=device_id)
- if device_name:
- device.device_name = device_name
- if lng and lat:
- device.lng = lng
- device.lat = lat
- is_success, province, city, district = get_addr_by_lag_lng(lat, lng)
- if is_success:
- # 更新地理位置坐标
- device.province = province
- device.city = city
- device.district = district
- device.save()
- return Response({"code": 0, "msg": "success"})
-
- class DeviceListInfoAPIView(APIView):
- def post(self, request):
- # 设备列表
- request_data = request.data
- device_id = request_data.get("device_id")
- device_status = request_data.get("device_status")
- search = request_data.get("search")
- page_num = int(request_data.get("pagenum")) if request_data.get("pagenum") else 1
- page_size = int(request_data.get("pagesize")) if request_data.get("pagesize") else 10
- if device_id:
- queryset = MongoDevice.objects.filter(device_id=device_id).order_by("-uptime")
- elif device_status:
- queryset = MongoDevice.objects.filter(device_status=device_status).order_by("-uptime")
- elif search:
- queryset = MongoDevice.objects.filter(Q(device_name__icontains=search) | Q(device_id__icontains=search))
- else:
- queryset = MongoDevice.objects.all().order_by("-uptime")
- total_obj = queryset.count()
- paginator = Paginator(queryset, page_size)
- page_obj = paginator.get_page(page_num)
- serializers = DeviceSerializers(page_obj, many=True)
- return Response({"code": 0, "msg": "success", "data": serializers.data, "count": total_obj})
- class DeviceListAPIView(APIView):
- def post(self, request):
- queryset = MongoDevice.objects.order_by('-id')
- type_dict = {d.id: d.type_name for d in MongoDeviceType.objects.all()}
- result = []
- offline_list = []
- type_counter = Counter()
- device_dict = defaultdict(list)
- for item in queryset:
- device_info = item
- device_type_id = device_info.device_type_id
- device_status = device_info.device_status
- is_offline = False if device_status == 1 else True
- device_id = device_info.device_id
- if is_offline:
- offline_list.append(device_id)
- type_counter[device_type_id] += 1
- device_dict[device_type_id].append(device_id)
- coordinates = ""
- lng = device_info.lng
- lat = device_info.lat
- if not (lng and lat):
- continue
- coordinates = f"[{str(float(lng))},{str(float(lat))}]"
- device_name = device_info.device_name
- result.append({
- 'ld_id': item.id,
- 'device_id': device_id,
- 'device_name': device_name or device_id,
- 'tpye_name': type_dict[device_info.device_type_id],
- 'device_type_id': device_type_id,
- 'coordinates': coordinates,
- 'offline': False if device_status == 1 else True,
- 'is_warning': False
- })
- statistic_list = []
- for k, v in type_counter.items():
- statistic_list.append({
- 'type_id': k,
- 'type_count': v,
- 'type_name': type_dict[k]
- })
- warning_model_dict = {
- 3: DevicePestWarning
- }
- warning_list = []
- for k, v in device_dict.items():
- try:
- model_obj = warning_model_dict[k]
- except KeyError as e:
- continue
- war_list = [d.device_id for d in model_obj.objects.filter(device_id__in=v, status=0)]
- if war_list:
- warning_list.extend(war_list)
- for item in result:
- device_id = item['device_id']
- if device_id in warning_list:
- item['is_warning'] = True
- warn_and_off = set(warning_list) & set(offline_list)
- data = {
- "statistic": statistic_list,
- 'offline': {
- 'count': len(offline_list),
- 'result': offline_list
- },
- "waring": {
- 'count': len(warning_list),
- 'result': warning_list
- },
- "warn_and_off": {
- 'count': len(warn_and_off),
- 'result': warning_list
- },
- "data": result
- }
- return Response({"code": 0, "message": "success", "data": data})
|