import string import random import requests import json import logging import time import hashlib import math from captcha.image import ImageCaptcha from io import BytesIO import base64 from datetime import datetime, timedelta import calendar from smartfarming.models.user import Role, UserPurview from kedong.tools import RedisPool from kedong import settings logger = logging.getLogger("other") redis_pool = RedisPool().get_redis_pool(settings.redis_db["jiankong"]) config = settings.CONFIG def perms_pc_app(type_id=1000, perms_lst=[]): menu = "PC" if type_id == 1000 else "APP" parent_ids = [] for p in perms_lst: # 先判断是否是父级菜单 per_obj = UserPurview.objects.filter(id=int(p), menu=menu).first() if per_obj: if per_obj.parent_perm_id == 0: parent_ids.append(p) perm_lst = [] for i in parent_ids: parent = UserPurview.objects.get(id=int(i), menu=menu) inner_parent = { "menu": parent.menu, "parent_perm_id": parent.parent_perm_id, "pur_id": parent.id, "purview_name": parent.purview_name, "url": parent.url } children_lst = [] # 获取此父级菜单中所有的子菜单 children = UserPurview.objects.filter(parent_perm_id=int(i), menu=menu) for child in children: if child.id in perms_lst: children_lst.append( { "menu": child.menu, "parent_perm_id": child.parent_perm_id, "pur_id": child.id, "purview_name": child.purview_name, "url": child.url } ) if children_lst: inner_parent["children"] = children_lst perm_lst.append(inner_parent) return perm_lst def get_perm_list(device_user): id = device_user.role_id try: role = Role.objects.get(id=id) role_perm = role.role_perm perms_lst = [int(i) for i in role_perm.split(",")] pc_perm_lst = perms_pc_app(type_id=1000, perms_lst=perms_lst) app_perm_lst = perms_pc_app(type_id=2000, perms_lst=perms_lst) if pc_perm_lst and app_perm_lst: perm_lst = [ { "menu": "", "parent_perm_id": 1000, "pur_id": 0, "purview_name": "登录PC端", "url": "", "children": pc_perm_lst }, { "menu": "", "parent_perm_id": 2000, "pur_id": 0, "purview_name": "登录APP端", "url": "", "children": app_perm_lst } ] elif pc_perm_lst: perm_lst = [ { "menu": "", "parent_perm_id": 1000, "pur_id": 0, "purview_name": "登录PC端", "url": "", "children": pc_perm_lst } ] elif app_perm_lst: perm_lst = [ { "menu": "", "parent_perm_id": 2000, "pur_id": 0, "purview_name": "登录APP端", "url": "", "children": app_perm_lst } ] else: perm_lst = [] mark = role.mark return perm_lst, mark except Exception as e: return [], "" def get_all_pers(): role_perm = UserPurview.objects.all().values("id") perms_lst = [] for i in role_perm: perms_lst.append(i.get("id")) pc_perm_lst = perms_pc_app(type_id=1000, perms_lst=perms_lst) app_perm_lst = perms_pc_app(type_id=2000, perms_lst=perms_lst) perm_lst = [ { "menu": "", "parent_perm_id": 1000, "pur_id": 0, "purview_name": "登录PC端", "url": "", "children": pc_perm_lst }, { "menu": "", "parent_perm_id": 2000, "pur_id": 0, "purview_name": "登录APP端", "url": "", "children": app_perm_lst } ] return perm_lst def get_captcha(): """获取图片验证码 return : 验证码字符串,验证码图片base64数据 """ chr_all = string.ascii_uppercase + string.digits chr_all = chr_all.replace('0', '').replace('1', '').replace('2', '').replace('O', '').replace('I', '') code_str = ''.join(random.sample(chr_all, 4)) image = ImageCaptcha().generate_image(code_str) f = BytesIO() image.save(f, 'png') data = f.getvalue() f.close() encode_data = base64.b64encode(data) data = str(encode_data, encoding='utf-8') img_data = "data:image/jpeg;base64,{data}".format(data=data) return code_str, img_data def get_recent_month(num): # 获取最近N个月的时间戳 end_time = datetime.now() # 结束时间,当前时间 # 获取当前月份和年份 current_month = end_time.month current_year = end_time.year # 计算开始时间 start_month = current_month - num # 当前月份往前推N个月 start_year = current_year if start_month <= 0: # 如果计算得到的月份为负数,则需要向前借位 start_month += 12 start_year -= 1 # 获取开始时间所在月份的最后一天 _, last_day = calendar.monthrange(start_year, start_month) start_time = datetime(start_year, start_month, last_day).timestamp() return start_time def get_addr_by_lag_lng(lat,lng): # 根据经纬度获取省市级 is_success, province, city, district = False, "", "", "" try: # 使用腾讯接口 url = "https://apis.map.qq.com/ws/geocoder/v1?location={},{}&get_poi=0&key=OA4BZ-FX43U-E5VV2-45M6S-C4HD3-NIFFI&output=json".format(lat,lng) requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数 s = requests.session() s.keep_alive = False # 关闭多余连接 ret = s.get(url) rest = json.loads(ret.text) province = rest["result"]["ad_info"]["province"] city = rest["result"]["ad_info"]["city"] district = rest["result"]["ad_info"]["district"] is_success = True except Exception as e: logger.error(f"获取腾讯地理位置接口失败 {e.args}") if not is_success: try: # 使用百度接口 url = "http://api.map.baidu.com/geocoder?location=%s,%s&output=json"%(lat,lng) requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数 s = requests.session() s.keep_alive = False # 关闭多余连接 ret = s.get(url) addr = json.loads(ret.text) province = addr["result"]["addressComponent"]["province"] city = addr["result"]["addressComponent"]["city"] district = addr["result"]["addressComponent"]["district"] is_success = True except Exception as e: logger.error(f"获取百度地理位置接口失败 {e.args}") return is_success, province, city, district # 获取天气数据 def get_address_by_lntlat(lng, lat): province, city, district = "", "", "" try: key = "78ce288400f4fc6d9458989875c833c2" adcode_url = "http://restapi.amap.com/v3/geocode/regeo?key={key}&location={lng},{lat}&poitype=&radius=&" \ "extensions=all&batch=false&roadlevel=0".format(key=key, lng=lng, lat=lat) res = requests.get(adcode_url) rsp = res.json() address_info = rsp['regeocode']['addressComponent'] adcode = address_info['adcode'] if adcode != "900000": province, city, district = address_info['province'], address_info['city'], address_info['district'] if (not city) and district: city = district except Exception as e: pass return province, city, district # 获取天气相关信息 def get_weather_info(lng=None, lat=None): data = {} if lng and lat: try: url = "http://114.115.147.140:8080/weather" res = requests.post(url, {"lat": lat, "lng": lng}) result = res.json() if result['status'] == 'true': data = result['data'] except Exception as e: pass return data def md5value(appSecret): """sign加密""" times = int(time.time()) # 随机生成32位字符串 nonce = ''.join(random.sample(string.ascii_letters + string.digits, 32)) sign = "time:{},nonce:{},appSecret:{}".format(times, nonce, appSecret) input_name = hashlib.md5() input_name.update(sign.encode("utf-8")) sign = (input_name.hexdigest()).lower() return times,nonce,sign def deviceAccessToken(appId, appSecret): """ 乐橙云账号秘钥 appSecret appId 使用乐橙云账号的appid和appSecret获取监控token 请求时数据格式 { "id":"98a7a257-c4e4-4db3-a2d3-d97a3836b87c", "system":{ "ver":"1.0", "appId":"lcdxxxxxxxxx", "sign":"b7e5bbcc6cc07941725d9ad318883d8e", "time":1599013514, "nonce":"fbf19fc6-17a1-4f73-a967-75eadbc805a2"}, "params":{} } 签名原始串为:"time:1531401328,nonce:61f38836685b66f3201469543d8365f7,appSecret:12459ac547434b3ea83db5e6d56789" """ # 先从redis 中获取token tokens = redis_pool.get(appId) if tokens and tokens !=None: redis_jk_data = eval(tokens) status = redis_jk_data.get("status",1) if status: # 查看到期时间 expireTime = redis_pool.ttl(appId) # 设置到期时间 秒 # redis_pool.expire('ttt', 10) redis_jk_data["result"]["data"]["expireTime"] = expireTime else: redis_jk_data = {"status":0} return redis_jk_data else: status = 0 if not status: url = config.get("camera").get("lc_token") # 加密 times,nonce,sign = md5value(appSecret) data = { "system":{ "ver":"1.0", "appId":appId, "sign":sign, "time":times, "nonce":nonce }, "id":nonce, "params":{ } } token_data = requests.post(url,json=data, timeout=5) if token_data.status_code == 200: try: token_json_data = json.loads(token_data.text) expireTime = token_json_data["result"]["data"]["expireTime"] # status 1 代表获取token 成功 0失败 token_json_data["status"] = 1 except Exception as e: token_json_data = {"status":0} expireTime = 5 else: token_json_data = {"status":0} expireTime = 5 redis_pool.set(appId,token_json_data,expireTime) return token_json_data def device_config(appId,appSecret): """ 配置项 """ time,nonce,sign = md5value(appSecret) token_data = deviceAccessToken(appId, appSecret) if token_data.get("status","0"): token = token_data["result"]["data"]["accessToken"] else: token = "" return time,nonce,sign,token def controlMove(appId,appSecret,deviceId,operation,channelId="0"): """ 云台控制接口 请求样例: operation 0-上,1-下,2-左,3-右8-放大9-缩小10-停止 { "id":"d5c287b4-5b2f-4f03-baf5-8032c5c354af", "system":{ "ver":"1.0", "appId":"lcdxxxxxxxxx", "sign":"74bc756ebd53e9eef2836f8cf1730c10", "time":1599031074, "nonce":"8ec8a7bd1d1c95420de20d92422d457d" }, "params":{ "token":"At_00006acr3456d12312d3grf60147de7ec", "deviceId":"MEGREZ0000001842", "channelId":"0", "operation":"1", "duration":"1000" # 移动持续时间,单位毫秒 } } """ url = config.get("camera").get("lc_controler") time,nonce,sign,token = device_config(appId, appSecret) if not token: return {"msg":"控制失败"} data = { "system":{ "ver":"1.0", "appId":appId, "sign":sign, "time":time, "nonce":nonce }, "id":nonce, "params":{ "token":token, "deviceId":deviceId, "channelId":channelId, "operation":operation, "duration":"1000" } } control_Move = requests.post(url,json=data) return control_Move.json() def expire_time(times): data_time = datetime.datetime.now() cultivate_time_expire = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(times)) time1 = datetime.datetime.strptime(cultivate_time_expire,"%Y-%m-%d %H:%M:%S") num=(time1-data_time).days print("设备到期天数---",num) if int(num) <=7 and int(num) >= 0: print("即将到期") status = 2 elif int(num) < 0: print("已到期") status = 1 else: print("未到期") status = 0 return status def get_center_lnglat(lnglats_list): x, y, z = 0, 0, 0 length = len(lnglats_list) for n, a in lnglats_list: g, t = math.radians(float(n)), math.radians(float(a)) x += math.cos(t) * math.cos(g) y += math.cos(t) * math.sin(g) z += math.sin(t) x = float(x / length) y = float(y / length) z = float(z / length) lng = round(math.degrees(math.atan2(y, x)), 4) lat = round(math.degrees(math.atan2(z, math.sqrt(x*x + y*y))), 4) return lng, lat def bindDeviceLive(appId,appSecret,deviceId,channelId="0"): """ 获取直播地址 """ url = config.get("camera").get("lc_addr") time,nonce,sign,token = device_config(appId, appSecret) data = { "system":{ "ver":"1.0", "appId":appId, "sign":sign, "time":time, "nonce":nonce }, "id":nonce, "params":{ "streamId":1, "deviceId":deviceId, "channelId":channelId, "token":token } } device_live_data = requests.post(url,json=data) return device_live_data.json() def getLive(appId,appSecret,deviceId,channelId="0"): """ 查询监控地址 用于设备直播地址创建后再次查询使用 """ url = config.get("camera").get("lc_stream") time,nonce,sign,token = device_config(appId, appSecret) data = { "system":{ "ver":"1.0", "appId":appId, "sign":sign, "time":time, "nonce":nonce }, "id":nonce, "params":{ "deviceId":deviceId, "channelId":channelId, "token":token } } live_data = requests.post(url,json=data) return live_data.json() if __name__ == "__main__": t = get_recent_month(1) print(t)