import pymongo
import pymysql
from urllib import parse
import time
import requests
import json
import datetime
import hashlib
from requests.auth import HTTPBasicAuth

# When True, the debug MySQL/MongoDB hosts are used instead of the default ones.
DEBUG = False

# NOTE(review): database passwords and provider API secrets are hard-coded in
# this module; they should be moved to configuration / a secret store.

# Device status code -> display label for the api.simboss.com provider.
_QP_STATUS = {
    "TEST_READY_NAME": "可测试",
    "INVENTORY_NAME": "库存",
    "ACTIVATION_READY_NAME": "可激活",
    "ACTIVATED_NAME": "已激活",
    "DEACTIVATED_NAME": "已停卡",
    "RETIRED_NAME": "已销卡",
    "PURGED_NAME": "已清除",
}

# Account status code -> display label for the sim.brlink.cn provider.
_HZ_STATUS = {
    "0": "未知",
    "1": "测试期",
    "2": "沉默期",
    "3": "使用中",
    "4": "停机",
    "5": "停机保号",
    "6": "预销号",
    "7": "销号",
}

# Card state code -> display label for the hywx.xjict.com provider.
_XJ_STATUS = {
    "0": "未知",
    "00": "正常",
    "01": "单向停机",
    "02": "停机",
    "03": "预销号",
    "04": "销号/拆机",
    "05": "过户",
    "06": "休眠",
    "07": "待激",
    "08": "已停用",
    "09": "库存",
    "10": "已失效",
    "11": "违章停机",
    "12": "挂失",
    "13": "用户报停",
    "14": "测试",
    "99": "不存在",
}


def _mysql_connect():
    """Open a MySQL connection to the host selected by ``DEBUG``."""
    if DEBUG:
        host, port = "114.55.0.7", 3306
    else:
        host, port = "8.136.98.49", 61882
    return pymysql.connect(
        host=host,
        port=port,
        user="root",
        password="yfkj@6020",
        database="smartfarming",
    )


def user_info(query):
    """Look up a dealer/owner user by exact ``real_name``.

    Args:
        query: Value compared against ``sa_device_user.real_name``.

    Returns:
        ``{"user_dealer": uid}`` when the first matching row has
        ``user_type == 2``, ``{"owner_uid": uid}`` for any other matching row
        (the SQL restricts ``user_type`` to 2 or 4), and ``{}`` when nothing
        matches or the query fails.
    """
    connection = _mysql_connect()
    try:
        # Exact match on real_name; the value is passed as a bound parameter.
        sql_query = (
            "SELECT uid, user_type FROM sa_device_user "
            "where user_type in (2,4) and real_name = %s"
        )
        # The cursor is managed by ``with`` so that a failure while creating
        # it cannot leave ``finally`` referring to an unbound name.
        with connection.cursor() as cursor:
            cursor.execute(sql_query, (query,))
            result = cursor.fetchall()

        if not result:
            return {}

        print(result)
        uid, user_type = result[0][0], result[0][1]
        if user_type == 2:
            return {"user_dealer": uid}
        return {"owner_uid": uid}
    except Exception as e:
        # Best-effort lookup: report the error and fall back to "not found".
        print(e.args)
        return {}
    finally:
        connection.close()


def bigata_mongo():
    """Return the ``smartfarming`` MongoDB database on the host selected by ``DEBUG``."""
    user = parse.quote_plus("root")
    passwd = parse.quote_plus("yfkj@6020")
    host = "114.55.0.7:27017" if DEBUG else "8.136.98.49:57017"
    myclient = pymongo.MongoClient(
        "mongodb://{0}:{1}@{2}/".format(user, passwd, host)
    )
    return myclient.smartfarming


def _plan_size_mb(plan_name):
    """Derive a size from the text before the first "M" / "G" in ``plan_name``.

    Returns the integer prefix (multiplied by 1024 for "G") when that prefix
    is all digits, the raw prefix string when it is not, and 0 when the name
    contains neither letter. A "G" match overrides an "M" match. Raises
    ``TypeError`` when ``plan_name`` is not a string; callers handle that.
    """
    size = 0
    for unit, factor in (("M", 1), ("G", 1024)):
        if unit in plan_name:
            prefix = plan_name.split(unit)[0]
            size = prefix
            if prefix and prefix.isdigit():
                size = int(prefix) * factor
    return size


def _query_qipeng(iccid):
    """Query api.simboss.com for ``iccid``; return a result dict or ``None`` on failure."""
    appid = "102420177762"
    app_secret = "6397d7e6a56589f1d93284e9800493e1"
    # Take the timestamp once so the signed value and the submitted value
    # are guaranteed to be identical.
    timestamp = int(round(time.time() * 1000))
    sign_source = (
        f"appid={appid}&iccid={iccid}&timestamp={timestamp}{app_secret}"
    )
    sign = hashlib.sha256(sign_source.encode("utf-8")).hexdigest()
    payload = {
        "appid": appid,
        "iccid": iccid,
        "timestamp": timestamp,
        "sign": sign,
    }
    url = "https://api.simboss.com/2.0/device/detail"
    try:
        # A timeout keeps an unresponsive provider from blocking the caller.
        ret = requests.post(url, data=payload, timeout=(5, 10))
        response = json.loads(ret.text)
        code = response.get("code")
    except Exception:
        # Any transport/parse problem means "try the next provider".
        return None
    if code != "0":
        return None

    data = response.get("data") or {}
    return {
        "status": _QP_STATUS.get(data.get("deviceStatus")),
        "total": data.get("totalDataVolume"),
        "used": data.get("dataUsage") or 0,
        "expire": data.get("expireDate"),
        "iccid": iccid,
        "package": data.get("iratePlanName"),
        "company": "企鹏",
        "code": False,
    }


def _query_hezhou(iccid):
    """Query sim.brlink.cn for ``iccid``; return a result dict or ``None`` on failure."""
    url = "http://sim.brlink.cn/api/open/iotcard/card"
    appkey = "iaO2DKgS8KdlnVgU"
    appsecret = "qzKgO4sBdzMrjRwv9H22S9ufepNv8Hl5ehPqkYVD31DCICjyKwqUdj7zihQQKfgx"
    try:
        ret = requests.post(
            url,
            json={"iccid": iccid},
            auth=HTTPBasicAuth(appkey, appsecret),
            timeout=(5, 10),
        )
        response = json.loads(ret.text)
        code = response.get("code")
    except Exception:
        return None
    if code != 0:
        return None

    data = response.get("data") or {}
    # Expiry is converted with fromtimestamp(); a missing or unusable value
    # yields None instead of crashing the whole lookup.
    expiry_date = data.get("expiry_date")
    try:
        expire = datetime.datetime.fromtimestamp(expiry_date).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
    except (TypeError, ValueError, OverflowError, OSError):
        expire = None
    return {
        "status": _HZ_STATUS.get(str(data.get("account_status", 0))),
        "total": data.get("data_plan"),
        "used": data.get("data_usage") or 0,
        "expire": expire,
        "iccid": iccid,
        "company": "合宙",
        "code": False,
    }


def _query_xinjin(iccid):
    """Query hywx.xjict.com for ``iccid``; return a result dict or ``None`` on failure."""
    user_id = 137
    now = int(time.time())
    apikey = "2ae46f82215a187ba22656db9a9848d8"
    # The "&times=" separators had been corrupted into a "×" character.
    sign_source = f"userId={user_id}&apikey={apikey}&times={now}"
    sign = hashlib.md5(sign_source.encode("utf-8")).hexdigest().upper()
    url = (
        "http://hywx.xjict.com:32040/api/v1/getChaxun"
        f"?userId={user_id}&cardno={iccid}&times={now}&sign={sign}"
    )
    try:
        ret = requests.get(url, timeout=(10, 30))
        response = json.loads(ret.text)
        code = response.get("code")
    except Exception:
        return None
    if code != 0:
        return None

    try:
        data = response.get("data")
        plan_name = data.get("autoname")
        total = _plan_size_mb(plan_name)
        return {
            "status": _XJ_STATUS.get(data.get("state")),
            "total": total,
            "used": int(data.get("used") / 1024) if data.get("used") else 0,
            "expire": data.get("expired_at"),
            "iccid": iccid,
            "package": plan_name,
            "company": "信金",
            "code": False,
        }
    except Exception:
        # Malformed payload: treat as a failed lookup.
        return None


def sim_query_new(iccid):
    """Look up SIM card status/usage for ``iccid`` across three providers.

    The providers are tried in a fixed order (api.simboss.com, sim.brlink.cn,
    hywx.xjict.com); the first one that reports success wins.

    Args:
        iccid: Card identifier sent to each provider.

    Returns:
        A dict. ``"code"`` is ``False`` on success and ``True`` on failure.
        On success the dict carries ``status``, ``total``, ``used``,
        ``expire``, ``iccid``, ``company`` and, for two of the providers,
        ``package``. When ``iccid`` is empty a short error dict is returned;
        when every provider fails a generic failure dict is returned.
    """
    if not iccid:
        return {"code": True, "msg": "未传ICCID"}

    for query_provider in (_query_qipeng, _query_hezhou, _query_xinjin):
        result = query_provider(iccid)
        if result is not None:
            return result

    return {
        "status": None,
        "total": -1,
        "used": -1,
        "expire": 0,
        "iccid": iccid,
        "code": True,
        "msg": f"卡号:{iccid}, 查询失败,请联系管理员!",
    }