| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- import string
- import random
- import requests
- import json
- import logging
- import time
- import hashlib
- import math
- from captcha.image import ImageCaptcha
- from io import BytesIO
- import base64
- from datetime import datetime, timedelta
- import calendar
- from smartfarming.models.user import Role, UserPurview
- from kedong.tools import RedisPool
- from kedong import settings
- logger = logging.getLogger("other")
- redis_pool = RedisPool().get_redis_pool(settings.redis_db["jiankong"])
- config = settings.CONFIG
- def perms_pc_app(type_id=1000, perms_lst=[], device_user=None):
- menu = "PC" if type_id == 1000 else "APP"
- if menu == "APP":
- children_lst = []
- apps = UserPurview.objects.filter(menu=menu)
- for i in apps:
- children_lst.append(
- {
- "menu": "APP",
- "parent_perm_id": 99,
- "pur_id": i.id,
- "purview_name": i.purview_name,
- "url": i.url
- }
- )
- return children_lst
- else:
- parent_ids = []
- for p in perms_lst:
- # 先判断是否是父级菜单
- per_obj = UserPurview.objects.filter(id=int(p), menu=menu).first()
- if per_obj:
- if per_obj.parent_perm_id == 0:
- parent_ids.append(p)
- perm_lst = []
- for i in parent_ids:
- parent = UserPurview.objects.get(id=int(i), menu=menu)
- inner_parent = {
- "menu": parent.menu,
- "parent_perm_id": parent.parent_perm_id,
- "pur_id": parent.id,
- "purview_name": parent.purview_name,
- "url": parent.url
- }
- children_lst = []
- # 获取此父级菜单中所有的子菜单
- children = UserPurview.objects.filter(parent_perm_id=int(i), menu=menu)
- for child in children:
- if child.id in perms_lst:
- if device_user and device_user.real_name == "yunfei" and child.parent_perm_id == 1:
- children_lst.append(
- {
- "menu": child.menu,
- "parent_perm_id": child.parent_perm_id,
- "pur_id": child.id,
- "purview_name": child.purview_name,
- "url": child.url
- }
- )
- if child.purview_name == "用户管理":
- children_lst.append(
- {
- "menu": "PC",
- "parent_perm_id": 0,
- "pur_id": 38,
- "purview_name": "监控定位",
- "url": "0"
- }
- )
- children_lst.append(
- {
- "menu": "PC",
- "parent_perm_id": 0,
- "pur_id": 39,
- "purview_name": "APP信息",
- "url": "0"
- }
- )
- else:
- children_lst.append(
- {
- "menu": child.menu,
- "parent_perm_id": child.parent_perm_id,
- "pur_id": child.id,
- "purview_name": child.purview_name,
- "url": child.url
- }
- )
- if children_lst:
- inner_parent["children"] = children_lst
- perm_lst.append(inner_parent)
- return perm_lst
- def get_perm_list(device_user):
- id = device_user.role_id
- try:
- role = Role.objects.get(id=id)
- role_perm = role.role_perm
- perms_lst = [int(i) for i in role_perm.split(",")]
- pc_perm_lst = perms_pc_app(type_id=1000, perms_lst=perms_lst, device_user=device_user)
- app_perm_lst = perms_pc_app(type_id=2000, perms_lst=perms_lst)
- if pc_perm_lst and app_perm_lst:
- perm_lst = [
- {
- "menu": "",
- "parent_perm_id": 1000,
- "pur_id": 0,
- "purview_name": "登录PC端",
- "url": "",
- "children": pc_perm_lst
- },
- {
- "menu": "",
- "parent_perm_id": 2000,
- "pur_id": 99,
- "purview_name": "登录APP端",
- "url": "",
- "children": app_perm_lst
- }
- ]
- elif pc_perm_lst:
- perm_lst = [
- {
- "menu": "",
- "parent_perm_id": 1000,
- "pur_id": 0,
- "purview_name": "登录PC端",
- "url": "",
- "children": pc_perm_lst
- }
- ]
- elif app_perm_lst:
- perm_lst = [
- {
- "menu": "",
- "parent_perm_id": 2000,
- "pur_id": 99,
- "purview_name": "登录APP端",
- "url": "",
- "children": app_perm_lst
- }
- ]
- else:
- perm_lst = []
- mark = role.mark
- return perm_lst, mark
- except Exception as e:
- return [], ""
- def get_all_pers():
- role_perm = UserPurview.objects.all().values("id")
- perms_lst = []
- for i in role_perm:
- perms_lst.append(i.get("id"))
- pc_perm_lst = perms_pc_app(type_id=1000, perms_lst=perms_lst)
- app_perm_lst = perms_pc_app(type_id=2000, perms_lst=perms_lst)
- perm_lst = [
- {
- "menu": "",
- "parent_perm_id": 1000,
- "pur_id": 0,
- "purview_name": "登录PC端",
- "url": "",
- "children": pc_perm_lst
- },
- {
- "menu": "",
- "parent_perm_id": 2000,
- "pur_id": 99,
- "purview_name": "登录APP端",
- "url": "",
- "children": app_perm_lst
- }
- ]
- return perm_lst
- def get_captcha():
- """获取图片验证码
- return :
- 验证码字符串,验证码图片base64数据
- """
- chr_all = string.ascii_uppercase + string.digits
- chr_all = chr_all.replace('0', '').replace('1', '').replace('2', '').replace('O', '').replace('I', '')
- code_str = ''.join(random.sample(chr_all, 4))
- image = ImageCaptcha().generate_image(code_str)
- f = BytesIO()
- image.save(f, 'png')
- data = f.getvalue()
- f.close()
- encode_data = base64.b64encode(data)
- data = str(encode_data, encoding='utf-8')
- img_data = "data:image/jpeg;base64,{data}".format(data=data)
- return code_str, img_data
- def get_recent_month(num):
- # 获取最近N个月的时间戳
- end_time = datetime.now() # 结束时间,当前时间
- # 获取当前月份和年份
- current_month = end_time.month
- current_year = end_time.year
- # 计算开始时间
- start_month = current_month - num # 当前月份往前推N个月
- start_year = current_year
- if start_month <= 0: # 如果计算得到的月份为负数,则需要向前借位
- start_month += 12
- start_year -= 1
- # 获取开始时间所在月份的最后一天
- _, last_day = calendar.monthrange(start_year, start_month)
- start_time = datetime(start_year, start_month, last_day).timestamp()
- return start_time
- def get_addr_by_lag_lng(lat,lng):
- # 根据经纬度获取省市级
- is_success, province, city, district = False, "", "", ""
- try:
- # 使用腾讯接口
- url = "https://apis.map.qq.com/ws/geocoder/v1?location={},{}&get_poi=0&key=OA4BZ-FX43U-E5VV2-45M6S-C4HD3-NIFFI&output=json".format(lat,lng)
- requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数
- s = requests.session()
- s.keep_alive = False # 关闭多余连接
- ret = s.get(url)
- rest = json.loads(ret.text)
- province = rest["result"]["ad_info"]["province"]
- city = rest["result"]["ad_info"]["city"]
- district = rest["result"]["ad_info"]["district"]
- is_success = True
- except Exception as e:
- logger.error(f"获取腾讯地理位置接口失败 {e.args}")
- if not is_success:
- try:
- # 使用百度接口
- url = "http://api.map.baidu.com/geocoder?location=%s,%s&output=json"%(lat,lng)
- requests.adapters.DEFAULT_RETRIES = 5 # 增加重连次数
- s = requests.session()
- s.keep_alive = False # 关闭多余连接
- ret = s.get(url)
- addr = json.loads(ret.text)
- province = addr["result"]["addressComponent"]["province"]
- city = addr["result"]["addressComponent"]["city"]
- district = addr["result"]["addressComponent"]["district"]
- is_success = True
- except Exception as e:
- logger.error(f"获取百度地理位置接口失败 {e.args}")
- return is_success, province, city, district
- # 获取天气数据
- def get_address_by_lntlat(lng, lat):
- province, city, district = "", "", ""
- try:
- key = "78ce288400f4fc6d9458989875c833c2"
- adcode_url = "http://restapi.amap.com/v3/geocode/regeo?key={key}&location={lng},{lat}&poitype=&radius=&" \
- "extensions=all&batch=false&roadlevel=0".format(key=key, lng=lng, lat=lat)
- res = requests.get(adcode_url)
- rsp = res.json()
- address_info = rsp['regeocode']['addressComponent']
- adcode = address_info['adcode']
- if adcode != "900000":
- province, city, district = address_info['province'], address_info['city'], address_info['district']
- if (not city) and district:
- city = district
- except Exception as e:
- pass
- return province, city, district
- # 获取天气相关信息
- def get_weather_info(lng=None, lat=None):
- data = {}
- if lng and lat:
- try:
- url = "http://114.115.147.140:8080/weather"
- res = requests.post(url, {"lat": lat, "lng": lng})
- result = res.json()
- if result['status'] == 'true':
- data = result['data']
- except Exception as e:
- pass
- return data
- def expire_time(times):
- data_time = datetime.datetime.now()
- cultivate_time_expire = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(times))
- time1 = datetime.datetime.strptime(cultivate_time_expire,"%Y-%m-%d %H:%M:%S")
- num=(time1-data_time).days
- print("设备到期天数---",num)
- if int(num) <=7 and int(num) >= 0:
- print("即将到期")
- status = 2
-
- elif int(num) < 0:
- print("已到期")
- status = 1
- else:
- print("未到期")
- status = 0
-
- return status
- def get_center_lnglat(lnglats_list):
- x, y, z = 0, 0, 0
- length = len(lnglats_list)
- for n, a in lnglats_list:
- g, t = math.radians(float(n)), math.radians(float(a))
- x += math.cos(t) * math.cos(g)
- y += math.cos(t) * math.sin(g)
- z += math.sin(t)
- x = float(x / length)
- y = float(y / length)
- z = float(z / length)
- lng = round(math.degrees(math.atan2(y, x)), 4)
- lat = round(math.degrees(math.atan2(z, math.sqrt(x*x + y*y))), 4)
- return lng, lat
- def md5value(appSecret):
- """sign加密"""
- times = int(time.time())
- # 随机生成32位字符串
- nonce = ''.join(random.sample(string.ascii_letters + string.digits, 32))
- sign = "time:{},nonce:{},appSecret:{}".format(times, nonce, appSecret)
- input_name = hashlib.md5()
- input_name.update(sign.encode("utf-8"))
- sign = (input_name.hexdigest()).lower()
- return times,nonce,sign
- def deviceAccessToken(appId, appSecret):
- # 乐橙云账号秘钥
- # 先从redis 中获取token
- tokens = redis_pool.get(appId)
- if tokens and tokens !=None:
- redis_jk_data = eval(tokens)
- status = redis_jk_data.get("status",1)
- if status:
- # 查看到期时间
- expireTime = redis_pool.ttl(appId)
- # 设置到期时间 秒
- # redis_pool.expire('ttt', 10)
- redis_jk_data["result"]["data"]["expireTime"] = expireTime
- else:
- redis_jk_data = {"status":0}
- return redis_jk_data
- else:
- status = 0
- if not status:
- url = config.get("camera").get("lc_token")
- # 加密
- times,nonce,sign = md5value(appSecret)
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":times,"nonce":nonce},
- "id":nonce,
- "params":{}
- }
- token_data = requests.post(url,json=data, timeout=5)
- if token_data.status_code == 200:
- try:
- token_json_data = json.loads(token_data.text)
- expireTime = token_json_data["result"]["data"]["expireTime"]
- # status 1 代表获取token 成功 0失败
- token_json_data["status"] = 1
- except Exception as e:
- token_json_data = {"status":0}
- expireTime = 5
- else:
- token_json_data = {"status":0}
- expireTime = 5
- redis_pool.set(appId,str(token_json_data),expireTime)
- return token_json_data
- def device_config(appId,appSecret):
- # 配置项
- time,nonce,sign = md5value(appSecret)
- token_data = deviceAccessToken(appId, appSecret)
- if token_data.get("status","0"):
- token = token_data["result"]["data"]["accessToken"]
- else:
- token = ""
- return time,nonce,sign,token
- def controlMove(appId,appSecret,deviceId,operation,channelId="0"):
- # 云台控制接口
- url = config.get("camera").get("lc_controler")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {"msg":"控制失败"}
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "id":nonce,
- "params":{"token":token,"deviceId":deviceId,"channelId":channelId,"operation":operation,"duration":"1000"}
- }
- control_Move = requests.post(url,json=data)
- return control_Move.json()
- def bindDeviceLive(appId,appSecret,deviceId,channelId="0"):
- # 获取直播地址
- url = config.get("camera").get("lc_addr")
- time,nonce,sign,token = device_config(appId, appSecret)
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "params":{"streamId":1,"deviceId":deviceId,"channelId":channelId,"token":token},
- "id":nonce
- }
- device_live_data = requests.post(url,json=data)
- return device_live_data.json()
- def getLive(appId,appSecret,deviceId,channelId="0"):
- # 查询监控地址 用于设备直播地址创建后再次查询使用
- url = config.get("camera").get("lc_stream")
- time,nonce,sign,token = device_config(appId, appSecret)
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "params":{"deviceId":deviceId, "channelId":channelId,"token":token},
- "id":nonce
- }
- live_data = requests.post(url,json=data)
- return live_data.json()
- def modify_live_time(appId,appSecret,liveToken,start_time,end_time,period="everyday"):
- # 开启,修改直播计划时间
- # period 直播周期,always:永久,once:指定某天, everyday:每天
- url = config.get("camera").get("lc_modify_live_plan")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {'result': {'msg': '获取token失败。', 'code': '1'}, 'id': 'rWfBFgohj7VwkxsqbM5p4JKDOtQNU2XA'}
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "id":nonce,
- "params":{"token":token,"period":period,"liveToken":liveToken,"beginTime":start_time,"endTime": end_time}
- }
- if period == "always":
- del data["params"]["beginTime"]
- del data["params"]["endTime"]
- device_live_data = requests.post(url,json=data).json()
- return device_live_data
- def off_on_live(appId,appSecret,liveToken,status):
- # 关闭、开启当前直播
- url = config.get("camera").get("lc_modify_live_plan_status")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {'result': {'msg': '获取token失败。', 'code': '1'}, 'id': 'rWfBFgohj7VwkxsqbM5p4JKDOtQNU2XA'}
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "params":{"token":token,"liveToken":liveToken,"status":status},
- "id":nonce
- }
- device_live_data = requests.post(url,json=data).json()
- return device_live_data
- def setDeviceSnap(appId,appSecret,deviceId,channelId="0"):
- # 抓图接口
- url = config.get("camera").get("lc_device_snap_enhanced")
- # url = config.get("camera").get("lc_device_snap")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {"msg": "抓图失败"}
- data = {
- "system":{"ver":"1.0", "appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "params":{"deviceId":deviceId,"channelId":channelId,"token":token},
- "id":nonce
- }
- img_data = requests.post(url,json=data)
- result = img_data.json()
- if result.get("result").get("code") == "0":
- return result.get("result").get("data").get("url")
- def get_weather(ip):
- # 获取天气
- res = requests.get("http://v0.yiketianqi.com/api?unescape=1&version=v91&appid=69334222&appsecret=2u4bHXHD&ext=")
- response = json.loads(res.text)
- return response.get("data")
- def getKitToken(appId,appSecret,deviceId,channelId="0"):
- # 获取录像回放所需要的token
- url = config.get("camera").get("lc_kit_token")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {"result":{"code":"1"}}
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "id":nonce,
- "params":{"type":0,"deviceId":deviceId,"channelId":channelId,"token":token}
- }
- like_token_data = requests.post(url,json=data)
- return like_token_data.json()
- def getRecord(appId,appSecret,deviceId,channelId="0"):
- url = config.get("camera").get("lc_local_record")
- time,nonce,sign,token = device_config(appId, appSecret)
- if not token:
- return {"result":{"code":"1"}}
- data = {
- "system":{"ver":"1.0","appId":appId,"sign":sign,"time":time,"nonce":nonce},
- "id":nonce,
- "params":{
- "deviceId":deviceId,
- "channelId":channelId,
- "token":token,
- "beginTime":"2023-07-19 00:00:00",
- "endTime": "2023-07-25 00:00:00",
- "type": "All",
- "count": 100
- }
- }
- record_token_data = requests.post(url,json=data)
- return record_token_data.json()
- if __name__ == "__main__":
- t = get_recent_month(1)
- print(t)
|