# -*- coding: utf-8 -*- # Form implementation generated from reading ui file '.\ui文件\杀虫灯质检工具.ui' # # Created by: PyQt5 UI code generator 5.15.4 # # WARNING: Any manual changes made to this file will be lost when pyuic5 is # run again. Do not edit this file unless you know what you are doing. # pyinstaller -n "杀虫灯检验工具(v3.2)" -D -w f:\image_down_code\scd_zhijian.py -i .\LOGO.ico from aliyunsdkcore.vendored.requests.auth import HTTPBasicAuth from PyQt5 import QtCore, QtGui, QtWidgets import hashlib import os import sys import re import ast from urllib import parse import json import time import datetime import requests import uuid import pymongo import pymysql from xlrd import open_workbook from xlsxwriter.workbook import Workbook save_filename = "" pwd_str = "yf6021" toji_format = { 'font_name' : '宋体', 'font_size': 14, 'font_color': 'black', 'text_wrap': True, 'bold': False, 'fg_color': '92D050', 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } title_format = { 'font_name' : '宋体', 'font_size': 12, 'bold': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } merge_title_format = { 'font_name' : '宋体', 'font_size': 26, 'bold': True, 'align': 'center', 'valign': 'vcenter', "fg_color": "8DB4E2", 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } explain_formal = { 'font_name' : '宋体', 'font_size': 11, 'font_color': 'black', 'text_wrap': True, 'align': 'justify', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } formal_format = { 'font_name' : '宋体', 'font_size': 11, 'font_color': 'black', 'fg_color': '77E88C', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } common_format = { 'font_name' : '宋体', 'font_size': 11, 'font_color': 'black', "fg_color": 'E7EC73', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } error_format = { 'font_name' : '宋体', 'font_size': 11, 'font_color': 'black', "fg_color": 'F4746A', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } class Ui_MainWindow(object): """GUI界面""" def setupUi(self, MainWindow): MainWindow.setObjectName("MainWindow") MainWindow.resize(701, 644) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(12) MainWindow.setFont(font) icon = QtGui.QIcon("logo.ico") MainWindow.setWindowIcon(icon) self.centralwidget = QtWidgets.QWidget(MainWindow) self.centralwidget.setObjectName("centralwidget") self.pageTitleLabel = QtWidgets.QLabel(self.centralwidget) self.pageTitleLabel.setGeometry(QtCore.QRect(139, 50, 429, 42)) font = QtGui.QFont() font.setFamily("楷体") font.setPointSize(28) self.pageTitleLabel.setFont(font) self.pageTitleLabel.setTextFormat(QtCore.Qt.AutoText) self.pageTitleLabel.setObjectName("pageTitleLabel") self.inputFileLabel = QtWidgets.QLabel(self.centralwidget) self.inputFileLabel.setGeometry(QtCore.QRect(180, 130, 121, 21)) self.inputFileLabel.setObjectName("inputFileLabel") self.inputFileEdit = QtWidgets.QLineEdit(self.centralwidget) self.inputFileEdit.setGeometry(QtCore.QRect(310, 130, 151, 21)) self.inputFileEdit.setFocusPolicy(QtCore.Qt.NoFocus) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(8) self.inputFileEdit.setFont(font) self.inputFileEdit.setObjectName("inputFileEdit") self.inputFileTool = QtWidgets.QToolButton(self.centralwidget) self.inputFileTool.setGeometry(QtCore.QRect(470, 130, 71, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(10) self.inputFileTool.setFont(font) self.inputFileTool.setObjectName("inputFileTool") self.savePathLabel = QtWidgets.QLabel(self.centralwidget) self.savePathLabel.setGeometry(QtCore.QRect(180, 170, 121, 21)) self.savePathLabel.setObjectName("savePathLabel") self.savePathEdit = QtWidgets.QLineEdit(self.centralwidget) self.savePathEdit.setGeometry(QtCore.QRect(310, 170, 151, 21)) self.savePathEdit.setFocusPolicy(QtCore.Qt.NoFocus) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(8) self.savePathEdit.setFont(font) self.savePathEdit.setObjectName("savePathEdit") self.savePathTool = QtWidgets.QToolButton(self.centralwidget) self.savePathTool.setGeometry(QtCore.QRect(470, 170, 71, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(10) self.savePathTool.setFont(font) self.savePathTool.setObjectName("savePathTool") self.startTimeLabel = QtWidgets.QLabel(self.centralwidget) self.startTimeLabel.setGeometry(QtCore.QRect(180, 210, 121, 21)) self.startTimeLabel.setObjectName("startTimeLabel") self.startTimeEdit = QtWidgets.QDateTimeEdit(self.centralwidget) self.startTimeEdit.setGeometry(QtCore.QRect(310, 210, 151, 21)) self.startTimeEdit.setMaximumDateTime(QtCore.QDateTime(QtCore.QDate(2025, 12, 31), QtCore.QTime(23, 59, 59))) self.startTimeEdit.setMinimumDateTime(QtCore.QDateTime(QtCore.QDate(2024, 3, 29), QtCore.QTime(1, 0, 0))) self.startTimeEdit.setObjectName("startTimeEdit") self.endTimeLabel = QtWidgets.QLabel(self.centralwidget) self.endTimeLabel.setGeometry(QtCore.QRect(180, 250, 121, 21)) self.endTimeLabel.setObjectName("endTimeLabel") self.endTimeEdit = QtWidgets.QDateTimeEdit(self.centralwidget) self.endTimeEdit.setGeometry(QtCore.QRect(310, 250, 151, 21)) self.endTimeEdit.setMaximumDateTime(QtCore.QDateTime(QtCore.QDate(2025, 12, 31), QtCore.QTime(23, 59, 59))) self.endTimeEdit.setMinimumDateTime(QtCore.QDateTime(QtCore.QDate(2024, 3, 30), QtCore.QTime(1, 0, 0))) self.endTimeEdit.setObjectName("endTimeEdit") self.platLabel = QtWidgets.QLabel(self.centralwidget) self.platLabel.setGeometry(QtCore.QRect(180, 290, 121, 21)) self.platLabel.setObjectName("dianjiLabel") self.platBox = QtWidgets.QComboBox(self.centralwidget) self.platBox.setGeometry(QtCore.QRect(310, 290, 151, 21)) self.platBox.addItems(['大数据平台','四情平台']) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.platBox.setFont(font) self.platBox.setObjectName("platBox") self.stm8vsLabel = QtWidgets.QLabel(self.centralwidget) self.stm8vsLabel.setGeometry(QtCore.QRect(180, 330, 121, 21)) self.stm8vsLabel.setObjectName("stm8vsLabel") self.stm8vsEdit = QtWidgets.QLineEdit(self.centralwidget) self.stm8vsEdit.setGeometry(QtCore.QRect(310, 330, 151, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.stm8vsEdit.setFont(font) self.stm8vsEdit.setObjectName("stm8vsEdit") self.dverLabel = QtWidgets.QLabel(self.centralwidget) self.dverLabel.setGeometry(QtCore.QRect(180, 370, 121, 21)) self.dverLabel.setObjectName("dverLabel") self.dverEdit = QtWidgets.QLineEdit(self.centralwidget) self.dverEdit.setGeometry(QtCore.QRect(310, 370, 151, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.dverEdit.setFont(font) self.dverEdit.setObjectName("dverEdit") self.orderLabel = QtWidgets.QLabel(self.centralwidget) self.orderLabel.setGeometry(QtCore.QRect(180, 410, 121, 21)) self.orderLabel.setObjectName("dverLabel") self.orderEdit = QtWidgets.QLineEdit(self.centralwidget) self.orderEdit.setGeometry(QtCore.QRect(310, 410, 151, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.orderEdit.setFont(font) self.orderEdit.setObjectName("dverEdit") self.pushButton = QtWidgets.QPushButton(self.centralwidget) self.pushButton.setGeometry(QtCore.QRect(300, 450, 111, 41)) self.pushButton.setObjectName("pushButton") self.progressBar = QtWidgets.QProgressBar(self.centralwidget) self.progressBar.setGeometry(QtCore.QRect(130, 510, 491, 31)) self.progressBar.setProperty("value", 0) self.progressBar.setVisible(False) self.progressBar.setObjectName("progressBar") MainWindow.setCentralWidget(self.centralwidget) self.retranslateUi(MainWindow) self.inputFileTool.clicked.connect(self.input_file_path) self.savePathTool.clicked.connect(self.out_save_location) self.pushButton.clicked.connect(self.on_click) QtCore.QMetaObject.connectSlotsByName(MainWindow) def retranslateUi(self, MainWindow): _translate = QtCore.QCoreApplication.translate MainWindow.setWindowTitle(_translate("MainWindow", "杀虫灯质检工具")) self.savePathTool.setText(_translate("MainWindow", "选择文件夹")) self.inputFileTool.setText(_translate("MainWindow", "选择文件")) self.pageTitleLabel.setText(_translate("MainWindow", "

云飞杀虫灯设备质检工具

")) self.savePathLabel.setText(_translate("MainWindow", "|输出文件位置:")) self.inputFileLabel.setText(_translate("MainWindow", "|输入文件位置:")) self.startTimeLabel.setText(_translate("MainWindow", "|开始时间:")) self.endTimeLabel.setText(_translate("MainWindow", "|结束时间:")) self.platLabel.setText(_translate("MainWindow", "|检验平台:")) self.stm8vsLabel.setText(_translate("MainWindow", "|主板版本号:")) self.dverLabel.setText(_translate("MainWindow", "|联网模块版本号:")) self.orderLabel.setText(_translate("MainWindow", "|任务单号:")) self.pushButton.setText(_translate("MainWindow", "开始导出")) def input_file_path(self): file_path = QtWidgets.QFileDialog.getOpenFileName(None,"选取文件","./","All Files (*.xlsx;*.xls);;Text Files (*.txt)") self.inputFileEdit.setText(file_path[0]) self.inputFileEdit.setStyleSheet("color:black;") def out_save_location(self): fname = QtWidgets.QFileDialog.getExistingDirectory(None, '选取文件夹', './') self.savePathEdit.setText(fname) self.savePathEdit.setStyleSheet("color:black;") def on_click(self): file_path = self.inputFileEdit.text() save_path = self.savePathEdit.text() start_time = self.startTimeEdit.dateTime().toPyDateTime() end_time = self.endTimeEdit.dateTime().toPyDateTime() set_plat = self.platBox.currentText() set_stm8vs = self.stm8vsEdit.text() set_dver = self.dverEdit.text() set_order = self.orderEdit.text() if not all ([file_path,save_path,start_time,end_time,set_stm8vs,set_dver,set_order]): QtWidgets.QMessageBox.information(None, "提示", "选项未填写完整") elif end_time < start_time: QtWidgets.QMessageBox.information(None, "提示", "结束日期应在开始日期之后") else: xls = open_workbook(file_path) sheet_object = xls.sheets()[0] ncols = sheet_object.ncols read_dict = {} for i in range(ncols): col_value = sheet_object.col_values(i) read_dict[col_value[0]] = col_value[1:] device_list = read_dict.get("设备ID") if device_list: self.inputFileTool.setEnabled(False) self.savePathTool.setEnabled(False) self.startTimeEdit.setEnabled(False) self.endTimeEdit.setEnabled(False) self.platBox.setEnabled(False) self.stm8vsEdit.setEnabled(False) self.dverEdit.setEnabled(False) self.orderEdit.setEnabled(False) self.pushButton.setEnabled(False) self.pushButton.setText("执行中...") self.runThread = SCDThread(device_list,save_path,start_time,end_time,set_plat,set_stm8vs,set_dver,set_order) self.runThread.proess_signal.connect(self.set_progressbar_value) self.runThread.start() self.progressBar.setVisible(True) else: QtWidgets.QMessageBox.information(None, "提示", "输入文件无'设备ID'列或该列无数据") def set_progressbar_value(self, value): self.progressBar.setValue(value) if value == 100: QtWidgets.QMessageBox.information(None, "提示", "文件导出完毕!导出文件名:\n{}".format(save_filename)) self.inputFileTool.setEnabled(True) self.inputFileEdit.setText("") self.savePathTool.setEnabled(True) self.savePathEdit.setText("") self.startTimeEdit.setEnabled(True) self.endTimeEdit.setEnabled(True) self.platBox.setEnabled(True) self.stm8vsEdit.setEnabled(True) self.stm8vsEdit.setText("") self.dverEdit.setEnabled(True) self.dverEdit.setText("") self.orderEdit.setEnabled(True) self.orderEdit.setText("") self.pushButton.setEnabled(True) self.pushButton.setText("开始导出") self.progressBar.setVisible(False) self.progressBar.setValue(0) return class SCDThread(QtCore.QThread): """涉及进度条需主界面动态执行GUI,线程执行业务逻辑,避免主页面卡死""" proess_signal = QtCore.pyqtSignal(int) def __init__(self,device_list,save_path,start_time,end_time,set_plat,set_stm8vs,set_dver,set_order): super(SCDThread, self).__init__() self.device_list = device_list self.save_path = save_path self.start_time = time.mktime(start_time.timetuple()) self.end_time = time.mktime(end_time.timetuple()) self.start_time_str = start_time.strftime("%y-%m-%d %H:%M:%S") self.end_time_str = end_time.strftime("%y-%m-%d %H:%M:%S") self.set_plat = set_plat self.set_stm8vs = set_stm8vs self.set_dver = set_dver self.set_order = set_order self.user = parse.quote_plus("root") self.passwd = parse.quote_plus("yfkj@6020") self.myclient = pymongo.MongoClient("mongodb://{0}:{1}@8.136.98.49:57017/".format(self.user,self.passwd)) self.db = self.myclient.smartfarming self.device_collection = self.db.sa_device self.scd_collection = self.db.sa_device_scd_data self.config = { 'host': '120.27.222.26', 'port': 3306, 'user': 'yfwlw', 'password': 'sql_yfkj_6019', 'db': 'yfwlw', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor, } self.connection = pymysql.connect(**self.config) self.cursor = self.connection.cursor() device_list_tp = [] for d in device_list: d_id, device_id, platform = self.device_their_platform(d) device_list_tp.append(device_id) self.device_list = device_list_tp def mongo_ping(self): """mongo-ping预防连接失效""" try: self.myclient.admin.command('ping') except: # "ConnectionFailure" self.myclient = pymongo.MongoClient("mongodb://{0}:{1}@8.136.98.49:57017/".format(self.user,self.passwd)) self.db = self.myclient.smartfarming self.device_collection = self.db.sa_device self.scd_collection = self.db.sa_device_scd_data def sql_ping(self): """mysql-ping 预防连接失效""" try: self.connection.ping() except: self.connection = pymysql.connect(**self.config) self.cursor = self.connection.cursor() def __time_dif(self,checkdatetime): """计算时间差""" nowdatetime = datetime.datetime.now() checkdatetime = datetime.datetime.strptime(checkdatetime, "%Y-%m-%d %H:%M:%S") timedif = checkdatetime - nowdatetime return timedif.days def device_their_platform(self,shortId): """确定设备所在平台已经完整设备号""" self.mongo_ping() self.sql_ping() regex = re.compile('.*{}$'.format(shortId)) bd_device_dict = self.device_collection.find_one( filter = {"device_id":regex,"device_type_id":2}, projection = {'_id': 0}, sort = [('uptime', pymongo.DESCENDING)] ) device_sql = "SELECT * FROM AppInfoManage_scdstatus WHERE equip_id_id LIKE '%{}' ORDER BY upl_time DESC LIMIT 1;".format(shortId) self.cursor.execute(device_sql) sq_device_dict = self.cursor.fetchone() if bd_device_dict and sq_device_dict: bd_upltime = bd_device_dict["uptime"] sq_upltime = time.mktime(sq_device_dict["upl_time"].timetuple()) if bd_upltime >= sq_upltime: d_id = bd_device_dict["id"] deviceId = bd_device_dict["device_id"] platform = "大数据平台" else: d_id = "" deviceId = sq_device_dict["equip_id_id"] platform = "四情平台" elif bd_device_dict and not sq_device_dict: d_id = bd_device_dict["id"] deviceId = bd_device_dict["device_id"] platform = "大数据平台" return d_id,deviceId,platform elif not bd_device_dict and sq_device_dict: d_id = "" deviceId = sq_device_dict["equip_id_id"] platform = "四情平台" else: d_id = "" deviceId = "平台无此设备" platform = "未知" return d_id,deviceId,platform def sim_info(self,iccid): # 时间戳 用于获取sign timestamp = int(time.time()) current_milli_time = lambda: int(round(time.time() * 1000)) data_1 = "appid=%s&iccid=%s×tamp=%s%s"%("102420177762",iccid,current_milli_time(),"6397d7e6a56589f1d93284e9800493e1") sign = hashlib.sha256(data_1.encode('utf-8')).hexdigest() data = {"appid": "102420177762", "iccid": iccid, "timestamp":current_milli_time(),"sign":sign} url = "https://api.simboss.com/2.0/device/detail" try: status = 1 ret = requests.post(url, data=data) code = json.loads(ret.text)["code"] if code == "0": status = 1 else: url = 'http://sim.brlink.cn/api/open/iotcard/card' appkey = "iaO2DKgS8KdlnVgU" appsecret = "qzKgO4sBdzMrjRwv9H22S9ufepNv8Hl5ehPqkYVD31DCICjyKwqUdj7zihQQKfgx" status = 2 ret = requests.post(url,json={'iccid':iccid},auth=HTTPBasicAuth(appkey,appsecret),timeout=(5,10)) codes = json.loads(ret.text)["code"] if codes == 0: status = 2 else: url = "https://jsnl.xmnengjia.com/open/api/module/cards" data = {"iccids":[iccid]} data = json.dumps(data) ret = requests.post(url,data=data,timeout=(10,30)) print(ret.text) status = 3 except: status = 0 ret = 0 day = 0 if ret: try: result = json.loads(ret.text) expiry_date = result.get("data", {}).get("expiry_date") now_date = int(time.time()) time_difference = int((expiry_date - now_date) / 3600 / 24) print(time_difference) if time_difference < 30: return [0,"有效期剩余{}天".format(time_difference)] elif time_difference >= 181: return [1,"有效期剩余{}天".format(time_difference)] else: return [2,"有效期剩余{}天".format(time_difference)] except: return [0, "查询无结果"] else: return [0, "查询无结果"] def __bigdata_verify(self,data_cursor,proess): ds_verify,ct_verify,cv_verify,bv_verify,rps_verify = [1,"合格"],[0,"无电击数据"],[0,"不存在充电状态"],[1,"合格"],[0,"无雨控数据"] tt_verify,stm8vs_verify,dver_verify,lng_verify,lat_verify = [1,"合格"],[1,self.set_stm8vs],[1,self.set_dver],[1,"合格"],[1,"合格"] ws_list,ws_count = [],0 iccid = "" # 经纬度展示不合格条数 lng_lst = [] lat_lst = [] for index,i in enumerate(data_cursor): addtime = i["addtime"] data_strftime = datetime.datetime.fromtimestamp(addtime).strftime("%Y-%m-%d %H:%M:%S") device_data = i["device_data"] device_data = ast.literal_eval(device_data) if device_data.get("ds","0") == "0": ds_verify[0], ds_verify[1] = 0, data_strftime + "为关机状态" ws = device_data.get("ws","0") ws_list.append(ws) if ws == "1": ws_count += 1 elif ws == "2": if float(device_data.get("cv","0")) >= 7 and float(device_data.get("cv","0")) <= 23: if "电压异常" in cv_verify[1]: pass else: cv_verify[0],cv_verify[1] = 1,"合格" else: cv_verify[0],cv_verify[1] = 0,"{}电压异常:{}v".format(data_strftime,float(device_data.get("cv","0"))) dianji_count = int(device_data.get("ct","0")) if dianji_count != 0: if dianji_count > 1000: ct_verify[0],ct_verify[1] = 0, "{}电击数超限:{}".format(data_strftime,str(dianji_count)) else: if "电击数超限" not in ct_verify[1]: ct_verify[0],ct_verify[1] = 1,"合格" if float(device_data.get("bv","0")) == 12 or float(device_data.get("bv","0")) > 15: bv_verify[0], bv_verify[1] = 0, "{}电压异常:{}v".format(data_strftime,float(device_data.get("bv","0"))) if device_data.get("rps","0") == "1": rps_verify[0], rps_verify[1] = 1, data_strftime+"进入雨控" if device_data.get("ts","0") == "0": if device_data.get("tt","0") != "4": tt_verify[0], tt_verify[1] = 0, "光控定时时长为{}小时".format(device_data.get("tt","0")) else: tt_verify[0], tt_verify[1] = 0, "为时控模式" if device_data.get("stm8vs","0") != self.set_stm8vs: stm8vs_verify[0],stm8vs_verify[1] = 0, device_data.get("stm8vs","0") if device_data.get("iccid",""): iccid = device_data.get("iccid","") if device_data.get("dver","0") != self.set_dver: dver_verify[0],dver_verify[1] = 0, device_data.get("dver","0") # if float(device_data.get("lng","0")) < 113.7536111 or float(device_data.get("lng","0")) > 113.7869444: # lng_verify[0],lng_verify[1] = 0, data_strftime+"超出范围" # if float(device_data.get("lat","0")) < 35.0125 or float(device_data.get("lat","0")) > 35.0458333: # lat_verify[0],lat_verify[1] = 0, data_strftime+"超出范围" try: lng_lst.append(float(device_data.get("lng"))) except Exception as e: lng_lst.append(0) try: lat_lst.append(float(device_data.get("lat"))) except Exception as e: lat_lst.append(0) prosee = proess + ((index+1)/data_cursor.count()*(1/len(self.device_list))*100) if int(prosee) == 100: self.proess_signal.emit(99) else: self.proess_signal.emit(int(prosee)) if "0" in ws_list: if "1" in ws_list: if "2" in ws_list: stamp_diff = self.end_time - self.start_time day_diff = round(stamp_diff / (12*60*60)) if day_diff <= 1: low_limit, high_limit = 20, 30 else: low_limit, high_limit = 20*day_diff, 30*day_diff if ws_count>low_limit and ws_count 113.7869444: lg += 1 for k in lat_lst: if k < 35.0125 or k > 35.0458333: lt += 1 if lg != 0: lng_verify = [0, f"{str(min(lng_lst))}~{str(max(lng_lst))},不合格条数{str(lg)}"] if lt != 0: lat_verify = [0, f"{str(min(lat_lst))}~{str(max(lat_lst))},不合格条数{str(lt)}"] return [ds_verify,ws_verify,ct_verify,cv_verify,bv_verify,rps_verify,tt_verify,stm8vs_verify,sim_verify,dver_verify,lng_verify,lat_verify] def __siqing_verify(self,data_result,status_result,proess): ds_verify,ct_verify,cv_verify,bv_verify,rps_verify = [1,"合格"],[0,"无电击数据"],[0,"不存在充电状态"],[1,"合格"],[0,"无雨控数据"] tt_verify,stm8vs_verify,dver_verify,lng_verify,lat_verify = [1,"合格"],[1,self.set_stm8vs],[1,self.set_dver],[1,"合格"],[1,"合格"] ws_list,ws_count = [],0 iccid = "" for i in status_result: data_strftime = i["upl_time"].strftime("%Y-%m-%d %H:%M:%S") device_data = i["scd_status"] device_data = ast.literal_eval(device_data) if device_data.get("stm8vs","0") != self.set_stm8vs: stm8vs_verify[0],stm8vs_verify[1] = 0, device_data.get("stm8vs","0") if device_data.get("iccid",""): iccid = device_data.get("iccid","") if device_data.get("dver","0") != self.set_dver: dver_verify[0],dver_verify[1] = 0, device_data.get("dver","0") for index,i in enumerate(data_result): data_strftime = i["upl_time"].strftime("%Y-%m-%d %H:%M:%S") device_data = i["scd_data"] device_data = ast.literal_eval(device_data) if str(device_data.get("ds",0)) == "0": ds_verify[0], ds_verify[1] = 0, data_strftime + "为关机状态" ws = str(device_data.get("ws",0)) ws_list.append(ws) if ws == "1": ws_count += 1 elif ws == "2": if device_data.get("cv",0)/1000 >= 7 and device_data.get("cv",0)/1000 <= 23: if "电压异常" in cv_verify[1]: pass else: cv_verify[0],cv_verify[1] = 1,"合格" else: cv_verify[0],cv_verify[1] = 0,"{}电压异常:{}v".format(data_strftime,device_data.get("cv",0)/1000) dianji_count = device_data.get("ct",0) if dianji_count != 0: if dianji_count > 1000: ct_verify[0],ct_verify[1] = 0, "{}电击数超限:{}".format(data_strftime,dianji_count) else: if "电击数超限" not in ct_verify[1]: ct_verify[0],ct_verify[1] = 1,"合格" if device_data.get("bv",0)/1000 == 12 or device_data.get("bv",0)/1000 > 15: bv_verify[0], bv_verify[1] = 0, "{}电压异常:{}v".format(data_strftime,device_data.get("bv",0)/1000) if str(device_data.get("rps",0)) == "1": rps_verify[0], rps_verify[1] = 1, data_strftime+"进入雨控" if str(device_data.get("tcs",0)) == "0": if str(device_data.get("tt",0)) != "4": tt_verify[0], tt_verify[1] = 0, "光控定时时长为{}小时".format(str(device_data.get("tt",0))) else: tt_verify[0], tt_verify[1] = 0, "为时控模式" if float(device_data.get("lng","0")) < 113.7536111 and float(device_data.get("lng","0")) > 113.7869444: lng_verify[0],lng_verify[1] = 0, data_strftime+"超出范围" if float(device_data.get("lat","0")) < 35.0125 and float(device_data.get("lat","0")) > 35.0458333: lat_verify[0],lat_verify[1] = 0, data_strftime+"超出范围" prosee = proess + ((index+1)/len(data_result)*(1/len(self.device_list))*100) if int(prosee) == 100: self.proess_signal.emit(99) else: self.proess_signal.emit(int(prosee)) if "0" in ws_list: if "1" in ws_list: if "2" in ws_list: stamp_diff = self.end_time - self.start_time day_diff = round(stamp_diff / (12*60*60)) if day_diff <= 1: low_limit, high_limit = 20, 30 else: low_limit, high_limit = 20*day_diff, 30*day_diff if ws_count>low_limit and ws_count=8,不合格允许DF两列出现异常;\n2.若不超8台要求单机判定全部合格。" ] worksheet.write_row(row = 3 ,col = 0, data = explain_data,cell_format=explain_style) formal_counts = 0 verify_list = [] for index,shortId in enumerate(self.device_list): if isinstance(shortId,int) or isinstance(shortId, float): shortId = str(int(shortId)) else: shortId = shortId.strip() d_id,deviceId,platform = self.device_their_platform(shortId) worksheet.write(index+4,0,shortId,formal_style) if platform == "大数据平台": try: worksheet.write(index+4,1,deviceId,formal_style) if platform==self.set_plat: worksheet.write(index+4,14,platform,formal_style) else: worksheet.write(index+4,14,platform,error_style) self.mongo_ping() data_cursor = self.scd_collection.find({"device_id":d_id,'addtime': {"$gte":self.start_time ,"$lte":self.end_time}}) data_counts = data_cursor.count() if data_counts == 0: for i in range(2,14): worksheet.write(index+4,i,"搜索时间内无数据",error_style) worksheet.write(index+4,15,"不合格",error_style) danji_verify = [] for i in range(12): danji_verify.append(0) verify_list.append(danji_verify) else: verify_data = self.__bigdata_verify(data_cursor,proess) danji_verify = [] for clo_index, verify_ in enumerate(verify_data): danji_verify.append(verify_[0]) if verify_[0] == 0: worksheet.write(index+4,clo_index+2,verify_[1],error_style) elif verify_[0] == 1: worksheet.write(index+4,clo_index+2,verify_[1],formal_style) else: worksheet.write(index+4,clo_index+2,verify_[1],common_style) if 0 in danji_verify: worksheet.write(index+4,15,"不合格",error_style) else: formal_counts += 1 worksheet.write(index+4,15,"合格",formal_style) verify_list.append(danji_verify) except Exception as e: print(e) elif platform == "四情平台": worksheet.write(index+4,1,deviceId,formal_style) if platform==self.set_plat: worksheet.write(index+4,14,platform,formal_style) else: worksheet.write(index+4,14,platform,error_style) self.sql_ping() data_sql = "SELECT * FROM AppInfoManage_scddata WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}';".format(deviceId,self.start_time_str,self.end_time_str) self.cursor.execute(data_sql) data_result = self.cursor.fetchall() status_sql = "SELECT * FROM AppInfoManage_scdstatus_all WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}';".format(deviceId,self.start_time_str,self.end_time_str) self.cursor.execute(status_sql) status_result = self.cursor.fetchall() if len(data_result) == 0 or len(status_result) == 0: for i in range(2,14): worksheet.write(index+4,i,"搜索时间内无数据",error_style) worksheet.write(index+4,15,"不合格",error_style) danji_verify = [] for i in range(12): danji_verify.append(0) verify_list.append(danji_verify) else: verify_data = self.__siqing_verify(data_result,status_result,proess) danji_verify = [] for clo_index, verify_ in enumerate(verify_data): danji_verify.append(verify_[0]) if verify_[0] == 0: worksheet.write(index+4,clo_index+2,verify_[1],error_style) elif verify_[0] == 1: worksheet.write(index+4,clo_index+2,verify_[1],formal_style) else: worksheet.write(index+4,clo_index+2,verify_[1],common_style) if 0 in danji_verify: worksheet.write(index+4,15,"不合格",error_style) else: formal_counts += 1 worksheet.write(index+4,15,"合格",formal_style) verify_list.append(danji_verify) else: danji_verify = [] for i in range(12): danji_verify.append(0) verify_list.append(danji_verify) worksheet.write(index+4,1,deviceId,error_style) worksheet.write(index+4,14,platform,error_style) proess = (index+1)/len(self.device_list) * 100 if int(proess) == 100: self.proess_signal.emit(99) else: self.proess_signal.emit(int(proess)) toji_data = [ "任务单号",self.set_order,"检验时间",'{}\n{}'.format(self.start_time_str,self.end_time_str), "检验数量",len(self.device_list),"合格数",formal_counts,"检验平台",self.set_plat, "主板版本",self.set_stm8vs,"联网版本",self.set_dver,"报告日期",now_time.strftime("%y-%m-%d %H:%M:%S"),"" ] worksheet.write_row(row = 1 ,col = 0, data = toji_data,cell_format=toji_style) # 批判定 if len(self.device_list) > 8: batch_verify = [1,"设备量超8台,单机合格数量存在8台及以上,排除DF两列后其他列均合格"] if formal_counts >= 8: for danji_data in verify_list: for index,element_data in enumerate(danji_data): if index == 1 or index == 3: pass else: if element_data == 0: batch_verify = [0,"设备量超8台,单机合格数量存在8台及以上,排除DF两列后其他列出现异常"] else: batch_verify = [0,"设备量超8台,单机合格数量不足8台"] else: if formal_counts == len(self.device_list): batch_verify = [1,"设备量不超8台,单机判定全部合格"] else: batch_verify = [0,"设备量不超8台,单机判定存在不合格"] worksheet.merge_range("Q5:Q13",batch_verify[1],formal_style if batch_verify[0]==1 else error_style) worksheet.protect(pwd_str) workbook.close() self.cursor.close() self.connection.close() self.myclient.close() self.proess_signal.emit(100) if __name__== "__main__": QtWidgets.QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling) QtGui.QGuiApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough) app = QtWidgets.QApplication(sys.argv) MainWindow = QtWidgets.QMainWindow() ui = Ui_MainWindow() ui.setupUi(MainWindow) MainWindow.show() sys.exit(app.exec_())