from asyncio import ensure_future, shield from concurrent.futures import process from platform import python_build from turtle import st from PyQt5 import QtCore, QtGui, QtWidgets import shutil import os import sys import re import ast from urllib import parse import json import time import datetime import requests import pymongo import pymysql from xlrd import open_workbook from xlsxwriter.workbook import Workbook import numpy as np import cv2 from aliyunsdkcore.vendored.requests.auth import HTTPBasicAuth import requests import datetime import time import hashlib from skimage import filters save_filename = "" pwd_str = "yf6021" toji_format = { 'font_name' : '宋体', 'font_size': 12, 'font_color': 'black', 'text_wrap': True, 'bold': False, 'fg_color': '92D050', 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } title_format = { 'font_name' : '宋体', 'font_size': 10, 'bold': True, 'align': 'center', 'valign': 'vcenter', "text_wrap": True, 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } merge_title_format = { 'font_name' : '宋体', 'font_size': 22, 'bold': True, 'align': 'center', 'valign': 'vcenter', "fg_color": "8DB4E2", 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } explain_formal = { 'font_name' : '宋体', 'font_size': 9, 'font_color': 'black', 'text_wrap': True, 'align': 'justify', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } formal_format = { 'font_name' : '宋体', 'font_size': 9, 'font_color': 'black', 'fg_color': '77E88C', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } common_format = { 'font_name' : '宋体', 'font_size': 9, 'font_color': 'black', "fg_color": 'E7EC73', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } error_format = { 'font_name' : '宋体', 'font_size': 9, 'font_color': 'black', "fg_color": 'F4746A', 'text_wrap': True, 'align': 'center', 'valign': 'vcenter', 'border': 1, 'top': 1, 'left': 1, 'right': 1, 'bottom': 1 } class Ui_MainWindow(object): """GUI界面""" def setupUi(self, MainWindow): MainWindow.setObjectName("MainWindow") MainWindow.resize(701, 644) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(12) MainWindow.setFont(font) icon = QtGui.QIcon("logo.ico") MainWindow.setWindowIcon(icon) self.centralwidget = QtWidgets.QWidget(MainWindow) self.centralwidget.setObjectName("centralwidget") self.pageTitleLabel = QtWidgets.QLabel(self.centralwidget) self.pageTitleLabel.setGeometry(QtCore.QRect(139, 50, 429, 42)) font = QtGui.QFont() font.setFamily("楷体") font.setPointSize(28) self.pageTitleLabel.setFont(font) self.pageTitleLabel.setTextFormat(QtCore.Qt.AutoText) self.pageTitleLabel.setObjectName("pageTitleLabel") self.inputFileLabel = QtWidgets.QLabel(self.centralwidget) self.inputFileLabel.setGeometry(QtCore.QRect(180, 130, 121, 21)) self.inputFileLabel.setObjectName("inputFileLabel") self.inputFileEdit = QtWidgets.QLineEdit(self.centralwidget) self.inputFileEdit.setGeometry(QtCore.QRect(310, 130, 151, 21)) self.inputFileEdit.setFocusPolicy(QtCore.Qt.NoFocus) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(8) self.inputFileEdit.setFont(font) self.inputFileEdit.setObjectName("inputFileEdit") self.inputFileTool = QtWidgets.QToolButton(self.centralwidget) self.inputFileTool.setGeometry(QtCore.QRect(470, 130, 71, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(10) self.inputFileTool.setFont(font) self.inputFileTool.setObjectName("inputFileTool") self.savePathLabel = QtWidgets.QLabel(self.centralwidget) self.savePathLabel.setGeometry(QtCore.QRect(180, 170, 121, 21)) self.savePathLabel.setObjectName("savePathLabel") self.savePathEdit = QtWidgets.QLineEdit(self.centralwidget) self.savePathEdit.setGeometry(QtCore.QRect(310, 170, 151, 21)) self.savePathEdit.setFocusPolicy(QtCore.Qt.NoFocus) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(8) self.savePathEdit.setFont(font) self.savePathEdit.setObjectName("savePathEdit") self.savePathTool = QtWidgets.QToolButton(self.centralwidget) self.savePathTool.setGeometry(QtCore.QRect(470, 170, 71, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(10) self.savePathTool.setFont(font) self.savePathTool.setObjectName("savePathTool") self.startTimeLabel = QtWidgets.QLabel(self.centralwidget) self.startTimeLabel.setGeometry(QtCore.QRect(180, 210, 121, 21)) self.startTimeLabel.setObjectName("startTimeLabel") self.startTimeEdit = QtWidgets.QDateTimeEdit(self.centralwidget) self.startTimeEdit.setGeometry(QtCore.QRect(310, 210, 151, 21)) self.startTimeEdit.setMaximumDateTime(QtCore.QDateTime(QtCore.QDate(2028, 12, 31), QtCore.QTime(23, 59, 59))) self.startTimeEdit.setMinimumDateTime(QtCore.QDateTime(QtCore.QDate(2024, 1, 1), QtCore.QTime(12, 0, 0))) self.startTimeEdit.setObjectName("startTimeEdit") self.endTimeLabel = QtWidgets.QLabel(self.centralwidget) self.endTimeLabel.setGeometry(QtCore.QRect(180, 250, 121, 21)) self.endTimeLabel.setObjectName("endTimeLabel") self.endTimeEdit = QtWidgets.QDateTimeEdit(self.centralwidget) self.endTimeEdit.setGeometry(QtCore.QRect(310, 250, 151, 21)) self.endTimeEdit.setMaximumDateTime(QtCore.QDateTime(QtCore.QDate(2028, 12, 31), QtCore.QTime(23, 59, 59))) self.endTimeEdit.setMinimumDateTime(QtCore.QDateTime(QtCore.QDate(2024, 1, 1), QtCore.QTime(12, 0, 0))) self.endTimeEdit.setObjectName("endTimeEdit") self.platLabel = QtWidgets.QLabel(self.centralwidget) self.platLabel.setGeometry(QtCore.QRect(180, 290, 121, 21)) self.platLabel.setObjectName("dianjiLabel") self.platBox = QtWidgets.QComboBox(self.centralwidget) self.platBox.setGeometry(QtCore.QRect(310, 290, 151, 21)) self.platBox.addItems(['大数据平台','四情平台']) # 产品名称 self.productLabel = QtWidgets.QLabel(self.centralwidget) self.productLabel.setGeometry(QtCore.QRect(180, 440, 121, 21)) self.productLabel.setObjectName("productLabel") self.productBox = QtWidgets.QComboBox(self.centralwidget) self.productBox.setGeometry(QtCore.QRect(310, 440, 151, 21)) self.productBox.addItems(['1.0测报灯','4.0测报灯', "水稻测报灯"]) # 供电选择 self.powerSupplyLabel = QtWidgets.QLabel(self.centralwidget) self.powerSupplyLabel.setGeometry(QtCore.QRect(180, 470, 121, 21)) self.powerSupplyLabel.setObjectName("powerSupplyLabel") self.powerSupplyBox = QtWidgets.QComboBox(self.centralwidget) self.powerSupplyBox.setGeometry(QtCore.QRect(310, 470, 151, 21)) self.powerSupplyBox.addItems(['AC','DC']) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.platBox.setFont(font) self.platBox.setObjectName("platBox") self.stm8vsLabel = QtWidgets.QLabel(self.centralwidget) self.stm8vsLabel.setGeometry(QtCore.QRect(180, 330, 121, 21)) self.stm8vsLabel.setObjectName("stm8vsLabel") self.stm8vsEdit = QtWidgets.QLineEdit(self.centralwidget) self.stm8vsEdit.setGeometry(QtCore.QRect(310, 330, 151, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.stm8vsEdit.setFont(font) self.stm8vsEdit.setObjectName("stm8vsEdit") self.dverLabel = QtWidgets.QLabel(self.centralwidget) self.dverLabel.setGeometry(QtCore.QRect(180, 370, 121, 21)) self.dverLabel.setObjectName("dverLabel") self.dverEdit = QtWidgets.QLineEdit(self.centralwidget) self.dverEdit.setGeometry(QtCore.QRect(310, 370, 151, 21)) font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.dverEdit.setFont(font) self.dverEdit.setObjectName("dverEdit") self.orderLabel = QtWidgets.QLabel(self.centralwidget) self.orderLabel.setGeometry(QtCore.QRect(180, 410, 121, 21)) self.orderLabel.setObjectName("dverLabel") self.orderEdit = QtWidgets.QLineEdit(self.centralwidget) self.orderEdit.setGeometry(QtCore.QRect(310, 410, 151, 21)) # 进度条与按钮 font = QtGui.QFont() font.setFamily("Arial") font.setPointSize(11) self.orderEdit.setFont(font) self.orderEdit.setObjectName("dverEdit") self.pushButton = QtWidgets.QPushButton(self.centralwidget) self.pushButton.setGeometry(QtCore.QRect(300, 520, 111, 41)) self.pushButton.setObjectName("pushButton") self.progressBar = QtWidgets.QProgressBar(self.centralwidget) self.progressBar.setGeometry(QtCore.QRect(130, 540, 491, 31)) self.progressBar.setProperty("value", 0) self.progressBar.setVisible(False) self.progressBar.setObjectName("progressBar") MainWindow.setCentralWidget(self.centralwidget) self.retranslateUi(MainWindow) self.inputFileTool.clicked.connect(self.input_file_path) self.savePathTool.clicked.connect(self.out_save_location) self.pushButton.clicked.connect(self.on_click) QtCore.QMetaObject.connectSlotsByName(MainWindow) def retranslateUi(self, MainWindow): _translate = QtCore.QCoreApplication.translate MainWindow.setWindowTitle(_translate("MainWindow", "杀虫灯质检工具")) self.savePathTool.setText(_translate("MainWindow", "选择文件夹")) self.inputFileTool.setText(_translate("MainWindow", "选择文件")) self.pageTitleLabel.setText(_translate("MainWindow", "

云飞测报灯设备质检工具

")) self.savePathLabel.setText(_translate("MainWindow", "|输出文件位置:")) self.inputFileLabel.setText(_translate("MainWindow", "|输入文件位置:")) self.startTimeLabel.setText(_translate("MainWindow", "|开始时间:")) self.endTimeLabel.setText(_translate("MainWindow", "|结束时间:")) self.platLabel.setText(_translate("MainWindow", "|检验平台:")) self.productLabel.setText(_translate("MainWindow", "|产品名称:")) self.powerSupplyLabel.setText(_translate("MainWindow", "|供电选择:")) self.stm8vsLabel.setText(_translate("MainWindow", "|系统固件版本号:")) self.dverLabel.setText(_translate("MainWindow", "|RTU固件版本号: ")) self.orderLabel.setText(_translate("MainWindow", "|任务单号:")) self.pushButton.setText(_translate("MainWindow", "开始导出")) def input_file_path(self): file_path = QtWidgets.QFileDialog.getOpenFileName(None,"选取文件","./","All Files (*.xlsx;*.xls);;Text Files (*.txt)") self.inputFileEdit.setText(file_path[0]) self.inputFileEdit.setStyleSheet("color:black;") def out_save_location(self): fname = QtWidgets.QFileDialog.getExistingDirectory(None, '选取文件夹', './') self.savePathEdit.setText(fname) self.savePathEdit.setStyleSheet("color:black;") def on_click(self): file_path = self.inputFileEdit.text() save_path = self.savePathEdit.text() start_time = self.startTimeEdit.dateTime().toPyDateTime() end_time = self.endTimeEdit.dateTime().toPyDateTime() set_plat = self.platBox.currentText() set_stm8vs = self.stm8vsEdit.text() set_dver = self.dverEdit.text() set_order = self.orderEdit.text() set_product = self.productBox.currentText() set_power = self.powerSupplyBox.currentText() if not all ([file_path,save_path,start_time,end_time,set_stm8vs,set_dver,set_order]): QtWidgets.QMessageBox.information(None, "提示", "选项未填写完整") elif end_time < start_time: QtWidgets.QMessageBox.information(None, "提示", "结束日期应在开始日期之后") else: xls = open_workbook(file_path) sheet_object = xls.sheets()[0] ncols = sheet_object.ncols read_dict = {} for i in range(ncols): col_value = sheet_object.col_values(i) read_dict[col_value[0]] = col_value[1:] device_list = read_dict.get("设备ID") if device_list: self.inputFileTool.setEnabled(False) self.savePathTool.setEnabled(False) self.startTimeEdit.setEnabled(False) self.endTimeEdit.setEnabled(False) self.platBox.setEnabled(False) self.productBox.setEnabled(False) self.powerSupplyBox.setEnabled(False) self.stm8vsEdit.setEnabled(False) self.dverEdit.setEnabled(False) self.orderEdit.setEnabled(False) self.pushButton.setEnabled(False) self.pushButton.setText("执行中...") self.runThread = CBDThread(device_list,save_path,start_time,end_time,set_plat,set_stm8vs,set_dver,set_order, set_product, set_power) self.runThread.proess_signal.connect(self.set_progressbar_value) self.runThread.start() self.progressBar.setVisible(True) else: QtWidgets.QMessageBox.information(None, "提示", "输入文件无'设备ID'列或该列无数据") def set_progressbar_value(self, value): self.progressBar.setValue(value) if value == 100: QtWidgets.QMessageBox.information(None, "提示", "文件导出完毕!导出文件名:\n{}".format(save_filename)) self.inputFileTool.setEnabled(True) self.inputFileEdit.setText("") self.savePathTool.setEnabled(True) self.savePathEdit.setText("") self.startTimeEdit.setEnabled(True) self.endTimeEdit.setEnabled(True) self.platBox.setEnabled(True) self.productBox.setEnabled(True) self.powerSupplyBox.setEnabled(True) self.stm8vsEdit.setEnabled(True) self.stm8vsEdit.setText("") self.dverEdit.setEnabled(True) self.dverEdit.setText("") self.orderEdit.setEnabled(True) self.orderEdit.setText("") self.pushButton.setEnabled(True) self.pushButton.setText("开始导出") self.progressBar.setVisible(False) self.progressBar.setValue(0) return class CBDThread(QtCore.QThread): """涉及进度条需主界面动态执行GUI,线程执行业务逻辑,避免主页面卡死""" proess_signal = QtCore.pyqtSignal(int) def __init__(self,device_list,save_path,start_time,end_time,set_plat,set_stm8vs,set_dver,set_order, set_product, set_power): super(CBDThread, self).__init__() self.device_list = device_list self.save_path = save_path self.start_time = time.mktime(start_time.timetuple()) self.end_time = time.mktime(end_time.timetuple()) # self.start_time = 1689737400 # self.end_time = 1689753600 self.start_time_str = start_time.strftime("%y-%m-%d %H:%M:%S") self.end_time_str = end_time.strftime("%y-%m-%d %H:%M:%S") self.set_plat = set_plat self.set_stm8vs = set_stm8vs self.set_dver = set_dver self.set_order = set_order self.set_product = set_product self.set_power = set_power self.user = parse.quote_plus("root") self.passwd = parse.quote_plus("yfkj@6020") self.myclient = pymongo.MongoClient("mongodb://{0}:{1}@8.136.98.49:57017/".format(self.user,self.passwd)) self.db = self.myclient.smartfarming self.device_collection = self.db.sa_device self.cbd_collection = self.db.sa_device_cbd_data self.cbd_photo = self.db.sa_device_cbdphoto self.cbd_alerm = self.db.sa_alarm_record self.device_config = self.db.sa_device_config self.config = { 'host': '120.27.222.26', 'port': 3306, 'user': 'yfwlw', 'password': 'sql_yfkj_6019', 'db': 'yfwlw', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor, } self.connection = pymysql.connect(**self.config) self.cursor = self.connection.cursor() def get_position(self, device_id): device = self.device_collection.find_one({"device_id": device_id}) province, city, district = device.get("province"), device.get("city"), device.get("district") position_device = f"{province}{city}{district}" if position_device: return position_device lng, lat = device.get("lng"), device.get("lat") if lng and lat: try: ret = requests.post("http://api.map.baidu.com/geocoder?location=%s,%s&coord_type=gcj02&output=json"%(lat,lng)) ret_json = json.loads(ret.text) province, city, district = ret_json["result"]["addressComponent"]["province"], \ ret_json["result"]["addressComponent"]["city"], \ ret_json["result"]["addressComponent"]["district"] return province + city + district except Exception as e: return False else: return False def sim_info(self,iccid): # 时间戳 用于获取sign timestamp = int(time.time()) current_milli_time = lambda: int(round(time.time() * 1000)) data_1 = "appid=%s&iccid=%s×tamp=%s%s"%("102420177762",iccid,current_milli_time(),"6397d7e6a56589f1d93284e9800493e1") sign = hashlib.sha256(data_1.encode('utf-8')).hexdigest() data = {"appid": "102420177762", "iccid": iccid, "timestamp":current_milli_time(),"sign":sign} url = "https://api.simboss.com/2.0/device/detail" try: status = 1 ret = requests.post(url, data=data) code = json.loads(ret.text)["code"] if code == "0": status = 1 else: url = 'http://sim.brlink.cn/api/open/iotcard/card' appkey = "iaO2DKgS8KdlnVgU" appsecret = "qzKgO4sBdzMrjRwv9H22S9ufepNv8Hl5ehPqkYVD31DCICjyKwqUdj7zihQQKfgx" status = 2 ret = requests.post(url,json={'iccid':iccid},auth=HTTPBasicAuth(appkey,appsecret),timeout=(5,10)) codes = json.loads(ret.text)["code"] if codes == 0: status = 2 else: url = "https://jsnl.xmnengjia.com/open/api/module/cards" data = {"iccids":[iccid]} data = json.dumps(data) ret = requests.post(url,data=data,timeout=(10,30)) status = 3 except: status = 0 ret = 0 day = 0 if ret: try: result = json.loads(ret.text) expiry_date = result.get("data", {}).get("expiry_date") now_date = int(time.time()) time_difference = int((expiry_date - now_date) / 3600 / 24) if time_difference < 30: return [1,"有效期剩余{}天".format(time_difference)] elif time_difference >= 181: return [1,"有效期剩余{}天".format(time_difference)] else: return [1,"有效期剩余{}天".format(time_difference)] except: return [1, "查询无结果"] else: return [1, "查询无结果"] def _imageToMatrix(self, image): """ 根据名称读取图片对象转化矩阵 :param strName: :return: 返回矩阵 """ imgMat = np.matrix(image) return imgMat def preImgOps(self, img): """ 图像的预处理操作 :param img: 图像的而明朝 :return: 灰度化和resize之后的图片对象 """ # 预处理操作 try: reImg = cv2.resize(img, (300, 400), interpolation=cv2.INTER_CUBIC) # img2gray = cv2.cvtColor(reImg, cv2.COLOR_BGR2GRAY) # 将图片压缩为单通道的灰度图 return reImg,img2gray, True except Exception as e: return None, None, False def ReadImage(self, im_file): """ 读取图片 :param im_file: 图像的路径 :return: 利用opencv读取的完整图像img 托虫板内接四边形图像cutimg 内接四边形的坐标c[xmin,ymin,xmax,ymax] """ try: img = cv2.imdecode(np.fromfile(im_file, dtype=np.uint8), cv2.IMREAD_COLOR) ImgShape = img.shape center = [ImgShape[1]/2,ImgShape[0]/2] #圆盘中心 radius = int(ImgShape[0]/2) #圆盘半径 length = pow(2,0.5)*radius #圆的内接正方形边长 Size = int(length/2) xmin = int(center[0] - Size) xmax = int(center[0] + Size) ymin = int((center[1] - Size)*1.1) ymax = int(center[1] + Size) c = [xmin,ymin,xmax,ymax] cutimg = img[ymin:ymax,xmin:xmax] return img,cutimg,c, True except Exception as e: return None, None, None, False def getFileName(self, imageFile): url, FileName = os.path.split(imageFile) tamp = [] tamp.append(FileName) return tamp #图像清晰度检测 def predict(self, imageFile): """ 图像清晰度检测 Args: imageFile(dir or file):单张图片或者存放图片的文件夹 """ result = [] #存放清晰度评分 imgList = [] #存放识别后图片 if os.path.isdir(imageFile): imageFileList = [os.path.join(imageFile,x) for x in os.listdir(imageFile)] for image in imageFileList: img,cutimg,c, is_get = self.ReadImage(image) if is_get: reImg, img2gray, is_imgs= self.preImgOps(cutimg) if is_imgs: f = self._imageToMatrix(img2gray) tmp = filters.sobel(f) source=np.sum(tmp**2) source=np.sqrt(source) result.append(source) imgList.append(img) elif os.path.isfile(imageFile): img,cutimg,c, is_get = self.ReadImage(imageFile) if is_get: reImg, img2gray, is_imgs= self.preImgOps(cutimg) if is_imgs: f = self._imageToMatrix(img2gray) tmp = filters.sobel(f) source=np.sum(tmp**2) source=np.sqrt(source) result.append(source) imgList.append(img) return result def __bigdata_iamge_verify(self, device_id): # 对图片质量进行打分并求平均值 scores_time = {} photo_obj = self.cbd_photo.find({"device_id":str(device_id), 'addtime': {"$gte": int(self.start_time) - 3600 * 4,"$lte":self.start_time}}) for p in photo_obj: photo_addr = p.get("addr") addtime = p.get("addtime") # 把图片下载到本地 local_dir = "org_image" file_name = photo_addr.split("/")[-1] file_name = file_name.replace("?", "") os.makedirs(local_dir) if not os.path.exists(local_dir) else None print(photo_addr, "----------img----------") # response = requests.get(f"https://bigdata-image.oss-cn-hangzhou.aliyuncs.com/Basics/cbd/{photo_addr}") if "ftp" in photo_addr: response = requests.get(photo_addr) else: response = requests.get(f"http://8.136.98.49:8003/Basics/cbd/{photo_addr}") with open(os.path.join(local_dir, file_name), 'wb') as f: f.write(response.content) result_score = self.predict(os.path.join(local_dir, file_name)) print(result_score) if result_score: avg_score = sum(result_score)/len(result_score) scores_time[str(addtime)] = avg_score try: os.remove(os.path.join(local_dir, file_name)) except: pass try: shutil.rmtree(local_dir) except: pass print(scores_time, "----------------------") return scores_time def __siqing_image_verify(self, device_id): # 对图片质量进行打分并求平均值 scores_time = {} sql = "SELECT * FROM AppInfoManage_cbdphoto WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}' order by 'alarm_time';".format(device_id,self.start_time_str,self.end_time_str) self.cursor.execute(sql) photo_obj = self.cursor.fetchall() for p in photo_obj: photo_addr = p.get("addr") addtime = p.get("addtime") # 把图片下载到本地 local_dir = "org_image" file_name = photo_addr.split("/")[-1] os.makedirs(local_dir) if not os.path.exists(local_dir) else None response = requests.get(photo_addr) with open(os.path.join(local_dir, file_name), 'wb') as f: f.write(response.content) result_score = self.predict(os.path.join(local_dir, file_name)) avg_score = sum(result_score)/len(result_score) scores_time[str(addtime)] = avg_score try: os.remove(os.path.join(local_dir, file_name)) except: pass return scores_time def mongo_ping(self): """mongo-ping预防连接失效""" try: self.myclient.admin.command('ping') except: # "ConnectionFailure" self.myclient = pymongo.MongoClient("mongodb://{0}:{1}@8.136.98.49:57017/".format(self.user,self.passwd)) self.db = self.myclient.smartfarming self.device_collection = self.db.sa_device self.cbd_collection = self.db.sa_device_cbd_data self.cbd_photo = self.db.sa_device_cbdphoto self.cbd_alerm = self.db.sa_alarm_record def sql_ping(self): """mysql-ping 预防连接失效""" try: self.connection.ping() except: self.connection = pymysql.connect(**self.config) self.cursor = self.connection.cursor() def __time_dif(self,checkdatetime): """计算时间差""" nowdatetime = datetime.datetime.now() checkdatetime = datetime.datetime.strptime(checkdatetime, "%Y-%m-%d %H:%M:%S") timedif = checkdatetime - nowdatetime return timedif.days def device_their_platform(self,shortId): """确定设备所在平台已经完整设备号""" self.mongo_ping() self.sql_ping() regex = re.compile('.*{}$'.format(shortId)) bd_device_dict = self.device_collection.find_one( filter = {"device_id":regex,"device_type_id":3}, projection = {'_id': 0}, sort = [('uptime', pymongo.DESCENDING)] ) device_sql = "SELECT * FROM AppInfoManage_cbdstatus WHERE equip_id_id LIKE '%{}' ORDER BY upl_time DESC LIMIT 1;".format(shortId) self.cursor.execute(device_sql) sq_device_dict = self.cursor.fetchone() if bd_device_dict and sq_device_dict: bd_upltime = bd_device_dict["uptime"] sq_upltime = time.mktime(sq_device_dict["upl_time"].timetuple()) if bd_upltime >= sq_upltime: d_id = bd_device_dict["id"] deviceId = bd_device_dict["device_id"] platform = "大数据平台" else: d_id = "" deviceId = sq_device_dict["equip_id_id"] platform = "四情平台" elif bd_device_dict and not sq_device_dict: d_id = bd_device_dict["id"] deviceId = bd_device_dict["device_id"] platform = "大数据平台" return d_id,deviceId,platform elif not bd_device_dict and sq_device_dict: d_id = "" deviceId = sq_device_dict["equip_id_id"] platform = "四情平台" else: d_id = "" deviceId = "平台无此设备" platform = "未知" return d_id,deviceId,platform def eliminate_adjacent_duplicates(self, string): result = '' prev_char = None for char in string: if char != prev_char: result += char prev_char = char return result def __bigdata_verify(self,data_cursor,data_status,device_id_info, image_score, proess, data_temps, position, tt): """ 获取大数据平台查询结果 1. 合格 2. 不合格 """ current_jg = [1,""] # 振动电机工作状态下电流匹配度 up_jg = [1,"无电流不足信息"] # 上仓门电机电机工作状态 down_jg = [1,"无电流不足信息"] # 下仓门电机电机工作状态 lig_jg = [1,"无电流不足信息"] # 灯管工作状态下 machine_jg = [1, "无电流不足信息"]# 清扫/转盘电机工作状态 camera_jg = [1, "无电流不足信息"] # 相机工作状态 current_jg_ct, up_jg_ct, down_jg_ct, lig_jg_ct, camera_jg_ct, machine_ct = 0,0,0,0,0,0 for alerm in data_status: alarm_time = datetime.datetime.fromtimestamp(alerm.get("alarm_time")).strftime("%Y-%m-%d %H:%M:%S") a = alerm.get("alarm_desc", "") if a: a = json.loads(a) if a.get("type", "") == "震动电机": if "电流不足" in a.get("status"): current_jg_ct += 1 elif a.get("type", "") == "上仓门电机": if "电流不足" in a.get("status"): up_jg_ct += 1 elif a.get("type", "") == "下仓门电机": if "电流不足" in a.get("status"): down_jg_ct += 1 elif a.get("type", "") == "灯管": if "电流不足" in a.get("status"): lig_jg_ct += 1 elif a.get("type", "") in ["清扫电机", "转盘电机"]: if "电流不足" in a.get("status"): machine_ct += 1 elif a.get("type", "") == "相机": print(a, "电流不足", alarm_time) if "电流不足" in a.get("status"): camera_jg_ct += 1 # if current_jg_ct: # current_jg = [1,""] # if up_jg_ct: # up_jg = [0,f"有电流不足信息{str(up_jg_ct)}条"] # if down_jg_ct: # down_jg = [0,f"有电流不足信息{str(down_jg_ct)}条"] # if lig_jg_ct: # lig_jg = [0,f"有电流不足信息{str(lig_jg_ct)}条"] # if camera_jg_ct: # camera_jg = [0,f"有电流不足信息{str(camera_jg_ct)}条"] # if machine_ct: # machine_jg = [0,f"有电流不足信息{str(machine_ct)}条"] iccid = "" lng_lst = [] lat_lst = [] work_status = [] # 工作状态切换 elec_vs_lst = [] # 电路板固件版本号 v_vs_lst = [] # 电压 tr_k_jg_lst = [] # 温控 rain_k_jg_lst = []# 雨控 infor_jg_count = 0 # 上报信息条数 heat_temp_lst = []# 加热仓温度 temp_hum = [] # 环境温度 环境湿度 total_time = [] # 总时长 start_time = 0 information_count = [] # 上报时间 infor_lamp = {} dver_lst = [] for c in data_temps: device_c = c.get("device_data", "") if device_c: device_c = ast.literal_eval(device_c) tp = device_c.get("tps", "") if tp == "1": tr_k_jg_lst.append(tp) for index, cur in enumerate(data_cursor): start_time = int(cur.get("addtime", 0)) device_data = cur.get("device_data", "") if device_data: device_data = ast.literal_eval(device_data) iccid = device_data.get("iccid", "") lamp = device_data.get("lamp") if lamp == "0": infor_lamp[str(start_time)] = lamp information_count.append(start_time) lng = device_data.get("lng", "") lat = device_data.get("lat", "") try: lng_lst.append(float(lng)) lat_lst.append(float(lat)) except Exception as e: lng_lst.append(0) lat_lst.append(0) work_status.append(device_data.get("ws", "")) elec_vs_lst.append(device_data.get("dver", "")) v_vs_lst.append(float(device_data.get("vbat", 0))) rain_k_jg_lst.append(device_data.get("rps", "")) infor_jg_count += 1 heat_temp_lst.append(float(device_data.get("hrt", 0))) temp_hum.append({"at": device_data.get("at", ""), "ah": device_data.get("ah", "")}) if self.set_power == "AC": total_time.append(int(device_data.get("tt"))) elif self.set_power == "DC": st = device_data.get("st") # 19:00 et = device_data.get("et") # 08:00 st = int(st.split(":")[0]) et = int(et.split(":")[0]) if st > et: total = 24 - st + et else: total = et - st total_time.append(total) dver = device_data.get("dver", "") if dver: dver_lst.append(dver) # 检查卡号及有效期 if self.set_product != "水稻测报灯": card_jg = self.sim_info(iccid) else: card_jg = [1, "合格"] # 经度 lng_pass = [i for i in lng_lst if 113.76194444 >= i or i >= 113.77861111] if len(lng_pass): lng_jg = [4,f"{str(min(lng_lst))}~{str(max(lng_lst))},不符合条件的数据有{str(len(lng_pass))}条"] else: lng_jg = [1, f"{str(min(lng_lst))}~{str(max(lng_lst))}"] # 纬度 35.02083333,35.0375 lat_pass = [i for i in lat_lst if 35.02083333 >=i or i >= 35.0375] if len(lat_pass): lat_jg = [4,f"{str(min(lat_lst))}~{str(max(lat_lst))},不符合条件的数据有{str(len(lat_pass))}条"] else: lat_jg = [1, f"{str(min(lat_lst))}~{str(max(lat_lst))}"] # 工作状态切换 work_status_str = "".join(work_status) result = self.eliminate_adjacent_duplicates(work_status_str) count_01 = result.count("01") count_10 = result.count("10") ww = min([count_01, count_10]) if ww >= 1: work_jg = [1, f"待机/工作转换{str(ww)}次"] else: work_jg = [0, f"无待机/工作转换"] # 电路板输入电压 v_vs_pass = [i for i in v_vs_lst if 22 > i or i > 30] if len(v_vs_pass): v_vs = [0, f"{str(min(v_vs_lst))}~{str(max(v_vs_lst))},不合格数据3条"] else: v_vs = [1, f"{str(min(v_vs_lst))}~{str(max(v_vs_lst))}"] # 温控 if "1" in tr_k_jg_lst: tr_k_jg = [1, f"上报被温控数据{str(tr_k_jg_lst.count('1'))}条"] else: tr_k_jg = [0, f"无数据上报"] # 雨控 if self.set_product != "水稻测报灯": if "1" in rain_k_jg_lst: rain_k_jg = [1, f"被雨控有{str(rain_k_jg_lst.count('1'))}条数据"] else: rain_k_jg = [0, "无被雨控数据上报"] else: rain_k_jg = [1, "合格"] # 加热仓温度 heat_temp = [] for h in heat_temp_lst: if 85 <= h <= 105: heat_temp = [1, f"{str(min(heat_temp_lst))}~{str(max(heat_temp_lst))}"] break if not heat_temp: heat_temp = [0, f"{str(min(heat_temp_lst))}~{str(max(heat_temp_lst))}"] # 环境温度 if {"at": "25", "ah": "35"} in temp_hum: en_temp = [0,"35且环境湿度显示25"] else: temp = [float(k.get("at", 0)) for k in temp_hum] en_temp = [1,f"{str(min(temp))}~{str(max(temp))}"] # 环境湿度 if {"at": "35", "ah": "25"} in temp_hum: en_hum = [0,"25且环境温度显示35"] else: temp = [float(k.get("ah", 0)) for k in temp_hum] en_hum = [1,f"{str(min(temp))}~{str(max(temp))}"] # 图片质量 if self.set_product == "4.0测报灯" or self.set_product == "水稻测报灯": sorted_dict = sorted(image_score.items(), key=lambda x: x[1], reverse=True) sorted_dict = sorted_dict[:3] lte = [] score = [] print(sorted_dict, "----------------") for v in sorted_dict: rank = 55 if self.set_product == "水稻测报灯" else 68 if v[1] < rank: lte.append(str(v[1])) score.append(str(round(v[1], 3))) else: score.append(str(round(v[1], 3))) if sorted_dict: if lte: img_jg = [0, ",".join(score)] else: img_jg = [1, ",".join(score)] else: img_jg = [0, "输入开始时间前4h内无图片"] else: img_jg = [1, ""] # 上报信息条数 interval = [abs(information_count[i+1] - information_count[i]) for i in range(len(information_count) - 1)] k = 0 for i in interval: if i < 5 * 60: k += 1 if len(information_count) >= 22 and k <= 3: infor_jg_count = [1, f"数据条数为{str(len(information_count))},其中有小于5分钟的数据间隔{str(k)}条"] elif len(information_count) >= 22 and k >=4: infor_jg_count = [0, f"数据条数{str(len(information_count))},其中有小于5分钟的数据间隔{str(k)}条"] else: infor_jg_count = [0, f"数据条数{str(len(information_count))},其中有小于5分钟的数据间隔{str(k)}条"] # 电路板固件版本号 RTU固件版本号 rtus = [] gujians = [] for d in dver_lst: rtu = d.split("-")[-1] gujian = d.replace(rtu, "").replace("-", "") gujians.append(gujian) rtus.append(rtu) rtus = list(set(rtus)) gujians = list(set(gujians)) if len(gujians) == 1 and gujians[0] == self.set_stm8vs: gujian_num = [1, self.set_stm8vs] else: if len(gujians) == 1: gujian_num = [0, gujians[0]] else: gujian_num = [0, ",".join(gujians)] if len(rtus) == 1 and rtus[0] == self.set_dver: rtu_num = [1, self.set_dver] else: if len(rtus) == 1: rtu_num = [0, rtus[0]] else: rtu_num = [0, ",".join(rtus)] # 工作时长 判定 if self.set_power == "DC": # total_time 去重 total_time = list(set(total_time)) if len(total_time) == 1 and total_time[0] == 4: # if tt == "4": time_jg = [1, "4"] else: time_jg = [0, "不合格"] else: time_jg = [0, "不合格"] # power_pass = [] # for i in total_time: # if i not in power_pass: # power_pass.append(i) # if len(power_pass) == 1 and power_pass[0] == 4: # time_jg = [1, "4"] # else: # time_jg = [0, ",".join([str(i) for i in power_pass])] # if self.set_power == "AC": # power_pass = [] # for i in total_time: # if i not in power_pass: # power_pass.append(i) # if len(power_pass) == 1 and power_pass[0] == 9: # time_jg = [1, "9"] # else: # time_jg = [0, ",".join([str(i) for i in power_pass])] if position == "河南省新乡市原阳县": position_cg = [1, position] else: position_cg = [0, position] return [ device_id_info, [1, self.set_order], card_jg, lng_jg, lat_jg, work_jg, gujian_num, rtu_num, v_vs, tr_k_jg, rain_k_jg, infor_jg_count, heat_temp, en_temp, en_hum, img_jg, # current_jg, # up_jg, # down_jg, # lig_jg, # machine_jg, # camera_jg, time_jg, position_cg ] def __siqing_verify(self, data_result,status_result, status_all_result, alarm_result, device_id_info, image_score, process): """获取四情平台查询结果""" current_jg = [1,""] # 振动电机工作状态下电流匹配度 up_jg = [1,"无电流不足信息"] # 上仓门电机电机工作状态 down_jg = [1,"无电流不足信息"] # 下仓门电机电机工作状态 lig_jg = [1,"无电流不足信息"] # 灯管工作状态下 machine_jg = [1, "无电流不足信息"]# 电机工作状态 camera_jg = [1, "无电流不足信息"] # 相机工作状态 time_jg = [1, "合格"] # 工作时长 current_jg_ct, up_jg_ct, down_jg_ct, lig_jg_ct, camera_jg_ct = 0,0,0,0,0 if alarm_result: for alerm in alarm_result: a = alerm.get("alarm_desc", "") if a: try: a = eval(a) except Exception as e: a = json.loads(a) if a.get("type", "") == "震动电机": if a.get("status") == "电流不足": current_jg_ct += 1 elif a.get("type", "") == "上仓门电机": if "电流不足" in a.get("status"): up_jg_ct += 1 elif a.get("type", "") == "下仓门电机": if "电流不足" in a.get("status"): down_jg_ct += 1 elif a.get("type", "") == "灯管": if a.get("status") == "电流不足": lig_jg_ct += 1 elif a.get("type", "") == "相机": if a.get("status") == "电流不足": camera_jg_ct += 1 if current_jg_ct: current_jg = [1,""] if up_jg_ct: up_jg = [0,f"有电流不足信息{str(up_jg_ct)}条"] if down_jg_ct: down_jg = [0,f"有电流不足信息{str(down_jg_ct)}条"] if lig_jg_ct: lig_jg = [0,f"有电流不足信息{str(lig_jg_ct)}条"] if camera_jg_ct: camera_jg = [0,f"有电流不足信息{str(camera_jg_ct)}条"] lng_lst = [] # 经度 lat_lst = [] # 纬度 tr_k_jg_lst = [] # 温控 rain_k_jg_lst = []# 雨控 infor_jg_count = 0 # 上报信息条数 heat_temp_lst = []# 加热仓温度 temp_hum = [] # 环境温度 环境湿度 total_time = [] # 总时长 information_count = [] # 上报时间 v_vs_lst = [] # 电压 for index, cur in enumerate(data_result): upl_time_str = cur.get("upl_time", "0") upl_time_int = int(time.mktime(upl_time_str.timetuple())) information_count.append(upl_time_int) device_data = cur.get("cbd_data", "") if device_data: device_data = ast.literal_eval(device_data) lng = device_data.get("lng", "") lng_lst.append(float(lng)) lat = device_data.get("lat", "") lat_lst.append(float(lat)) v_vs_lst.append(float(device_data.get("vbat", "0"))) tr_k_jg_lst.append(device_data.get("tps", "")) rain_k_jg_lst.append(device_data.get("rps", "")) infor_jg_count += 1 heat_temp_lst.append(float(device_data.get("hrt", 0))) temp_hum.append({"at": device_data.get("at", ""), "ah": device_data.get("ah", "")}) if self.set_power == "AC": total_time.append(device_data.get("tt")) elif self.set_power == "DC": st = device_data.get("st", "00:00") et = device_data.get("et", "00:00") st = int(st.split(":")[0]) et = int(et.split(":")[0]) if st > et: total = 24 - st + et else: total = et - st total_time.append(total) # 经度 lng_pass = [i for i in lng_lst if 113.76194444 >= i or i >= 113.77861111] if len(lng_pass): lng_jg = [0,f"{str(min(lng_lst))}~{str(max(lng_lst))},不符合条件的数据有{str(len(lng_pass))}条"] else: lng_jg = [1, f"{str(min(lng_lst))}~{str(max(lng_lst))}"] # 纬度 35.02083333,35.0375 lat_pass = [i for i in lat_lst if 35.02083333 >=i or i >= 35.0375] if len(lat_pass): lat_jg = [0,f"{str(min(lat_lst))}~{str(max(lat_lst))},不符合条件的数据有{str(len(lat_pass))}条"] else: lat_jg = [1, f"{str(min(lat_lst))}~{str(max(lat_lst))}"] # 电路板输入电压 v_vs_pass = [i for i in v_vs_lst if 22 >= i or i >= 30] if len(v_vs_pass): v_vs = [0, f"{str(min(v_vs_lst))}~{str(max(v_vs_lst))},不合格数据3条"] else: v_vs = [1, f"{str(min(v_vs_lst))}~{str(max(v_vs_lst))}"] # 温控 if "1" in tr_k_jg_lst: tr_k_jg = [1, f"上报被温控数据{str(tr_k_jg_lst.count('1'))}条"] else: tr_k_jg = [0, f"无数据上报"] # 雨控 if self.set_product == "水稻测报灯": rain_k_jg = [1, "合格"] else: if "1" in rain_k_jg_lst: rain_k_jg = [1, f"被雨控有{str(rain_k_jg_lst.count('1'))}条数据"] else: rain_k_jg = [0, "无被雨控数据上报"] # 加热仓温度 heat_temp = [] for h in heat_temp_lst: if 85 <= h <= 105: heat_temp = [1, f"{str(min(heat_temp_lst))}~{str(max(heat_temp_lst))}"] break if not heat_temp: heat_temp = [0, f"{str(min(heat_temp_lst))}~{str(max(heat_temp_lst))}"] # 环境温度 if {"at": "25", "ah": "35"} in temp_hum: en_temp = [0,"35且环境湿度显示25"] else: temp = [float(k.get("at", 0)) for k in temp_hum] en_temp = [1,f"{str(min(temp))}~{str(max(temp))}"] # 环境湿度 if {"at": "35", "ah": "25"} in temp_hum: en_hum = [0,"25且环境温度显示35"] else: temp = [float(k.get("ah", 0)) for k in temp_hum] en_hum = [1,f"{str(min(temp))}~{str(max(temp))}"] # 图片质量 img_pass = [] img_score = [] for k, v in image_score.items(): if v > 50: img_score.append(v) if v < 68: img_pass.append((datetime.datetime.fromtimestamp(int(k))).strftime("%Y-%m-%d %H:%M:%S")) if img_score: if img_pass: img_jg = [0, f"{str(min(img_score))}~{str(max(img_score))}, 不合格图片{str(len(img_pass))}幅, 上报时间为{','.join(img_pass)}"] else: img_jg = [1, f"{str(min(img_score))}~{str(max(img_score))}"] else: img_jg = [0, "图片得分均在50以下"] # 上报信息条数 interval = [abs(information_count[i+1] - information_count[i]) for i in range(len(information_count) - 1)] k = 0 for i in interval: if i < 5 * 60: k += 1 if len(information_count) >= 22 and k == 0: infor_jg_count = [1, f"数据条数为{str(len(information_count))},无小于5分钟的数据间隔"] elif len(information_count) >= 22 and k: infor_jg_count = [0, f"数据条数{str(len(information_count))},其中有小于5分钟的数据间隔{str(k)}条"] else: infor_jg_count = [0, f"数据条数{str(len(information_count))}"] iccid = "" work_status = [] # 工作状态切换 elec_vs_lst = [] # 电路板固件版本号 for index, cur in enumerate(status_all_result): device_data = cur.get("cbd_status", "") if device_data: device_data = ast.literal_eval(device_data) iccid = device_data.get("iccid", "") work_status.append(str(device_data.get("ws", ""))) elec_vs_lst.append(device_data.get("dver", "")) # 检查卡号及有效期 card_jg = self.sim_info(iccid) # 工作状态切换 work_status_str = "".join(work_status) wr = work_status_str.count("01") if wr > 4: work_jg = [1, f"待机/工作转换{str(wr)}次"] else: work_jg = [0, f"待机/工作转换{str(wr)}次"] # 电路板固件版本号 RTU固件版本号 rtus = [] gujians = [] for d in elec_vs_lst: rtu = d.split("-")[-1] gujian = d.replace(rtu, "").replace("-", "") # if gujian == self.set_stm8vs: gujians.append(gujian) # if rtu == self.set_dver: rtus.append(rtu) rtus = list(set(rtus)) gujians = list(set(gujians)) if len(gujians) == 1 and gujians[0] == self.set_stm8vs: gujian_num = [1, self.set_stm8vs] else: if len(gujians) == 1: gujian_num = [0, gujians[0]] else: gujian_num = [0, ",".join(gujians)] if len(rtus) == 1 and rtus[0] == self.set_dver: rtu_num = [1, self.set_dver] else: if len(rtus) == 1: rtu_num = [0, rtus[0]] else: rtu_num = [0, ",".join(rtus)] # 工作时长 判定 if self.set_power == "DC": power_pass = [] for i in total_time: if i not in power_pass: power_pass.append(i) if len(power_pass) == 1 and power_pass[0] == 4: time_jg = [1, "4"] else: time_jg = [0, ",".join([str(i) for i in power_pass])] if self.set_power == "AC": power_pass = [] for i in total_time: if i not in power_pass: power_pass.append(i) if len(power_pass) == 1 and power_pass[0] == 9: time_jg = [1, "9"] else: time_jg = [0, ",".join([str(i) for i in power_pass])] return [ device_id_info, [1, self.set_order], card_jg, lng_jg, lat_jg, work_jg, gujian_num, rtu_num, v_vs, tr_k_jg, rain_k_jg, infor_jg_count, heat_temp, en_temp, en_hum, current_jg, up_jg, down_jg, lig_jg, img_jg, machine_jg, camera_jg, time_jg ] def run(self): """主业务逻辑,涉及进度条不能模块化,慢慢捋""" proess = 0 now_time = datetime.datetime.now() time_stamp = str(int(datetime.datetime.timestamp(now_time))) global save_filename save_filename = self.set_order + "_" + now_time.strftime("%m%d") + f"测报灯检验-{time_stamp}.xlsx" save_path = os.path.join(self.save_path,save_filename) workbook = Workbook(save_path) worksheet = workbook.add_worksheet() worksheet.set_row(0, 30) worksheet.set_row(1, 35) worksheet.set_row(2, 50) worksheet.set_row(3, 80) for i in range(len(self.device_list)): worksheet.set_row(i+4, 15) worksheet.set_column(0, 26, 12) toji_style = workbook.add_format(toji_format) title_style = workbook.add_format(title_format) explain_style = workbook.add_format(explain_formal) formal_style = workbook.add_format(formal_format) error_style = workbook.add_format(error_format) common_style = workbook.add_format(common_format) merge_title_style = workbook.add_format(merge_title_format) worksheet.merge_range('A1:Y1',"物联网测报灯在线检验原始表格",merge_title_style) # "振动电机工作状态下\n电流匹配度", # "上仓门电机电机工作状态下 \n 电流匹配度","下仓门电机工作状态下 \n电流匹配度","灯管工作状态下 \n电流匹配度","清扫电机/转盘电机工作状态下的 \n 最大电流","相机工作状态下 \n电流匹配度", title_data = [ "文档ID","设备ID", "任务单号", "卡号及有效期","经度","纬度","工作状态切换", "电路板固件版本号","RTU固件版本号", "电路板输入电压","温控","雨控","上报信息条数","加热仓温度","环境温度","环境湿度","图片质量", "工作时长", "位置", "综合判定" ] worksheet.write_row(row = 2 ,col = 0, data = title_data,cell_format=title_style) explain_data = [ "/", "后台显示与输入一致为合格,否则为不合格", "按照输入文件的任务单为依据复制到此列", "有显示卡号及有效期即为合格,否则为不合格", "合格条件(绿色):\n113°46′13″±30″范围内,转为十进度为:(113.76194444,113.77861111)区间内", "合格条件(绿色):\n35°1′45″±30″范围内,转为十进度为:(35.02083333,35.0375)区间内", "合格条件(绿色):\n测试时间段内有待机/工作且出现大于等于4次以上循环", "合格条件(绿色):\n后台上报结果与输入标准对照,相符则合格,其余则不合格", "合格条件(绿色):\n后台上报结果与输入标准对照,相符则合格,其余则不合格", "合格条件(绿色):\n电路板输入电压:22~30V", "合格条件(绿色):\n测试时间段之前至少有一条被温控数据上报", "合格条件(绿色):\n测试时间段内至少有一条被雨控数据上报", "合格条件(绿色):\n1、“工作”的条数大于22条,2、在“工作”的前提下,两条数据间隔小于等于5分钟的次数不能高于3条,两项全部符合为合格,否则不合格", "合格条件(绿色):\n上传的加热温度包含但不局限于85~105°之间的值", "合格条件(绿色):\n等于35且环境温度等于25的前提下为不合格,其余显示的任何值为合格", "合格条件(绿色):\n等于25度且环境湿度等于35的前提下为不合格,其余显示的任何值为合格", "合格条件(绿色):\n对自动检查前上传的图片质量进行打分,取前三个图片分数,如果有一个小于68为不合格,否则合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", # "合格条件(绿色):\n在工作时间段内没有接收到“电流不足”的信息为合格,否则不合格", "合格条件(绿色):\n与供电选择输入对照,(DC为4h,AC为9h)一致判定为合格,否则为不合格", "位置信息(绿色): \n合格条件(绿色), 显示河南省新乡市原阳县为合格,否则不合格", "合格条件(绿色):\nd:r列全部合格为合格,否则为不合格" ] worksheet.write_row(row = 3 ,col = 0, data = explain_data,cell_format=explain_style) formal_counts = 0 verify_list = [] for index, short_id in enumerate(self.device_list): short_id = str(int(short_id)).strip() # 把device_id转换成字符串并去除空格 d_id,deviceId,platform = self.device_their_platform(short_id) # if short_id != deviceId: # device_id_info = [0, "不合格"] # else: device_id_info = [1, deviceId] worksheet.write(index+4,0,short_id,formal_style) if platform == "大数据平台": try: worksheet.write(index+4,1,deviceId,formal_style) self.mongo_ping() data_status = self.cbd_alerm.find({"equip_id": deviceId, "alarm_time": {"$gte":self.start_time ,"$lte":self.end_time}}).sort([('alarm_time', pymongo.DESCENDING)]) data_cursor = self.cbd_collection.find({"device_id":d_id,'addtime': {"$gte":self.start_time ,"$lte":self.end_time}}).sort([('addtime', pymongo.DESCENDING)]) data_temps = self.cbd_collection.find({"device_id":d_id,'addtime': {"$gte":self.start_time - 3600 * 24 ,"$lte":self.start_time}}).sort([('addtime', pymongo.DESCENDING)]) data_counts = data_cursor.count() # 配置 de = self.device_collection.find_one({"device_id":deviceId}) device_config = self.device_config.find_one({"d_id":de.get("id")}) tt = 0 if device_config: device_config_temp = device_config.get("device_config") device_config_temp = eval(device_config_temp) device_config_temp_ext = device_config_temp.get("ext") tt = device_config_temp_ext.get("tt") image_score = self.__bigdata_iamge_verify(d_id) if data_counts == 0: for i in range(1,19): worksheet.write(index+4,i,"搜索时间内无数据",error_style) worksheet.write(index+4,19,"不合格",error_style) danji_verify = [] for i in range(19): danji_verify.append(0) verify_list.append(danji_verify) else: danji_verify = [] result = self.get_position(deviceId) verify_data = self.__bigdata_verify(data_cursor,data_status, device_id_info, image_score, proess, data_temps, result, tt) for clo_index, verify_ in enumerate(verify_data): danji_verify.append(verify_[0]) if verify_[0] == 0: worksheet.write(index+4,clo_index+1,verify_[1],error_style) elif verify_[0] == 1: worksheet.write(index+4,clo_index+1,verify_[1],formal_style) else: worksheet.write(index+4,clo_index+1,verify_[1],common_style) if 4 in danji_verify and (0 not in danji_verify): worksheet.write(index+4,19,"可接受", formal_style) elif 0 in danji_verify: worksheet.write(index+4,19,"不合格", error_style) else: formal_counts += 1 worksheet.write(index+4,19,"合格",formal_style) verify_list.append(danji_verify) except Exception as e: print(e) elif platform == "四情平台": try: worksheet.write(index+4,1,deviceId,formal_style) self.sql_ping() data_sql = "SELECT * FROM AppInfoManage_cbddata WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}' order by upl_time;".format(deviceId,self.start_time_str,self.end_time_str) alarm_sql = "SELECT * FROM AppInfoManage_cbd_alarm_record WHERE equip_id_id='{}' AND alarm_time BETWEEN '{}' AND '{}' order by alarm_time;".format(deviceId,self.start_time_str,self.end_time_str) self.cursor.execute(data_sql) data_result = self.cursor.fetchall() self.cursor.execute(alarm_sql) alarm_result = self.cursor.fetchall() status_sql = "SELECT * FROM AppInfoManage_cbdstatus WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}' order by upl_time;".format(deviceId,self.start_time_str,self.end_time_str) self.cursor.execute(status_sql) status_result = self.cursor.fetchall() status_all_sql = "SELECT * FROM AppInfoManage_cbdstatus_all WHERE equip_id_id='{}' AND upl_time BETWEEN '{}' AND '{}' order by upl_time;".format(deviceId,self.start_time_str,self.end_time_str) self.cursor.execute(status_all_sql) status_all_result = self.cursor.fetchall() if len(data_result) == 0: # 空白数据填充 for i in range(1,19): worksheet.write(index+4,i,"搜索时间内无数据",error_style) worksheet.write(index+4,19,"不合格",error_style) danji_verify = [] for i in range(19): danji_verify.append(0) verify_list.append(danji_verify) else: image_score = self.__siqing_image_verify(deviceId) verify_data = self.__siqing_verify(data_result,status_result, status_all_result, alarm_result, device_id_info, image_score, process) danji_verify = [] for clo_index, verify_ in enumerate(verify_data): danji_verify.append(verify_[0]) if verify_[0] == 0: worksheet.write(index+4,clo_index+1,verify_[1],error_style) elif verify_[0] == 1: worksheet.write(index+4,clo_index+1,verify_[1],formal_style) else: worksheet.write(index+4,clo_index+1,verify_[1],common_style) if 0 in danji_verify: worksheet.write(index+4,19,"不合格",error_style) else: formal_counts += 1 worksheet.write(index+4,19,"合格",formal_style) verify_list.append(danji_verify) except Exception as e: print(e) pass proess = (index+1)/len(self.device_list) * 100 if int(proess) == 100: self.proess_signal.emit(99) else: self.proess_signal.emit(int(proess)) toji_data = [ "任务单号",self.set_order, "检验时间",'{}\n{}'.format(self.start_time_str,self.end_time_str), "检验数量",len(self.device_list), "合格数",formal_counts, "检验平台",self.set_plat, "主板版本",self.set_stm8vs, "联网版本",self.set_dver, "产品名称", self.set_product, "供电选择", self.set_power, "报告日期",now_time.strftime("%y-%m-%d %H:%M:%S") ] worksheet.write_row(row = 1 ,col = 0, data = toji_data,cell_format=toji_style) worksheet.protect(pwd_str) # 保护视图,不允许修改 workbook.close() self.cursor.close() self.connection.close() self.myclient.close() self.proess_signal.emit(100) if __name__== "__main__": QtWidgets.QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling) QtGui.QGuiApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough) app = QtWidgets.QApplication(sys.argv) MainWindow = QtWidgets.QMainWindow() ui = Ui_MainWindow() ui.setupUi(MainWindow) MainWindow.show() sys.exit(app.exec_())