"""Helpers for looking up device records and SIM-card details.

Three independent utilities live here:

* ``DeviceInfoUtils`` queries two HTTP search endpoints concurrently.
* ``GetDashujuSiqingDeviceInfo`` reads device records directly from the
  MongoDB / MySQL stores of the two platforms.
* ``GetSIMInfo`` asks up to three SIM vendors about one ICCID.
"""
import asyncio
import hashlib
import json
import time
import traceback
from urllib import parse

import pymongo
import pymysql
import requests
from requests.auth import HTTPBasicAuth

# NOTE(review): this module embeds database passwords and vendor API secrets
# as literals. They are kept unchanged so existing deployments keep working,
# but they should be moved to configuration / a secret store.


class DeviceInfoUtils(object):
    """Fetch device information from both platforms concurrently.

    Instantiate the class and call :meth:`get_equip_list`.
    """

    def get_resp(self, url, params):
        """POST ``params`` (form-encoded) to ``url`` and return the JSON body.

        :param url: endpoint to call
        :param params: form fields to send
        :return: the decoded JSON object, or ``{}`` when the request fails,
            times out (10 s), or the body is not a JSON object
        """
        try:
            res = requests.post(url=url, data=params, timeout=10)
            res_dict = json.loads(res.content.decode())
        except Exception:
            # Best effort: one unreachable platform must not break the lookup.
            return {}
        return res_dict if isinstance(res_dict, dict) else {}

    async def plat_device_info(self, url, data):
        """Run the blocking :meth:`get_resp` call in the default executor."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.get_resp, url, data)

    def get_equip_list(self, d_id, isfullId=0):
        """Search both platforms for a device id and merge the results.

        :param d_id: device id (or fragment of one)
        :param isfullId: 0 for fuzzy matching, 1 to match the full device id
        :return: list of records; records with the same ``device_id`` are
            adjacent and, within one device id, the most recently updated
            record comes first (it can be taken as the platform the device
            was last seen on)
        """
        bd_url = "http://127.0.0.1:8002/search/equip"
        sq_url = "http://172.16.220.3/search/equip"

        data = {"device_id": d_id}
        if isfullId == 1:
            data["isfullId"] = "1"

        async def _fetch_both():
            return await asyncio.gather(
                self.plat_device_info(bd_url, data),
                self.plat_device_info(sq_url, data),
            )

        # A private loop is used so this method also works from threads that
        # have no event loop of their own.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            bd_dict, sq_dict = loop.run_until_complete(_fetch_both())
        finally:
            loop.close()

        merged = []
        merged.extend(bd_dict.get("data") or [])
        merged.extend(sq_dict.get("data") or [])
        # Descending by device id, then by uptime within the same device id.
        return sorted(
            merged,
            key=lambda e: (e["device_id"], e["uptime"]),
            reverse=True,
        )


class GetDashujuSiqingDeviceInfo(object):
    """Look up one device in the two platforms' databases.

    The constructor opens the connections and preloads the id -> name lookup
    tables; call :meth:`close` when the instance is no longer needed.
    """

    # equip_type_id -> status table queried by get_siqing().
    _SIQING_STATUS_TABLES = {
        2: "AppInfoManage_scdstatus",
        3: "AppInfoManage_cbdstatus",
        4: "AppInfoManage_ybqstatus",
        5: "AppInfoManage_qxzstatus_new",
        6: "AppInfoManage_jkdata",
        7: "AppInfoManage_bzystatus",
        8: "AppInfoManage_trapstatus",
        9: "AppInfoManage_tccbstatus",
        10: "AppInfoManage_rtustatus",
        11: "AppInfoManage_wheatstatus",
        15: "AppInfoManage_wheatstatus",
        16: "AppInfoManage_wheatstatus",
    }

    def __init__(self):
        # MongoDB connection of the first platform ("大数据平台").
        user = parse.quote_plus("root")
        passwd = parse.quote_plus("yfkj@6020")
        da_shuju = "127.0.0.1"
        si_qing = "172.16.220.3"
        myclient = pymongo.MongoClient(
            "mongodb://{0}:{1}@{2}:57017/".format(user, passwd, da_shuju)
        )
        self._mongo_client = myclient
        db = myclient.smartfarming
        self.sa_device = db.sa_device
        device_tps = db.sa_device_type.find(
            {}, {"_id": 0, "id": 1, "type_name": 1}
        )
        # device type id (as str) -> type name
        self.device_tps = {
            str(i.get("id")): i.get("type_name") for i in device_tps
        }

        # First platform MySQL: user id (as str) -> real_name.
        da_conn = pymysql.connect(
            host=da_shuju,
            port=61882,
            user='root',
            password='yfkj@6020',
            database='smartfarming'
        )
        try:
            cursor = da_conn.cursor()
            try:
                cursor.execute("select uid, real_name from sa_device_user")
                self.rk = {str(i[0]): i[1] for i in cursor.fetchall()}
            finally:
                cursor.close()
        finally:
            da_conn.close()

        # Second platform ("四情") MySQL; this connection stays open because
        # get_siqing() reuses it.
        self.conn = pymysql.connect(
            host=si_qing,
            port=3306,
            user='yfwlw',
            password='sql_yfkj_6019',
            database='yfwlw'
        )
        si_qing_cursor = self.conn.cursor()
        try:
            si_qing_cursor.execute(
                "select id, username from AppInfoManage_myuser"
            )
            # user id (as str) -> username
            self.si_rk = {
                str(i[0]): i[1] for i in si_qing_cursor.fetchall()
            }
            si_qing_cursor.execute(
                "select type_id, type_name from AppInfoManage_equip_type"
            )
            # equipment type id (as str) -> type name
            self.si_device_type = {
                str(i[0]): i[1] for i in si_qing_cursor.fetchall()
            }
        finally:
            si_qing_cursor.close()

    def close(self):
        """Close the long-lived MySQL and MongoDB connections."""
        self.conn.close()
        self._mongo_client.close()

    def get_dashuju(self, device_id):
        """Return the first platform's record for ``device_id``.

        :param device_id: exact device id to look up in ``sa_device``
        :return: the projected document enriched with ``owner_user``,
            ``status``, ``device_type``, ``plat``, ``order_id`` and
            ``location``; ``{}`` when no document matches
        """
        result = self.sa_device.find_one(
            {"device_id": device_id},
            {
                'device_type_id': 1,
                'salesman_task_number': 1,
                'owner_uid': 1,
                'device_status': 1,
                'province': 1,
                'city': 1,
                'district': 1,
                'uptime': 1,
                'device_id': 1,
                'user_dealer': 1,
                '_id': 0
            }
        )
        if not result:
            return {}

        # The dealer, when present, takes precedence over the owner.
        if result.get("user_dealer"):
            result["owner_user"] = self.rk.get(str(result.get("user_dealer")))
        elif result.get("owner_uid"):
            result["owner_user"] = self.rk.get(str(result.get("owner_uid")))
        else:
            result["owner_user"] = "未分配"

        # These device types are excluded from the online/offline judgement.
        if result.get("device_type_id") in [11, 13, 19, 20]:
            result["status"] = "未知"
        elif result.get("device_status") == 1:
            result["status"] = "在线"
        else:
            result["status"] = "离线"

        result["device_type"] = self.device_tps.get(
            str(result.get("device_type_id")), "-"
        )
        result["plat"] = "大数据平台"

        task_number = result.get("salesman_task_number")
        result["order_id"] = task_number if task_number != "0" else "——"

        if result.get("province"):
            # Missing city / district become empty strings instead of raising.
            result["location"] = ",".join(
                result.get(part) or ""
                for part in ("province", "city", "district")
            )
        else:
            result["location"] = "暂无定位"
        return result

    def get_siqing(self, device_id):
        """Return the second platform's record for ``device_id``.

        :param device_id: exact ``equip_id`` to look up
        :return: dict with the selected ``AppInfoManage_equip`` columns plus
            ``plat``, ``owner_user``, ``uptime``, ``status``, ``device_type``,
            ``location`` and ``order_id``; ``{}`` when no row matches.
            ``status`` is "未知" and ``uptime`` is 0 when the equipment type
            has no status table or the status row is missing.
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                "select equip_id, equip_user_id, equip_type_id, equip_city from AppInfoManage_equip where equip_id = %s",
                (device_id, )
            )
            row = cursor.fetchone()
            if not row:
                return {}
            columns = [desc[0] for desc in cursor.description]
            result = dict(zip(columns, row))

            equip_type_id = result.get("equip_type_id")
            status = "未知"
            uptime = 0
            table = self._SIQING_STATUS_TABLES.get(equip_type_id)
            if table is not None:
                # The table name comes from the constant mapping above, never
                # from the caller; the device id stays a bound parameter.
                cursor.execute(
                    "select upl_time, is_online from {} where equip_id_id = %s".format(table),
                    (device_id, )
                )
                status_row = cursor.fetchone()
                if status_row:
                    if status_row[0] is not None:
                        uptime = int(status_row[0].timestamp())
                    status = "在线" if status_row[1] == "1" else "离线"
        finally:
            cursor.close()

        result["plat"] = "四情平台"
        if result.get("equip_user_id"):
            result["owner_user"] = self.si_rk.get(
                str(result.get("equip_user_id"))
            )
        else:
            result["owner_user"] = "未分配"

        result["uptime"] = uptime
        result["status"] = status
        result["device_type"] = self.si_device_type.get(str(equip_type_id))

        equip_city = result.get("equip_city")
        if equip_city and equip_city != '0':
            result["location"] = equip_city
        else:
            result["location"] = "暂无定位"
        result["order_id"] = "——"
        return result

    def get_together(self, device_id):
        """Return the record from whichever platform saw the device last.

        :param device_id: exact device id
        :return: the record with the larger ``uptime`` when both platforms
            know the device (the second platform wins ties), the only
            available record otherwise, and ``{}`` when neither platform has
            it or a lookup fails (the traceback is printed)
        """
        try:
            dashuju = self.get_dashuju(device_id)
            siqing = self.get_siqing(device_id)
            if dashuju and siqing:
                # A missing uptime counts as 0 so the comparison cannot
                # fail on None.
                newer_on_dashuju = (
                    (dashuju.get("uptime") or 0) > (siqing.get("uptime") or 0)
                )
                return dashuju if newer_on_dashuju else siqing
            return dashuju or siqing or {}
        except Exception:
            traceback.print_exc()
            return {}


class GetSIMInfo(object):
    """Find out which vendor issued a SIM card and fetch its usage data."""

    _NOT_ACTIVATED = "未激活"
    _UNKNOWN = "未知"

    def __init__(self, iccid) -> None:
        self.iccid = iccid
        # Vendor 1 (合宙) numeric account status -> display text.
        self.hz_status = {
            0: "未知",
            1: "测试期",
            2: "沉默期",
            3: "使用中",
            4: "停机",
            5: "停机保号",
            6: "预销号",
            7: "销号"
        }
        # Vendor 2 (SIMBOSS) status keyword -> display text.
        self.qp_status = {
            "testing": "测试中",
            "inventory": "库存",
            "pending-activation": "待激活",
            "activation": "已激活",
            "deactivation": "已停卡",
            "retired": "已销卡"
        }

    def hz_sim_info(self):
        """Query vendor 1 (合宙) for this card.

        :return: the decoded JSON response, or
            ``{'code': 99999, 'msg': ...}`` when the call fails
        """
        url = "http://sim.brlink.cn/api/open/iotcard/card"
        payload = {
            'iccid': self.iccid,
        }
        appkey = 'iaO2DKgS8KdlnVgU'
        appsecret = 'qzKgO4sBdzMrjRwv9H22S9ufepNv8Hl5ehPqkYVD31DCICjyKwqUdj7zihQQKfgx'
        auth = HTTPBasicAuth(appkey, appsecret)
        try:
            # Without a timeout requests may block forever on a dead peer.
            res = requests.post(url, json=payload, auth=auth, timeout=(10, 30))
            data = json.loads(res.text)
        except Exception as e:
            print(self.iccid, "合宙接口异常", e)
            data = {'code': 99999, 'msg': '接口调用异常异常'}
        return data

    def qp_sim_info(self):
        """Query vendor 2 (企鹏 / SIMBOSS) for this card.

        The request is signed with SHA-256 over the ``key=value`` pairs joined
        by ``&`` followed by the app secret.

        :return: the decoded JSON response, or
            ``{'code': 99999, 'msg': ...}`` when the call fails
        """
        url = "https://api.simboss.com/2.0/device/detail"
        appid = "102420177762"
        appsecret = "6397d7e6a56589f1d93284e9800493e1"
        # Taken once: the signed timestamp must equal the one that is sent.
        timestamp = int(round(time.time() * 1000))
        to_sign = "appid=%s&iccid=%s&timestamp=%s%s" % (
            appid, self.iccid, timestamp, appsecret
        )
        sign = hashlib.sha256(to_sign.encode('utf-8')).hexdigest()
        data = {
            "appid": appid,
            "iccid": self.iccid,
            "timestamp": timestamp,
            "sign": sign,
        }
        try:
            res = requests.post(url, data=data, timeout=(10, 30))
            data = json.loads(res.text)
        except Exception as e:
            print(self.iccid, "SIMBOSS接口异常", e)
            data = {'code': 99999, 'msg': '接口调用异常异常'}
        return data

    @staticmethod
    def _package_to_mb(autoname):
        """Derive the package size in MB from a name such as "30M" or "2G".

        The text before the first "M" (and then before the first "G", which
        takes precedence) is used. When that text is not purely digits it is
        returned unchanged as a string; 0 is returned when neither unit
        letter occurs.
        """
        total = 0
        for unit, factor in (("M", 1), ("G", 1024)):
            if unit in autoname:
                prefix = autoname.split(unit)[0]
                total = int(prefix) * factor if prefix.isdigit() else prefix
        return total

    def xj_sim_info(self):
        """Query vendor 3 (信金) for this card.

        :return: a normalised dict (``created_at``, ``status``, ``total``,
            ``used``, ``expire``, ``iccid``, ``package``, ``company``,
            ``code``) when the vendor answers with code 0, otherwise ``{}``
        :raises Exception: network and JSON decoding errors are not caught
            here
        """
        xj_userid = 137
        xj_now = int(time.time())
        xj_apikey = "2ae46f82215a187ba22656db9a9848d8"
        # The signature is the upper-case MD5 of the signed parameter string.
        xj_params = f"userId={xj_userid}&apikey={xj_apikey}&times={xj_now}"
        xj_sign = hashlib.md5(xj_params.encode('utf-8')).hexdigest().upper()
        url = f"http://hywx.xjict.com:32040/api/v1/getChaxun?userId={xj_userid}&cardno={self.iccid}&times={xj_now}&sign={xj_sign}"
        xj_ret = requests.get(
            url,
            timeout=(10, 30)
        )
        xj_response = json.loads(xj_ret.text)
        if xj_response.get("code") != 0:
            return {}

        xj_st_dict = {
            "0": "未知",
            "00": "正常",
            "01": "单向停机",
            "02": "停机",
            "03": "预销号",
            "04": "销号/拆机",
            "05": "过户",
            "06": "休眠",
            "07": "待激",
            "08": "已停用",
            "09": "库存",
            "10": "已失效",
            "11": "违章停机",
            "12": "挂失",
            "13": "用户报停",
            "14": "测试",
            "99": "不存在"
        }
        xj_data = xj_response.get("data")
        autoname = xj_data.get("autoname")
        used = xj_data.get("used")
        return {
            "created_at": xj_data.get("shouchong_time"),
            "status": xj_st_dict.get(xj_data.get("state")),
            "total": self._package_to_mb(autoname),
            # The vendor's figure is divided by 1024 to match "total".
            "used": int(used / 1024) if used else 0,
            "expire": xj_data.get("expired_at"),
            "iccid": self.iccid,
            "package": autoname,
            "company": "信金",
            "code": False
        }

    def _from_hz(self, card):
        """Build the get_sim_info() tuple from a vendor 1 ``data`` object."""
        status = self.hz_status.get(card.get("account_status"), self._UNKNOWN)
        if card["active"] != 1:
            return (1, status) + (self._NOT_ACTIVATED,) * 5
        fmt = "%Y-%m-%d %H:%M:%S"
        return (
            1,
            status,
            time.strftime(fmt, time.localtime(card["active_date"])),
            str(card["data_plan"]) + "M",
            str(card["data_usage"]) + "M",
            str(card["data_balance"]) + "M",
            time.strftime(fmt, time.localtime(card["expiry_date"])),
        )

    def _from_qp(self, card):
        """Build the get_sim_info() tuple from a vendor 2 ``data`` object."""
        status = self.qp_status.get(card.get("status"), self._UNKNOWN)
        active_date = card.get("startDate", "")
        if not active_date:
            return (2, status, active_date) + (self._NOT_ACTIVATED,) * 4
        total = card["totalDataVolume"]
        used = card["usedDataVolume"]
        if card["useCountAsVolume"] == False:  # noqa: E712 - keep == semantics
            plan, usage, balance = total, used, total - used
        else:
            plan, usage, balance = (
                total * 1024, used * 1024, (total - used) * 1024
            )
        return (
            2,
            status,
            active_date,
            str(plan) + "M",
            str(usage) + "M",
            str(balance) + "M",
            card["expireDate"],
        )

    @staticmethod
    def _from_xj(card):
        """Build the get_sim_info() tuple from xj_sim_info()'s dict."""
        total = card.get("total")
        used = card.get("used")
        return (
            4,
            card.get("status"),
            card.get("created_at"),
            str(total) + "M",
            str(used) + "M",
            str(total - used) + "M",
            card.get("expire"),
        )

    def get_sim_info(self):
        """Identify the card's vendor and return its usage summary.

        The vendors are tried in order (合宙, SIMBOSS, 信金); the first one
        that recognises the ICCID wins.

        :return: 7-tuple ``(sim_operators, account_status, active_date,
            data_plan, data_usage, data_balance, expiry_date)`` where
            ``sim_operators`` is 1, 2 or 4 for the three vendors and 3 when
            no vendor knows the card (all other fields are then "未知")
        """
        hz_data = self.hz_sim_info()
        if hz_data.get("code") == 0:
            return self._from_hz(hz_data["data"])

        qp_data = self.qp_sim_info()
        if qp_data.get("code") == "0":
            return self._from_qp(qp_data["data"])

        try:
            xj_data = self.xj_sim_info()
            if xj_data:
                return self._from_xj(xj_data)
        except Exception as e:
            # Any failure of the last vendor degrades to "unknown card".
            print(self.iccid, "信金接口异常", e)
        return (3,) + (self._UNKNOWN,) * 6


if __name__ == "__main__":
    sim_operators, account_status, active_date, data_plan, data_usage, data_balance, expiry_date = GetSIMInfo("898604F81623D0432214").get_sim_info()
    print(sim_operators, account_status, active_date, data_plan, data_usage, data_balance, expiry_date)