||
- from datetime import datetime
- import sys
- import csv
- from PyQt5.QtCore import Qt
- from PyQt5.QtWidgets import (
- QApplication,
- QMainWindow,
- QVBoxLayout,
- QFileDialog,
- QLineEdit,
- QWidget,
- QPushButton,
- QMessageBox,
- QHBoxLayout
- )
- import query_db
- import json
class MainWindow(QMainWindow):
    """Window that exports device records, with SIM-card details, to a CSV file.

    The user enters a user name and/or a task number. Matching documents are
    read from the ``sa_device`` collection, each one is enriched with the
    result of ``query_db.sim_query_new`` for its image SIM and its device SIM,
    and the rows are written to a CSV file chosen through a save dialog.
    """

    # Header row of the exported CSV, in column order.
    # NOTE(review): "ICCD" in two of the headers looks like a typo of "ICCID";
    # left unchanged because consumers of the file may match on header text.
    CSV_HEADER = [
        "ID",
        "名称",
        "状态",
        "设备类型",
        "任务单号",
        "地址",
        "设备添加时间",
        "最近更新时间",
        "离线时间",
        "图片ICCID",
        "图片ICCD过期时间",
        "图片ICCD厂家",
        "设备ICCID",
        "设备ICCID过期时间",
        "设备ICCID厂家",
    ]

    # Projection passed to ``sa_device.find``: only the fields the export reads.
    DEVICE_FIELDS = {
        "device_id": 1,
        "device_name": 1,
        "device_status": 1,
        "device_type_id": 1,
        "device_model": 1,
        "salesman_task_number": 1,
        "province": 1,
        "city": 1,
        "district": 1,
        "addtime": 1,
        "uptime": 1,
        "off_time": 1,
        "simid": 1,
        "id": 1,
        "_id": 0,
    }

    # device_type_id -> ({device_model: display name}, fallback display name).
    # Type ids not listed here take their name from ``sa_device_type``.
    TYPE_NAMES = {
        3: ({11: "虫情信息采集设备"}, "虫情测报灯"),
        7: ({50: "孢子仪1.0", 52: "孢子仪2.0"}, "孢子仪"),
        2: ({101: "风吸式杀虫灯"}, "杀虫灯"),
    }

    # Devices of this type keep their ICCID in ``sa_qxz_base_info`` rather
    # than in a per-type ``*_data`` collection.
    QXZ_TYPE_ID = 5

    TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        """Build the window: two search inputs and an export button."""
        super().__init__()
        self.setWindowTitle("导出设备信息")
        self.resize(500, 400)

        # User-name input.
        self.search_input01 = QLineEdit(self)
        self.search_input01.setPlaceholderText("请输入用户名")
        self.search_input01.setFixedWidth(300)
        self.search_input01.setFixedHeight(30)

        # Task-number input.
        self.search_input02 = QLineEdit(self)
        self.search_input02.setPlaceholderText("请输入任务单号")
        self.search_input02.setFixedWidth(300)
        self.search_input02.setFixedHeight(30)

        # Export button; clicking it runs the export.
        self.export_button = QPushButton("导出数据", self)
        self.export_button.setFixedWidth(100)
        self.export_button.clicked.connect(self.export_data)

        # One horizontally centred row per widget, stacked vertically.
        layout = QVBoxLayout()
        for widget in (
            self.search_input01,
            self.search_input02,
            self.export_button,
        ):
            layout.addLayout(self._centered_row(widget))

        container = QWidget()
        container.setLayout(layout)
        container.setFixedHeight(300)
        self.setCentralWidget(container)

    @staticmethod
    def _centered_row(widget):
        """Return a horizontal layout holding ``widget``, centre-aligned."""
        row = QHBoxLayout()
        row.addWidget(widget)
        row.setAlignment(Qt.AlignCenter)
        return row

    def export_data(self):
        """Query the matching devices and write them to a user-chosen CSV file.

        Every outcome (missing input, unknown user, no data, failure, success)
        is reported through :meth:`message`; nothing is returned or raised.
        """
        user_name = self.search_input01.text().strip()
        task_number = self.search_input02.text().strip()
        if not user_name and not task_number:
            self.message("请输入用户名称或任务单号")
            return

        # Database errors are reported instead of propagating out of the Qt
        # slot, where an unhandled exception would terminate the application.
        try:
            search = {}
            if user_name:
                user_filter = query_db.user_info(user_name)
                if not user_filter:
                    self.message("请确认用户为普通用户或经销商,或当前输入用户正确无误")
                    return
                search.update(user_filter)
            if task_number:
                search["salesman_task_number"] = task_number
            client = query_db.bigata_mongo()
            devices = list(client.sa_device.find(search, self.DEVICE_FIELDS))
        except Exception as exc:
            self.message(f"导出数据时发生错误: {exc}")
            return

        if not devices:
            self.message("未找到符合条件的数据")
            return

        file_path = self._ask_save_path()
        if not file_path:
            # The user cancelled the save dialog.
            return

        try:
            type_docs = client.sa_device_type.find(
                {}, {"id": 1, "type_name": 1, "_id": 0}
            )
            type_names = {
                str(doc.get("id")): doc.get("type_name") for doc in type_docs
            }
            # device_type_id -> collection holding that type's uploaded data.
            data_collections = {
                3: client.sa_device_cbd_data,
                7: client.sa_device_bzy_data,
                2: client.sa_device_scd_data,
                8: client.sa_device_xy_three_data,
                12: client.sa_device_xct_data,
                28: client.xct_haomi_data,
            }
            with open(file_path, mode="w", newline="", encoding="utf-8") as file:
                writer = csv.writer(file)
                writer.writerow(self.CSV_HEADER)
                for device in devices:
                    writer.writerow(
                        self._build_row(
                            device, client, type_names, data_collections
                        )
                    )
        except Exception as exc:
            self.message(f"导出数据时发生错误: {exc}")
            return

        self.message(f"数据成功导出到 {file_path}", QMessageBox.Information)

    def _ask_save_path(self) -> str:
        """Show the CSV save dialog; return the chosen path or "" if cancelled."""
        dialog = QFileDialog(self)
        dialog.setAcceptMode(QFileDialog.AcceptSave)
        dialog.setFileMode(QFileDialog.AnyFile)
        dialog.setNameFilter("CSV文件 (*.csv)")
        dialog.selectFile("device.csv")
        if not dialog.exec_():
            return ""
        selected = dialog.selectedFiles()
        return selected[0] if selected else ""

    def _build_row(self, device, client, type_names, data_collections) -> list:
        """Return one CSV row (same order as ``CSV_HEADER``) for ``device``."""
        image_iccid = device.get("simid")
        image_sim = self._sim_info(image_iccid)
        device_iccid, device_sim = self._device_sim(
            device, client, data_collections
        )
        # Missing address parts are treated as empty instead of raising on
        # ``None + str``.
        address = "".join(
            str(device.get(part) or "")
            for part in ("province", "city", "district")
        )
        return [
            device.get("device_id"),
            device.get("device_name"),
            "在线" if device.get("device_status") == 1 else "离线",
            self._device_type_name(
                device.get("device_type_id"),
                device.get("device_model"),
                type_names,
            ),
            device.get("salesman_task_number"),
            address,
            # Each timestamp is formatted from its own value; previously the
            # add/update times were blanked whenever ``off_time`` was 0.
            self._format_timestamp(device.get("addtime")),
            self._format_timestamp(device.get("uptime")),
            self._format_timestamp(device.get("off_time")),
            image_iccid,
            image_sim.get("expire", ""),
            image_sim.get("company", ""),
            device_iccid,
            device_sim.get("expire", ""),
            device_sim.get("company", ""),
        ]

    def _device_type_name(self, type_id, device_model, type_names):
        """Return the display name for a device type / model combination."""
        try:
            by_model, fallback = self.TYPE_NAMES[type_id]
        except (KeyError, TypeError):
            # Not a specially named type: use the name from ``sa_device_type``.
            return type_names.get(str(type_id))
        try:
            return by_model.get(device_model, fallback)
        except TypeError:
            # Unhashable model value: cannot match any known model.
            return fallback

    def _device_sim(self, device, client, data_collections):
        """Return ``(iccid, sim_info)`` for the SIM reported by the device.

        ``iccid`` is "" and ``sim_info`` is ``{}`` when no record is found.
        """
        type_id = device.get("device_type_id")

        if type_id == self.QXZ_TYPE_ID:
            latest = list(
                client.sa_qxz_base_info.find(
                    {"device_id": device.get("device_id")},
                    {"iccid": 1, "_id": 0},
                )
                .sort("uptime", -1)
                .limit(1)
            )
            if not latest:
                return "", {}
            iccid = latest[0].get("iccid")
            try:
                sim_info = self._sim_info(iccid)
            except Exception:
                # Best effort, as in the original: a failed SIM lookup leaves
                # the expiry/vendor cells empty for this device only.
                sim_info = {}
            return iccid, sim_info

        try:
            collection = data_collections.get(type_id)
        except TypeError:
            collection = None
        # Compare with None explicitly: pymongo collections raise
        # NotImplementedError when truth-tested.
        if collection is None:
            return "", {}

        # NOTE(review): this lookup keys on the device's numeric ``id`` while
        # the branch above uses ``device_id`` — kept as in the original;
        # confirm against the collection schema.
        latest = list(
            collection.find(
                {"device_id": device.get("id")},
                {"device_data": 1, "_id": 0},
            )
            .sort("addtime", -1)
            .limit(1)
        )
        if not latest:
            return "", {}
        payload = self._parse_device_data(latest[0].get("device_data"))
        iccid = payload.get("iccid")
        return iccid, self._sim_info(iccid)

    @staticmethod
    def _parse_device_data(raw) -> dict:
        """Parse a stored ``device_data`` value into a dict.

        The value is tried first as a Python literal and then as JSON.
        Anything that cannot be parsed, or that does not parse to a dict,
        yields ``{}`` so one malformed record does not abort the export.
        """
        import ast  # local import: only this helper needs it

        if not raw:
            return {}
        if isinstance(raw, dict):
            return raw
        try:
            # SECURITY: the original called eval() here. The payload comes
            # from stored device uploads, so evaluation is restricted to
            # literals and can no longer execute arbitrary code.
            parsed = ast.literal_eval(raw)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            try:
                parsed = json.loads(raw)
            except (TypeError, ValueError):
                return {}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _sim_info(iccid):
        """Return ``query_db.sim_query_new(iccid)``, or ``{}`` when unavailable.

        No lookup is made for an empty ICCID, and a falsy lookup result
        (e.g. ``None``) is normalised to ``{}`` so callers can use ``.get``.
        """
        if not iccid:
            return {}
        return query_db.sim_query_new(iccid) or {}

    @classmethod
    def _format_timestamp(cls, value) -> str:
        """Format a POSIX timestamp as local time, or "" if absent/invalid."""
        if not value:
            return ""
        try:
            return datetime.fromtimestamp(value).strftime(cls.TIME_FORMAT)
        except (TypeError, ValueError, OverflowError, OSError):
            return ""

    def message(self, alert, icon=None):
        """Show a modal message box with an OK button.

        Args:
            alert: Text to display.
            icon: Optional ``QMessageBox`` icon; defaults to
                ``QMessageBox.Warning``, the icon the original always used.
        """
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning if icon is None else icon)
        msg.setWindowTitle("请注意!")
        msg.setText(alert)
        msg.setStandardButtons(QMessageBox.Ok)
        msg.exec_()
# Script entry point: create the Qt application, show the export window, and
# exit the interpreter with the status returned by the Qt event loop.
if __name__ == "__main__":
    qt_app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
|