"""PyQt5 tool that exports device records, with SIM details, to a CSV file."""

import ast
import csv
import json
import sys
from datetime import datetime

from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import (
    QApplication,
    QFileDialog,
    QHBoxLayout,
    QLineEdit,
    QMainWindow,
    QMessageBox,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

import query_db

_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Devices of this type keep their ICCID in ``sa_qxz_base_info`` instead of a
# per-type ``*_data`` collection.
_QXZ_DEVICE_TYPE_ID = 5

# Fields fetched from ``sa_device`` for every exported row.
_DEVICE_PROJECTION = {
    "device_id": 1,
    "device_name": 1,
    "device_status": 1,
    "device_type_id": 1,
    "device_model": 1,
    "salesman_task_number": 1,
    "province": 1,
    "city": 1,
    "district": 1,
    "addtime": 1,
    "uptime": 1,
    "off_time": 1,
    "simid": 1,
    "id": 1,
    "_id": 0,
}

# Header row of the exported CSV; column order matches ``_build_csv_row``.
_CSV_HEADER = [
    "ID",
    "名称",
    "状态",
    "设备类型",
    "任务单号",
    "地址",
    "设备添加时间",
    "最近更新时间",
    "离线时间",
    "图片ICCID",
    "图片ICCD过期时间",
    "图片ICCD厂家",
    "设备ICCID",
    "设备ICCID过期时间",
    "设备ICCID厂家",
]

# device_type_id -> ({device_model: display name}, fallback display name).
# Types not listed here take their name from the ``sa_device_type`` collection.
_MODEL_SPECIFIC_NAMES = {
    3: ({11: "虫情信息采集设备"}, "虫情测报灯"),
    7: ({50: "孢子仪1.0", 52: "孢子仪2.0"}, "孢子仪"),
    2: ({101: "风吸式杀虫灯"}, "杀虫灯"),
}


def _format_timestamp(value):
    """Format a Unix timestamp as local time; return "" when unset or invalid."""
    if not value:
        return ""
    try:
        return datetime.fromtimestamp(float(value)).strftime(_TIME_FORMAT)
    except (TypeError, ValueError, OverflowError, OSError):
        return ""


def _join_address(row):
    """Concatenate province, city and district, treating missing parts as empty."""
    return "".join(
        str(row.get(key) or "") for key in ("province", "city", "district")
    )


def _parse_device_data(raw):
    """Decode a stored ``device_data`` value into a dict; return {} on failure.

    The stored text is tried first as a Python literal and then as JSON.
    SECURITY: this used to call ``eval()`` on text read from the database,
    which would execute arbitrary code; ``ast.literal_eval`` only accepts
    literals.
    """
    if isinstance(raw, dict):
        return raw
    if not raw:
        return {}
    for parser in (ast.literal_eval, json.loads):
        try:
            parsed = parser(raw)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            continue
        if isinstance(parsed, dict):
            return parsed
    return {}


def _lookup_sim(iccid):
    """Return the SIM record for *iccid* via ``query_db``, or {} when unavailable."""
    if not iccid:
        return {}
    return query_db.sim_query_new(iccid) or {}


def _device_type_name(device_type_id, device_model, type_names):
    """Return the display name for a device type / model combination."""
    entry = _MODEL_SPECIFIC_NAMES.get(device_type_id)
    if entry is None:
        return type_names.get(str(device_type_id))
    names_by_model, fallback = entry
    return names_by_model.get(device_model, fallback)


def _device_sim_details(row, device_type_id, data_collection, qxz_collection):
    """Return ``(iccid, expire, company)`` for the SIM inside the device itself.

    Every element is "" when no ICCID can be found for the device.
    """
    if device_type_id == _QXZ_DEVICE_TYPE_ID:
        latest = list(
            qxz_collection.find(
                {"device_id": row.get("device_id")}, {"iccid": 1, "_id": 0}
            )
            .sort("uptime", -1)
            .limit(1)
        )
        if not latest:
            return "", "", ""
        iccid = latest[0].get("iccid")
        try:
            info = _lookup_sim(iccid)
        except Exception:
            # Deliberately best-effort, as in the original code: a failed SIM
            # lookup for this device type leaves the SIM columns blank.
            info = {}
        return iccid or "", info.get("expire", ""), info.get("company", "")

    # Compare with None explicitly: PyMongo Collection objects raise
    # NotImplementedError when truth-tested.
    if data_collection is None:
        return "", "", ""
    latest = list(
        data_collection.find(
            {"device_id": row.get("id")}, {"device_data": 1, "_id": 0}
        )
        .sort("addtime", -1)
        .limit(1)
    )
    if not latest:
        return "", "", ""
    iccid = _parse_device_data(latest[0].get("device_data")).get("iccid")
    info = _lookup_sim(iccid)
    return iccid or "", info.get("expire", ""), info.get("company", "")


def _build_csv_row(row, type_names, data_collections, qxz_collection):
    """Build one CSV row (ordered like ``_CSV_HEADER``) from a device document."""
    device_type_id = row.get("device_type_id")

    image_iccid = row.get("simid")
    image_sim = _lookup_sim(image_iccid)

    device_iccid, device_expire, device_company = _device_sim_details(
        row,
        device_type_id,
        data_collections.get(device_type_id),
        qxz_collection,
    )

    return [
        row.get("device_id"),
        row.get("device_name"),
        "在线" if row.get("device_status") == 1 else "离线",
        _device_type_name(device_type_id, row.get("device_model"), type_names),
        row.get("salesman_task_number"),
        _join_address(row),
        # Each timestamp is formatted from its own value; previously addtime
        # and uptime were blanked whenever off_time was 0.
        _format_timestamp(row.get("addtime")),
        _format_timestamp(row.get("uptime")),
        _format_timestamp(row.get("off_time")),
        image_iccid,
        image_sim.get("expire", ""),
        image_sim.get("company", ""),
        device_iccid,
        device_expire,
        device_company,
    ]


class MainWindow(QMainWindow):
    """Main window with a user-name field, a task-number field and an export button."""

    def __init__(self):
        """Build the window: two centered input fields and the export button."""
        super().__init__()
        self.setWindowTitle("导出设备信息")
        self.resize(500, 400)

        # User-name input.
        self.search_input01 = QLineEdit(self)
        self.search_input01.setPlaceholderText("请输入用户名")
        self.search_input01.setFixedWidth(300)
        self.search_input01.setFixedHeight(30)

        # Task-number input.
        self.search_input02 = QLineEdit(self)
        self.search_input02.setPlaceholderText("请输入任务单号")
        self.search_input02.setFixedWidth(300)
        self.search_input02.setFixedHeight(30)

        # Export button, wired to the export handler.
        self.export_button = QPushButton("导出数据", self)
        self.export_button.setFixedWidth(100)
        self.export_button.clicked.connect(self.export_data)

        # Each widget sits in its own horizontally centered row.
        layout = QVBoxLayout()
        for widget in (self.search_input01, self.search_input02, self.export_button):
            row_layout = QHBoxLayout()
            row_layout.addWidget(widget)
            row_layout.setAlignment(Qt.AlignCenter)
            layout.addLayout(row_layout)

        container = QWidget()
        container.setLayout(layout)
        container.setFixedHeight(300)
        self.setCentralWidget(container)

    def export_data(self):
        """Query devices matching the inputs and write them to a user-chosen CSV.

        At least one of user name / task number must be entered.  The user
        name is resolved to a query filter through ``query_db.user_info``.
        Every outcome (validation failure, no data, success, error) is
        reported through ``self.message``; nothing is returned.
        """
        client = query_db.bigata_mongo()

        user_name = self.search_input01.text()
        task_number = self.search_input02.text()
        if not user_name and not task_number:
            self.message("请输入用户名称或任务单号")
            return

        query = {}
        if user_name:
            user_filter = query_db.user_info(user_name)
            if not user_filter:
                self.message("请确认用户为普通用户或经销商,或当前输入用户正确无误")
                return
            query.update(user_filter)
        if task_number:
            query["salesman_task_number"] = task_number

        devices = list(client.sa_device.find(query, _DEVICE_PROJECTION))
        if not devices:
            self.message("未找到符合条件的数据")
            return

        type_names = {
            str(item.get("id")): item.get("type_name")
            for item in client.sa_device_type.find(
                {}, {"id": 1, "type_name": 1, "_id": 0}
            )
        }

        # device_type_id -> collection whose latest record carries the ICCID.
        data_collections = {
            3: client.sa_device_cbd_data,
            7: client.sa_device_bzy_data,
            2: client.sa_device_scd_data,
            8: client.sa_device_xy_three_data,
            12: client.sa_device_xct_data,
            28: client.xct_haomi_data,
        }
        qxz_collection = client.sa_qxz_base_info

        # Let the user choose where to save the file.
        file_dialog = QFileDialog(self)
        file_dialog.setAcceptMode(QFileDialog.AcceptSave)
        file_dialog.setFileMode(QFileDialog.AnyFile)
        file_dialog.setNameFilter("CSV文件 (*.csv)")
        file_dialog.selectFile("device.csv")
        if not file_dialog.exec_():
            return
        file_path = file_dialog.selectedFiles()[0]

        try:
            with open(file_path, mode="w", newline="", encoding="utf-8") as file:
                writer = csv.writer(file)
                writer.writerow(_CSV_HEADER)
                for row in devices:
                    writer.writerow(
                        _build_csv_row(
                            row, type_names, data_collections, qxz_collection
                        )
                    )
        except Exception as exc:
            # Top-level GUI boundary: report any failure instead of crashing.
            self.message(f"导出数据时发生错误: {exc}")
            return
        self.message(f"数据成功导出到 {file_path}")

    def message(self, alert):
        """Show *alert* in a modal message box with a single OK button."""
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setWindowTitle("请注意!")
        msg.setText(alert)
        msg.setStandardButtons(QMessageBox.Ok)
        msg.exec_()


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())