"""Quality-inspection report generator for IoT weather-station devices.

Reads device configuration and telemetry from MongoDB, evaluates each
configured element against the judging rules, and writes the outcome to a
password-protected xlsx workbook.
"""
import ast
import datetime
import json
import logging
import os
import re
import sys
import time
import uuid
# NOTE(review): ``print_arguments`` and ``result`` are not used anywhere in
# this file and look like accidental IDE auto-imports.  The ``cgi`` module was
# removed from the standard library in Python 3.13, so this line fails there.
from cgi import print_arguments
from collections import defaultdict, Counter
from unittest import result
from urllib import parse

import openpyxl
import pymongo
import pymysql
import requests
from PyQt5 import QtCore, QtGui, QtWidgets
from xlrd import open_workbook
from xlsxwriter.workbook import Workbook

logger = logging.getLogger(__name__)

# Seconds to wait for the HTTP look-ups (SIM platform, geocoder) before
# treating the query as failed.
HTTP_TIMEOUT = 10

# File name of the most recently generated report; updated by SCDThread.run().
save_filename = ""
# Password used to protect the generated worksheet.
# NOTE(review): this password and the database credentials below are
# hard-coded in source; consider moving them to configuration.
pwd_str = "yf6021"

# Thin border on every side, shared by all cell formats below.
_BORDERS = {'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1}

# Summary row (order number, inspection window, qualified count).
toji_format = {
    'font_name': '宋体',
    'font_size': 14,
    'font_color': 'black',
    'text_wrap': True,
    'bold': False,
    'fg_color': '92D050',
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}
title_format = {
    'font_name': '宋体',
    'font_size': 12,
    'bold': True,
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}
# Merged sheet title.
merge_title_format = {
    'font_name': '宋体',
    'font_size': 26,
    'bold': True,
    'align': 'center',
    'valign': 'vcenter',
    'fg_color': '8DB4E2',
    **_BORDERS,
}
explain_formal = {
    'font_name': '宋体',
    'font_size': 11,
    'font_color': 'black',
    'text_wrap': True,
    'align': 'justify',
    'valign': 'vcenter',
    **_BORDERS,
}
# Status 1 cells (green).
formal_format = {
    'font_name': '宋体',
    'font_size': 11,
    'font_color': 'black',
    'fg_color': '77E88C',
    'text_wrap': True,
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}
# Status 2 cells (yellow).
common_format = {
    'font_name': '宋体',
    'font_size': 11,
    'font_color': 'black',
    'fg_color': 'E7EC73',
    'text_wrap': True,
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}
# Status 0 cells (red).
error_format = {
    'font_name': '宋体',
    'font_size': 11,
    'font_color': 'black',
    'fg_color': 'F4746A',
    'text_wrap': True,
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}
# Status 3 cells, header rows and anything without a fill colour.
default_formal = {
    'font_name': '宋体',
    'font_size': 11,
    'font_color': 'black',
    'text_wrap': True,
    'align': 'center',
    'valign': 'vcenter',
    **_BORDERS,
}

# Server-side JavaScript used by get_qxz_element_data: walks the sorted upload
# times and counts consecutive gaps below 1200 and above 2400.
_UPTIME_GAP_JS = """
function(uptime_list){
    let old_uptime = uptime_list[0];
    let uptime_20 = 0;
    let uptime_40 = 0;
    for (let i = 1; i < uptime_list.length; i++) {
        let uptime = uptime_list[i];
        let diff_time = uptime - old_uptime;
        if (diff_time < 1200){
            uptime_20 += 1;
        } else if (diff_time > 2400){
            uptime_40 += 1;
        }
        old_uptime = uptime;
    }
    return {"uptime_20": uptime_20, "uptime_40": uptime_40}
}
"""


class SCDThread(object):
    """Business-logic worker for the inspection report.

    The progress bar needs the main window to keep updating the GUI, so the
    business logic runs here instead of blocking the main window.

    NOTE(review): ``self.cond`` (judging rules) and ``self.proess_signal``
    (progress signal) are used by this class but never assigned in this file;
    presumably they are supplied by a subclass or by the caller - confirm.
    """

    def __init__(self, device_list, save_path, start_time, end_time, set_plat, set_stm8vs, set_order):
        """Store the inspection parameters and open the database connections.

        Args:
            device_list: Device ids to inspect.
            save_path: Directory the xlsx report is written to.
            start_time: Start of the inspection window; must provide
                ``strftime``.
            end_time: End of the inspection window; must provide ``strftime``.
            set_plat: Stored on the instance; not read elsewhere in this file.
            set_stm8vs: Expected firmware version, compared with ``dver_num``.
            set_order: Task order number; used in the file name and summary.
        """
        self.device_list = device_list
        self.save_path = save_path
        # NOTE(review): these are compared against ``uptime`` in MongoDB
        # queries, while other code in this file treats ``uptime`` as a number
        # (see device_their_platform); confirm the stored type matches.
        self.start_time = start_time
        self.end_time = end_time
        self.start_time_str = start_time.strftime("%y-%m-%d %H:%M:%S")
        self.end_time_str = end_time.strftime("%y-%m-%d %H:%M:%S")
        self.set_plat = set_plat
        self.set_stm8vs = set_stm8vs
        self.set_order = set_order
        self.user = parse.quote_plus("root")
        self.passwd = parse.quote_plus("yfkj@6020")
        # One client only: the original built a client here and then a second
        # one straight away through mongo_ping().
        self._connect_mongo()
        self.config = {
            'host': '120.27.222.26',
            'port': 3306,
            'user': 'yfwlw',
            'password': 'sql_yfkj_6019',
            'db': 'yfwlw',
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }
        self.connection = pymysql.connect(**self.config)
        self.cursor = self.connection.cursor()

    def _connect_mongo(self):
        """Create the MongoDB client and bind the collection handles."""
        self.myclient = pymongo.MongoClient(
            "mongodb://{0}:{1}@8.136.98.49:57017/".format(self.user, self.passwd)
        )
        self.db = self.myclient.smartfarming
        self.device_collection = self.db.sa_device
        self.qxz_collection = self.db.sa_qxz_data
        self.qxz_info_record_collection = self.db.sa_qxz_info_record
        self.qxz_base_info_collection = self.db.sa_qxz_base_info

    def mongo_ping(self):
        """Ping MongoDB and rebuild the client if the ping fails.

        The original created a brand-new client on every call without closing
        the previous one, leaking a connection pool per call.
        """
        try:
            self.myclient.admin.command('ping')
        except pymongo.errors.PyMongoError:
            logger.debug("MongoDB ping failed; reconnecting", exc_info=True)
            self.myclient.close()
            self._connect_mongo()

    def sql_ping(self):
        """Ping MySQL and open a fresh connection and cursor if that fails."""
        try:
            self.connection.ping()
        except Exception:
            logger.debug("MySQL ping failed; reconnecting", exc_info=True)
            self.connection = pymysql.connect(**self.config)
            self.cursor = self.connection.cursor()

    def __time_dif(self, checkdatetime):
        """Return the whole days from now until ``checkdatetime``.

        Args:
            checkdatetime: Timestamp string in ``%Y-%m-%d %H:%M:%S`` format.

        Returns:
            ``timedelta.days`` of (checkdatetime - now); negative once the
            timestamp is in the past.
        """
        target = datetime.datetime.strptime(checkdatetime, "%Y-%m-%d %H:%M:%S")
        return (target - datetime.datetime.now()).days

    def device_their_platform(self, shortId):
        """Determine which platform a device reports to and its full id.

        Looks the short id up as a suffix in both the MongoDB device
        collection and the MySQL status table; when both know the device, the
        one with the newer upload time wins.

        Args:
            shortId: Trailing part of the device id.

        Returns:
            Tuple ``(d_id, deviceId, platform)``. ``d_id`` is the MongoDB
            ``id`` field, or ``""`` when the device is not taken from MongoDB.
        """
        self.mongo_ping()
        self.sql_ping()
        # Escape the id so it is matched literally rather than as a pattern.
        regex = re.compile('.*{}$'.format(re.escape(str(shortId))))
        bd_device_dict = self.device_collection.find_one(
            filter={"device_id": regex, "device_type_id": 5},
            projection={'_id': 0},
            sort=[('uptime', pymongo.DESCENDING)],
        )
        # Parameterised so the id can never alter the SQL statement.
        device_sql = (
            "SELECT * FROM AppInfoManage_qxzstatus_new "
            "WHERE equip_id_id LIKE %s ORDER BY upl_time DESC LIMIT 1;"
        )
        self.cursor.execute(device_sql, ("%{}".format(shortId),))
        sq_device_dict = self.cursor.fetchone()

        if bd_device_dict and sq_device_dict:
            sq_upltime = time.mktime(sq_device_dict["upl_time"].timetuple())
            use_bigdata = bd_device_dict["uptime"] >= sq_upltime
        else:
            use_bigdata = bool(bd_device_dict)

        if use_bigdata:
            d_id = bd_device_dict["id"]
            deviceId = bd_device_dict["device_id"]
            platform = "大数据平台"
        elif sq_device_dict:
            d_id = ""
            deviceId = sq_device_dict["equip_id_id"]
            platform = "四情平台"
        else:
            d_id = ""
            deviceId = "平台无此设备"
            platform = "未知"
        logger.debug("deviceId: %s", deviceId)
        return d_id, deviceId, platform

    def sim_info(self, iccid):
        """Query the SIM-card platform for the card's remaining validity.

        Args:
            iccid: ICCID of the card to look up.

        Returns:
            ``[status, message]``: status 1 for at least 180 days left, 2 for
            30 to 179 days, 0 for fewer than 30 days or any failed query.
        """
        url = "http://8.136.98.49:10001/iotcard/platsimview/inquiries/"
        try:
            response = requests.request("POST", url, data={"iccid": iccid}, timeout=HTTP_TIMEOUT)
            res_data = json.loads(response.text)
        except (requests.RequestException, ValueError):
            # Network failure or a non-JSON body: report it, don't abort.
            return [0, "查询卡信息异常稍后重试"]

        data = res_data.get("data") if isinstance(res_data, dict) else None
        if not data or res_data.get("msg") != "success":
            return [0, "查询无结果"]

        expiry_date = data.get("expiry_date", "未知") if isinstance(data, dict) else "未知"
        if expiry_date == "未知":
            return [0, "未查询到卡信息"]
        try:
            time_difference = self.__time_dif(expiry_date)
        except (TypeError, ValueError):
            # Expiry date not in the expected format.
            return [0, "未查询到卡信息"]

        if time_difference >= 180:
            status = 1
        elif time_difference >= 30:
            status = 2
        else:
            status = 0
        return [status, "有效期剩余{}天".format(time_difference)]

    def _get_data_new(self):
        """Return one data document per device, keyed by device id.

        The document is the ``$last`` one of each group; the pipeline has no
        explicit ``$sort`` stage.
        """
        pipeline = [
            {"$match": {"device_id": {"$in": self.device_list}}},
            {
                "$group": {
                    "_id": {"device_id": "$device_id"},
                    "last_msg": {"$last": "$$ROOT"},
                }
            },
            {"$replaceRoot": {"newRoot": "$last_msg"}},
        ]
        cursor = self.db.sa_qxz_data.aggregate(pipeline, allowDiskUse=True)
        return {item['device_id']: item for item in cursor}

    def _get_bigdata_qxz_conf(self):
        """Build the element column titles and the per-device element maps.

        For each device config, every key starting with ``e`` that has a
        non-empty string value and a truthy value in the device's data
        document is mapped to ``"<name>#<n>"``. ``<name>`` is the text before
        ``#`` in the config value; ``<n>`` is empty for the first occurrence
        of a name and 1, 2, ... for repeats.

        Returns:
            ``(title_name_list, qxz_conf_dict)``: the sorted distinct titles
            and ``{device_id: {element_key: title}}``.
        """
        data_dict = self._get_data_new()
        cursor = self.db.sa_qxz_conf.find({"device_id": {"$in": self.device_list}})
        title_name_set = set()
        qxz_conf_dict = {}
        for item in cursor:
            device_id = item['device_id']
            # A configured device may have no data yet; treat it as having no
            # active elements instead of raising KeyError.
            qxz_data = data_dict.get(device_id, {})
            config_dict = {}
            counter = Counter()
            for k, v in item.items():
                if k.startswith('e') and v and isinstance(v, str) and qxz_data.get(k, None):
                    t_name = v.split('#')[0]
                    t_index = counter.get(t_name, "")
                    config_dict[k] = f"{t_name}#{t_index}"
                    counter[t_name] += 1
            title_name_set.update(config_dict.values())
            qxz_conf_dict[device_id] = config_dict
        title_name_list = sorted(title_name_set)
        logger.debug("element titles: %s", title_name_list)
        logger.debug("element config: %s", qxz_conf_dict)
        return title_name_list, qxz_conf_dict

    def get_qxz_element_data(self, device_id, conf_dict):
        """Aggregate one device's element readings inside the time window.

        Args:
            device_id: Device to aggregate.
            conf_dict: ``{element_key: title}`` for the device.

        Returns:
            Flat dict merged from the aggregation output: ``device_id``,
            ``total_count``, ``uptime_info`` and one entry per element that
            has a projection function in ``self.cond.func_dict``.
        """
        group_dict = {
            "_id": "$device_id",
            "total_count": {"$count": {}},
            "uptime_list": {"$push": "$uptime"},
        }
        project_dict = {
            "_id": 0,
            "device_id": "$_id",
            "total_count": "$total_count",
            "uptime_info": {
                "$function": {
                    "args": ["$uptime_list"],
                    "lang": "js",
                    "body": _UPTIME_GAP_JS,
                }
            },
        }
        for ek, title in conf_dict.items():
            group_dict[ek] = {"$push": f"${ek}"}
            try:
                func_name = self.get_conf_key(title)
                project_dict[ek] = self.cond.func_dict[func_name](ek)
            except KeyError:
                # No projection rule for this element: leave it out.
                continue
        pipeline = [
            {
                "$match": {
                    "device_id": device_id,
                    "uptime": {"$gte": self.start_time, "$lt": self.end_time},
                }
            },
            {"$sort": {"uptime": 1}},
            {
                # Keep only fields named e<digits> whose value contains '#'.
                "$project": {
                    "device_id": "$device_id",
                    "uptime": "$uptime",
                    "tmp_list": {
                        "$filter": {
                            "input": {"$objectToArray": "$$ROOT"},
                            "as": "item",
                            "cond": {
                                "$and": [
                                    {
                                        "$regexMatch": {
                                            "input": "$$item.k",
                                            # Raw string: "\d" is an invalid
                                            # escape in a normal literal.
                                            "regex": r"^e\d+",
                                            "options": "i",
                                        }
                                    },
                                    {
                                        "$regexMatch": {
                                            "input": "$$item.v",
                                            "regex": ".*#.*",
                                            "options": "i",
                                        }
                                    },
                                ]
                            },
                        }
                    },
                }
            },
            {
                # Replace each value with the integer part before '#'.
                "$addFields": {
                    "tmp_obj": {
                        "$arrayToObject": {
                            "$map": {
                                "input": "$tmp_list",
                                "as": "item",
                                "in": {
                                    "k": "$$item.k",
                                    "v": {
                                        "$toInt": {
                                            "$toDouble": {
                                                "$arrayElemAt": [{"$split": ["$$item.v", "#"]}, 0]
                                            }
                                        }
                                    },
                                },
                            }
                        }
                    }
                }
            },
            {
                "$replaceRoot": {
                    "newRoot": {
                        "$mergeObjects": ["$tmp_obj", {"device_id": "$device_id", "uptime": "$uptime"}]
                    }
                }
            },
            {"$group": group_dict},
            {"$project": project_dict},
        ]
        cursor = self.db.sa_qxz_data.aggregate(pipeline, allowDiskUse=True)
        merged = {}
        for item in cursor:
            merged.update(item)
        return merged

    def get_qxz_volt_or_rssi_data(self):
        """Return min/max voltage and signal strength per device.

        Voltage status is 1 when the maximum is at most 15 and the minimum at
        least 11; signal status is 1 when the minimum is at least 14. The
        query is not restricted to the inspection time window.

        Returns:
            ``{device_id: {"device_id", "volt_info", "rssi_info"}}``.
        """
        pipeline = [
            {"$match": {"device_id": {"$in": self.device_list}}},
            {
                "$project": {
                    "device_id": "$device_id",
                    "volt": {"$toDouble": "$volt"},
                    "rssi": {"$toDouble": "$rssi"},
                }
            },
            {
                "$group": {
                    "_id": "$device_id",
                    "max_volt": {"$max": "$volt"},
                    "min_volt": {"$min": "$volt"},
                    "max_rssi": {"$max": "$rssi"},
                    "min_rssi": {"$min": "$rssi"},
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "device_id": "$_id",
                    "volt_info": {
                        'max_value': "$max_volt",
                        'min_value': "$min_volt",
                        'status': {
                            "$cond": {
                                "if": {
                                    "$and": [
                                        {"$lte": ["$max_volt", 15]},
                                        {"$gte": ["$min_volt", 11]},
                                    ]
                                },
                                "then": 1,
                                "else": 0,
                            }
                        },
                    },
                    "rssi_info": {
                        'max_value': "$max_rssi",
                        'min_value': "$min_rssi",
                        'status': {
                            "$cond": {
                                "if": {"$gte": ["$min_rssi", 14]},
                                "then": 1,
                                "else": 0,
                            }
                        },
                    },
                }
            },
        ]
        cursor = self.qxz_info_record_collection.aggregate(pipeline, allowDiskUse=True)
        return {item['device_id']: item for item in cursor}

    def get_qxz_device_data(self):
        """Return firmware-version and coordinate checks per device.

        Firmware status is 1 when ``dver_num`` equals ``self.set_stm8vs``.
        Longitude status is 1 inside [113.7536111, 113.7869444] and latitude
        status is 1 inside [35.0125, 35.0458333]; unparsable or missing
        coordinates are converted to 0.

        Returns:
            ``{device_id: {"device_id", "dver_info", "lng_info", "lat_info"}}``.
        """
        def to_double(field):
            return {"$convert": {"input": field, "to": "double", "onError": 0, "onNull": 0}}

        def in_range(field, low, high):
            return {
                "$cond": {
                    "if": {"$and": [{"$lte": [field, high]}, {"$gte": [field, low]}]},
                    "then": 1,
                    "else": 0,
                }
            }

        pipeline = [
            {"$match": {"device_id": {"$in": self.device_list}}},
            {
                "$project": {
                    "device_id": "$device_id",
                    "dver_num": "$dver_num",
                    "lng": to_double("$lng"),
                    "lat": to_double("$lat"),
                }
            },
            {
                "$project": {
                    "_id": 0,
                    "device_id": "$device_id",
                    "dver_info": {
                        "status": {
                            "$cond": {
                                "if": {"$eq": ["$dver_num", self.set_stm8vs]},
                                "then": 1,
                                "else": 0,
                            }
                        },
                        "dver_num": "$dver_num",
                        "old_value": self.set_stm8vs,
                    },
                    "lng_info": {
                        "status": in_range("$lng", 113.7536111, 113.7869444),
                        "lng": "$lng",
                    },
                    "lat_info": {
                        "status": in_range("$lat", 35.0125, 35.0458333),
                        "lat": "$lat",
                    },
                }
            },
        ]
        cursor = self.device_collection.aggregate(pipeline, allowDiskUse=True)
        return {item['device_id']: item for item in cursor}

    def get_sim_data(self):
        """Return ``{device_id: iccid}`` from the base-info collection.

        Devices whose record has no ``iccid`` field map to ``None`` instead of
        raising ``KeyError``.
        """
        pipeline = [
            {"$match": {"device_id": {"$in": self.device_list}}},
            {"$project": {"_id": 0, "device_id": "$device_id", "iccid": "$iccid"}},
        ]
        cursor = self.qxz_base_info_collection.aggregate(pipeline, allowDiskUse=True)
        return {item['device_id']: item.get("iccid") for item in cursor}

    def parse_data(self, device_id, qxz_conf_dict, vr_dict, device_data_dict, sim_data):
        """Collect every report value for one device, keyed by column title.

        Each source is looked up on its own, so a device missing from one of
        them still gets the columns the other sources can provide. The
        original wrapped everything in one ``except KeyError: pass``, which
        dropped all later columns after the first miss.

        Args:
            device_id: Device being reported.
            qxz_conf_dict: ``{device_id: {element_key: title}}``.
            vr_dict: Output of :meth:`get_qxz_volt_or_rssi_data`.
            device_data_dict: Output of :meth:`get_qxz_device_data`.
            sim_data: Output of :meth:`get_sim_data`.

        Returns:
            ``{title: value}``; titles with no available data are absent.
        """
        element_dict = {"ID#": device_id, "检验项目#": device_id}

        device_data = {}
        conf_dict = qxz_conf_dict.get(device_id)
        if conf_dict is not None:
            device_data = self.get_qxz_element_data(device_id, conf_dict)
            element_dict.update(
                {conf_dict[k]: v for k, v in device_data.items() if k in conf_dict}
            )

        dd_info = device_data_dict.get(device_id)
        if dd_info is not None:
            element_dict["经度#"] = dd_info["lng_info"]
            element_dict["纬度#"] = dd_info["lat_info"]
            element_dict["固件版本号#"] = dd_info["dver_info"]

        vr_info = vr_dict.get(device_id)
        if vr_info is not None:
            element_dict["电压#"] = vr_info["volt_info"]
            element_dict["信号强度#"] = vr_info["rssi_info"]

        iccid = sim_data.get(device_id)
        if iccid:
            sim_status, sim_msg = self.sim_info(iccid)
            element_dict["sim卡信息#"] = {"status": sim_status, "msg": sim_msg}

        uptime_info = device_data.get("uptime_info")
        if "total_count" in device_data and uptime_info:
            element_dict["上传数据条数#"] = {
                "total_count": device_data["total_count"],
                "uptime_20": uptime_info["uptime_20"],
                "uptime_40": uptime_info["uptime_40"],
            }
        return element_dict

    def get_conf_key(self, k):
        """Return the rule name for a column title.

        Takes the text before ``#``; any soil-moisture or soil-temperature
        variant collapses to its base name so the variants share one rule.
        """
        key = k.split('#')[0]
        if "土壤含水率" in key:
            key = "土壤含水率"
        if "土壤温度" in key:
            key = "土壤温度"
        return key

    def get_position(self, lng, lat):
        """Reverse-geocode coordinates into province + city + district.

        Returns:
            The concatenated address, or ``False`` when either coordinate is
            falsy or the look-up fails.
        """
        if not (lng and lat):
            return False
        try:
            ret = requests.post(
                "http://api.map.baidu.com/geocoder?location=%s,%s&coord_type=gcj02&output=json" % (lat, lng),
                timeout=HTTP_TIMEOUT,
            )
            address = json.loads(ret.text)["result"]["addressComponent"]
            return address["province"] + address["city"] + address["district"]
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Best effort: an unreachable service or an unexpected payload
            # means "position unknown".
            logger.debug("reverse geocoding failed", exc_info=True)
            return False

    def get_conf_data(self, device_config, device_data, keys):
        """Return the numeric readings of the element whose config names ``keys``.

        Args:
            device_config: Mapping of field name to config label, or ``None``.
            device_data: Iterable of data documents.
            keys: Text to look for inside the config labels.

        Returns:
            Floats parsed from the text before ``#`` in each document's value
            for the matching field. Empty when no label contains ``keys``;
            documents without a string value for the field are skipped. When
            several labels match, the last one is used.
        """
        field = None
        for name, label in dict(device_config or {}).items():
            if label and isinstance(label, str) and keys in label:
                field = name
        if field is None:
            return []
        readings = []
        for record in device_data:
            raw = record.get(field)
            if isinstance(raw, str):
                readings.append(float(raw.split("#")[0]))
        return readings

    def _check_column(self, key, value, default_func_dict, device_config, device_data):
        """Evaluate one cell and return its ``{'status', 'msg'}`` dict.

        Raises whatever the rule functions raise, and ``ValueError`` when a
        sunshine or rainfall column has no readings; the caller turns any
        failure into a "no data" cell.
        """
        if key in ("ID", "检验项目"):
            return {"status": 1, "msg": value}
        if key in default_func_dict:
            return default_func_dict[key](value)
        if key in ("日照时数", "降雨量累计"):
            # Qualified when the largest reading in the window exceeds 0.1.
            readings = self.get_conf_data(device_config, device_data, key)
            highest, lowest = max(readings), min(readings)
            return {
                'status': 1 if highest > 0.1 else 0,
                'msg': f'最大值:{highest},最小值:{lowest}',
            }
        return self.cond.cond_msg_dict[key](value)

    def run(self):
        """Build the report, then close the database connections.

        The connections are closed even when report generation fails.
        """
        try:
            self._write_report()
        finally:
            self.cursor.close()
            self.connection.close()
            self.myclient.close()

    def _write_report(self):
        """Evaluate every device and write the xlsx workbook.

        Sheet layout: row 0 merged title, row 1 summary, row 2 column names,
        row 3 judging conditions, rows 4+ one row per device.
        """
        title_name_list = [
            "ID#", "检验项目#", "电压#", "信号强度#", "经度#", "纬度#",
            "固件版本号#", "sim卡信息#", "上传数据条数#",
        ]
        default_func_dict = {
            "电压": self.cond.get_cond_volt_msg,
            "信号强度": self.cond.get_cond_rssi_msg,
            "经度": self.cond.get_cond_lng_msg,
            "纬度": self.cond.get_cond_lat_msg,
            "固件版本号": self.cond.get_cond_version_msg,
            "上传数据条数": self.cond.get_time_uptime_msg,
            "sim卡信息": self.cond.get_cond_sim_msg,
        }
        head_name_list, qxz_conf_dict = self._get_bigdata_qxz_conf()
        for n in head_name_list:
            if n not in title_name_list:
                title_name_list.append(n)
        title_name_list.append("位置信息#")
        title_name_list.append("单台合格数#")

        device_data_dict = self.get_qxz_device_data()
        vr_dict = self.get_qxz_volt_or_rssi_data()
        sim_data = self.get_sim_data()

        now_time = datetime.datetime.now()
        global save_filename
        save_filename = self.set_order + "_" + now_time.strftime("%m%d") + ".xlsx"
        save_path = os.path.join(self.save_path, save_filename)
        workbook = Workbook(save_path)
        worksheet = workbook.add_worksheet()
        merge_title_style = workbook.add_format(merge_title_format)
        toji_style = workbook.add_format(toji_format)
        default_style = workbook.add_format(default_formal)
        # Cell style per status code: 0 red, 1 green, 2 yellow, 3 plain.
        style_dict = {
            0: workbook.add_format(error_format),
            1: workbook.add_format(formal_format),
            2: workbook.add_format(common_format),
            3: default_style,
        }

        head_list = [title.split("#")[0] for title in title_name_list]
        worksheet.merge_range(0, 0, 0, len(head_list) - 1, "物联网气象站设备质检表格", merge_title_style)
        for index, k in enumerate(title_name_list):
            worksheet.write(2, index, head_list[index], default_style)
            try:
                condition = self.cond.head_dict[self.get_conf_key(k)]
            except KeyError:
                condition = "无判定条件"
            worksheet.write(3, index, condition, default_style)

        qualified = 0
        device_total = len(self.device_list)
        for device_index, device_id in enumerate(self.device_list):
            row_index = 4 + device_index
            self.mongo_ping()
            element_dict = self.parse_data(device_id, qxz_conf_dict, vr_dict, device_data_dict, sim_data)
            device_config = self.db.sa_qxz_conf.find_one({"device_id": device_id}, {'_id': 0, 'id': 0})
            # Materialise the cursor: it is read once per sunshine/rainfall
            # column, and a pymongo cursor can only be iterated once.
            device_data = list(self.db.sa_qxz_data.find(
                {"device_id": device_id, "uptime": {"$gte": self.start_time, "$lte": self.end_time}}
            ))
            not_qualified = 0
            lng = 0
            lat = 0
            for index, k in enumerate(title_name_list):
                status, msg = 3, "无数据"
                try:
                    value = element_dict[k]
                    key = self.get_conf_key(k)
                    rt = self._check_column(key, value, default_func_dict, device_config, device_data)
                    # Remember the coordinates for the position column.
                    if key == "经度":
                        lng = value.get("lng")
                    elif key == "纬度":
                        lat = value.get("lat")
                    status, msg = rt['status'], rt['msg']
                except Exception:
                    # Deliberate best effort: a column that cannot be
                    # evaluated is reported as "no data", not a crash.
                    logger.debug("column %r of device %r not evaluated", k, device_id, exc_info=True)
                if status == 0:
                    not_qualified += 1
                if k == "位置信息#":
                    # Resolve the coordinates to an address.
                    # NOTE(review): a failing position is coloured red but is
                    # not counted in not_qualified, because the count above
                    # runs first - confirm this is intended.
                    location = self.get_position(lng, lat)
                    status = 1 if location == "河南省新乡市原阳县" else 0
                    msg = location
                if k == "单台合格数#":
                    if not_qualified == 0:
                        status, msg = 1, "合格"
                    else:
                        status, msg = 0, f"不合格: {not_qualified}"
                column_style = style_dict.get(status, default_style)
                worksheet.write(row_index, index, str(msg), column_style)

            # Progress follows the device index (the original used the column
            # index) and is capped at 99 until the workbook is saved.
            proess = (device_index + 1) / device_total * 100
            self.proess_signal.emit(min(int(proess), 99))
            if not_qualified == 0:
                qualified += 1

        toji_data = [
            "任务单号", self.set_order,
            "检验时间", self.start_time_str, self.end_time_str,
            "报告日期", now_time.strftime("%y-%m-%d %H:%M:%S"),
            "合格数", qualified,
        ]
        # Pad so the summary row spans every column.
        toji_data.extend([" "] * (len(head_list) - len(toji_data)))
        for index, k in enumerate(toji_data):
            value = str(k)
            worksheet.write(1, index, value, toji_style)
            worksheet.set_column(index, index, len(value) + 15)
        worksheet.protect(pwd_str)
        workbook.close()


if __name__ == "__main__":
    # TODO: supply seven real values here; as written this placeholder raises
    # ValueError because seven names are unpacked from a one-element list.
    device_list, save_path, start_time, end_time, set_plat, set_stm8vs, set_order = [""]
    # The constructor takes seven arguments; the original called it with none.
    SCDThread(device_list, save_path, start_time, end_time, set_plat, set_stm8vs, set_order).run()