"""Role, user, login and home-theme API views.

Every endpoint answers with a JSON envelope in which ``code`` is 0 on
success and non-zero on failure; failures carry a human-readable ``msg``.
"""
import logging
import time

from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models import Q
from rest_framework.response import Response
from rest_framework.views import APIView

from smartfarming.models.user import DeviceUser, MongoUserLoginLog, Role, HomeThemeModel
from kedong.utils import PrAes
from smartfarming.utils import get_perm_list, get_all_pers, get_captcha
from smartfarming.serializers.user_serializers import RoleSerializers, HomeThemeModelSerializers
from kedong import settings
from kedong.tools import RedisPool
from smartfarming.models.agriculture import APKLogs

logger = logging.getLogger("myapp")


def _client_ip(request):
    """Return the X-Forwarded-For header value, falling back to REMOTE_ADDR."""
    meta = request.META
    return meta.get('HTTP_X_FORWARDED_FOR', "") or meta.get('REMOTE_ADDR', "")


def _parse_paging(request_data):
    """Read ``pagenum`` / ``pagesize`` from the payload (defaults 1 and 10).

    Raises:
        TypeError, ValueError: when either value cannot be converted to int.
    """
    page_num = int(request_data.get("pagenum", 1))
    page_size = int(request_data.get("pagesize", 10))
    return page_num, page_size


class RoleAPIView(APIView):
    """List active roles (``role_status=1``), newest first, with paging."""

    def post(self, request):
        """Return one page of roles, optionally filtered by ``role_name``.

        Payload keys: ``pagenum``, ``pagesize``, ``role_name`` (all optional).
        Responds with ``data`` (serialized page) and ``count`` (total rows).
        """
        request_data = request.data
        try:
            # Parsed inside the try so malformed paging values yield the
            # regular failure envelope instead of an unhandled 500.
            page_num, page_size = _parse_paging(request_data)

            queryset = Role.objects.filter(role_status=1).order_by("-id")
            role_name = request_data.get("role_name")
            if role_name:
                queryset = queryset.filter(role_name__icontains=role_name)

            total_obj = queryset.count()
            page_obj = Paginator(queryset, page_size).get_page(page_num)
            serializer = RoleSerializers(page_obj, many=True)
            return Response({"code": 0, "data": serializer.data, "count": total_obj})
        except Exception:
            logger.exception("获取角色列表失败")
            return Response({"code": 2, "msg": "获取角色列表失败"})


class RoleAddAPIView(APIView):
    """Create a new role."""

    def post(self, request):
        """Create a role from ``role_name``, ``role_perm``, ``role_message``, ``mark``.

        ``role_name`` and ``role_perm`` are required.
        """
        request_data = request.data
        role_name = request_data.get("role_name")
        role_perm = request_data.get("role_perm")
        if not role_name or not role_perm:
            return Response({"code": 2, "msg": "请输入角色名称与权限"})

        now = int(time.time())
        try:
            # create() either returns an instance or raises, so failure has
            # to be detected through the exception rather than truthiness.
            Role.objects.create(
                role_name=role_name,
                role_message=request_data.get("role_message"),
                role_perm=role_perm,
                mark=request_data.get("mark"),
                addtime=now,
                uptime=now,
            )
        except Exception:
            logger.exception("创建角色失败")
            return Response({"code": 2, "msg": "创建角色失败"})
        return Response({"code": 0, "msg": "success"})


class RoleDelAPIView(APIView):
    """Soft-delete a role by setting ``role_status`` to 0."""

    def post(self, request):
        """Disable the role ``role_id`` unless users are still attached to it."""
        role_id = request.data.get("role_id")
        if role_id in (None, ""):
            # Without this guard a missing id would be matched as IS NULL.
            return Response({"code": 2, "msg": "请输入角色ID"})

        if DeviceUser.objects.filter(role_id=role_id).exists():
            return Response({"code": 2, "msg": "该角色下存在用户,请删除该角色下的用户后再删除该角色"})

        Role.objects.filter(id=role_id).update(role_status=0)
        return Response({"code": 0, "msg": "success"})


class RoleUpdateAPIView(APIView):
    """Update the editable fields of an existing role."""

    def post(self, request):
        """Overwrite name, message, permissions and mark of role ``role_id``.

        Fields absent from the payload are written as ``None``, as before.
        """
        request_data = request.data
        role_id = request_data.get("role_id")
        if role_id in (None, ""):
            return Response({"code": 2, "msg": "更新角色失败"})

        try:
            updated = Role.objects.filter(id=role_id).update(
                role_name=request_data.get("role_name"),
                role_message=request_data.get("role_message"),
                role_perm=request_data.get("role_perm"),
                mark=request_data.get("mark"),
                uptime=int(time.time()),
            )
        except Exception:
            logger.exception("更新角色失败")
            return Response({"code": 2, "msg": "更新角色失败"})

        if not updated:
            # No row matched the id: do not report success for a no-op.
            return Response({"code": 2, "msg": "更新角色失败"})
        return Response({"code": 0, "msg": "success"})


class APPUserLogin(APIView):
    """Username/password login for the app client (no captcha)."""

    permission_classes = []
    authentication_classes = []

    def post(self, request, format=None):
        """Authenticate, open a session and return the encrypted session key."""
        request_data = request.data
        username = request_data.get("username")
        password = request_data.get("password")

        user = authenticate(username=username, password=password)
        if not user:
            return Response({"msg": "失败", "code": 2})

        # Resolve the profile before touching the session so an account
        # without a DeviceUser row fails cleanly instead of raising
        # DoesNotExist after the session and login log were already written.
        device_user = DeviceUser.objects.filter(uid=user.id).first()
        if device_user is None:
            return Response({"msg": "失败", "code": 2})

        request.session["user_id"] = user.id
        request.session.save()
        MongoUserLoginLog(uid=user.id, ip=_client_ip(request), addtime=int(time.time())).save()

        en_text = PrAes.aesencrypt(request.session.session_key)
        return Response({"msg": "success", "code": 0, "data": {
            "real_name": device_user.real_name,
            "mobile": device_user.mobile,
            "username": username,
            "session_key": en_text
        }})


class LoginAPIView(APIView):
    """Captcha-protected username/password login."""

    permission_classes = []
    authentication_classes = []

    def post(self, request, format=None):
        """Validate the captcha, authenticate and open a session.

        Payload keys: ``code_str`` (captcha id issued by ``CaptchaView``),
        ``captcha`` (text typed by the user), ``username``, ``password``.
        """
        request_data = request.data
        code_str = request_data.get("code_str")
        captcha = request_data.get("captcha")
        username = request_data.get("username")
        password = request_data.get("password")

        # Both parts are required; with only one of them the lookup key
        # cannot be built (and ``captcha`` may be None).
        if not (code_str and captcha):
            return Response({"msg": "请输入验证码", "code": 2})

        # CaptchaView stores the entry under "<id>_<text>" lower-cased, so
        # the key existing in Redis is itself the proof the text matched.
        captcha_low = str(captcha).lower()
        key = f"{code_str}_{captcha_low}"
        redis_tools = RedisPool().get_redis_pool(settings.redis_db["captcha"])
        if not redis_tools.get(key):
            return Response({"msg": "验证码已过期或验证码错误", "code": 2})

        user = authenticate(username=username, password=password)
        if not user:
            # NOTE(review): the captcha stays valid after a failed password
            # attempt (until it expires); confirm whether that is intended.
            return Response({"msg": "用户名或密码错误", "code": 2})

        redis_tools.delete(key)
        device_user = DeviceUser.objects.filter(uid=user.id, state=1).first()
        if device_user is None:
            return Response({"msg": "用户不存在或已被禁止登录", "code": 2})

        now_time = int(time.time())
        # Persist the login timestamp; previously it was assigned but never saved.
        device_user.login_time = now_time
        device_user.save()

        request.session["user_id"] = user.id
        request.session.save()
        MongoUserLoginLog(uid=user.id, ip=_client_ip(request), addtime=now_time).save()

        en_text = PrAes.aesencrypt(request.session.session_key)
        data = {
            "username": device_user.real_name,
            "myuser_type": device_user.user_type,
            "user_login_time": now_time,
            "myuid": user.id
        }
        return Response({"session_key": en_text, "status": True, "data": data, "code": 0})


class CaptchaView(APIView):
    """Issue a fresh captcha image."""

    permission_classes = []
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        """Generate a captcha, store it in Redis with a 300 TTL and return its id and image."""
        # Millisecond timestamp serves as the captcha id handed to the client.
        prefix = str(int(time.time() * 1000))
        code_str, img_data = get_captcha()
        key_lower = f"{prefix}_{code_str}".lower()
        redis_tools = RedisPool().get_redis_pool(settings.redis_db["captcha"])
        redis_tools.set(key_lower, code_str, 300)
        return Response({'code_str': prefix, 'img_data': img_data, 'code': 0})


class LoginInfoAPIView(APIView):
    """Return profile and permission data for the logged-in user."""

    def post(self, request, format=None):
        """Build the permission tree and profile of ``request.myuser``."""
        try:
            device_user = request.myuser
            perm_list, mark = get_perm_list(device_user)
            username = User.objects.get(id=device_user.uid).username
            if not perm_list:
                return Response({"code": 2, "msg": "请为用户分配角色"})

            # QR code of the most recently uploaded APK record, if any.
            latest_apk = APKLogs.objects.all().order_by("-upltime").first()
            qr_code = latest_apk.history_qr_code if latest_apk is not None else ""

            real_name = device_user.real_name
            data = {
                "username": real_name if real_name else username,
                "children": perm_list,
                "mark": mark,
                "myuser_type": device_user.user_type,
                "user_login_time": int(time.time()),
                "myuid": device_user.uid,
                "qr_code": qr_code
            }
            return Response({"code": 0, "msg": "success", "data": data})
        except Exception:
            logger.exception("获取权限列表失败")
            return Response({"code": 3, "msg": "获取权限列表失败"})


class UserListAPIView(APIView):
    """List active users joined with their DeviceUser profile and role name."""

    def post(self, request):
        """Return one page of users, optionally filtered by ``search``.

        ``search`` matches DeviceUser ``real_name`` or ``mobile``. Only users
        whose DeviceUser ``state`` is 0 or 1 are listed.
        """
        request_data = request.data
        search = request_data.get("search")
        try:
            page_num, page_size = _parse_paging(request_data)
            # Clamp so a zero/negative page cannot produce a negative slice.
            page_num = max(page_num, 1)
            page_size = max(page_size, 1)

            users = (
                User.objects.filter(is_active=1)
                .exclude(username='yunfei')
                .values("id", "username", "date_joined")
                .order_by("-date_joined")
            )
            if search:
                uids = list(
                    DeviceUser.objects.filter(
                        Q(real_name__icontains=search) | Q(mobile__icontains=search)
                    ).values_list("uid", flat=True)
                )
                # Always apply the filter: a search with no match must give
                # an empty list, not the complete user list.
                users = users.filter(id__in=uids)

            role_names = {}  # role_id -> role_name, so each role is queried once
            user_lst = []
            for user in users:
                joined = user.get("date_joined")
                user["add_time"] = int(joined.timestamp()) if joined else int(time.time())

                profile = (
                    DeviceUser.objects.filter(uid=user.get("id"), state__in=[0, 1])
                    .values("real_name", "mobile", "state", "remark", "role_id")
                    .first()
                )
                if profile is None:
                    # NOTE(review): users without an active profile were never
                    # returned by the original code either; confirm intent.
                    continue

                user.update(profile)
                role_id = user.get("role_id")
                if role_id not in role_names:
                    # A missing role yields an empty name instead of failing
                    # the whole listing.
                    role_names[role_id] = (
                        Role.objects.filter(id=role_id)
                        .values_list("role_name", flat=True)
                        .first()
                    ) or ""
                user["role_name"] = role_names[role_id]
                user_lst.append(user)

            start = (page_num - 1) * page_size
            user_tp = user_lst[start: start + page_size]
            return Response({"code": 0, "msg": "success", "data": user_tp, "count": len(user_lst)})
        except Exception:
            logger.exception("用户列表失败")
            return Response({"code": 2, "msg": "用户列表失败"})


class UserAddAPIView(APIView):
    """Create a login account together with its DeviceUser profile."""

    def post(self, request):
        """Create the ``User`` and ``DeviceUser`` rows for a new user.

        Payload keys: ``username``, ``password``, ``real_name``, ``mobile``,
        ``role_id``, ``state``.
        """
        request_data = request.data
        username = request_data.get("username")
        password = request_data.get("password")
        if not username or not password:
            return Response({"code": 2, "msg": "请输入用户名和密码"})

        try:
            if User.objects.filter(username=username, is_active=1).exists():
                return Response({"code": 2, "msg": "该用户已存在"})

            # Both rows are written in one transaction so a failing profile
            # insert does not leave an orphaned login account behind.
            with transaction.atomic():
                user = User.objects.create_user(username=username, password=password)
                DeviceUser.objects.create(
                    uid=user.id,
                    real_name=request_data.get("real_name"),
                    mobile=request_data.get("mobile"),
                    role_id=request_data.get("role_id"),
                    state=request_data.get("state"),
                )
        except Exception:
            logger.exception("创建用户失败")
            return Response({"code": 2, "msg": "创建用户失败"})
        return Response({"code": 0, "msg": "success"})


class UserUpdateAPIView(APIView):
    """Update a user's password and profile fields."""

    def post(self, request):
        """Update user ``uid``; the password is changed only when supplied.

        Profile fields absent from the payload are written as ``None``, as before.
        """
        request_data = request.data
        uid = request_data.get("uid")
        password = request_data.get("password")
        if not uid:
            return Response({"code": 2, "msg": "请输入用户ID"})

        try:
            if password:
                user = User.objects.filter(id=uid).first()
                if user is None:
                    return Response({"code": 2, "msg": "修改用户失败"})
                user.set_password(password)
                user.save()

            DeviceUser.objects.filter(uid=uid).update(
                real_name=request_data.get("real_name"),
                mobile=request_data.get("mobile"),
                role_id=request_data.get("role_id"),
                state=request_data.get("state"),
            )
            return Response({"code": 0, "msg": "success"})
        except Exception:
            logger.exception("修改用户失败")
            return Response({"code": 2, "msg": "修改用户失败"})


class UserDeleteAPIView(APIView):
    """Delete a login account and mark its profile as deleted (state 4)."""

    def post(self, request):
        """Delete ``User`` ``uid`` and set the matching DeviceUser ``state`` to 4."""
        uid = request.data.get("uid")
        if not uid:
            return Response({"code": 2, "msg": "请输入用户ID"})

        try:
            with transaction.atomic():
                User.objects.filter(id=uid).delete()
                DeviceUser.objects.filter(uid=uid).update(state=4)
            return Response({"code": 0, "msg": "success"})
        except Exception:
            logger.exception("删除用户失败")
            # The message previously said "修改用户失败" (a copy-paste slip).
            return Response({"code": 2, "msg": "删除用户失败"})


class PerAPIView(APIView):
    """Expose the full permission list."""

    def post(self, request):
        """Return everything produced by ``get_all_pers()``."""
        return Response({"code": 0, "data": get_all_pers()})


class LoginOutAPIView(APIView):
    """Log the current user out."""

    def post(self, request):
        """Flush the session."""
        request.session.flush()
        return Response({"code": 0, "msg": "success"})


class HomeThemeModelAPIView(APIView):
    """Create or update the single home-theme record (``id=1``)."""

    def post(self, request):
        """Store ``logo_url``, ``title_name`` and ``copyright`` on the theme row."""
        request_data = request.data
        try:
            # update_or_create returns an (object, created) tuple, which is
            # always truthy; failure is only signalled by an exception.
            HomeThemeModel.objects.update_or_create(
                id=1,
                defaults={
                    "logo_url": request_data.get("logo_url"),
                    "title_name": request_data.get("title_name"),
                    "copyright": request_data.get("copyright"),
                },
            )
        except Exception:
            logger.exception("保存主题失败")
            return Response({"code": 2, "msg": "保存主题失败"})
        return Response({"code": 0, "msg": "success"})


class HomeThemeModelListAPIView(APIView):
    """Return the stored home-theme record."""

    def post(self, request):
        """Serialize the first ``HomeThemeModel`` row, or report that none exists."""
        try:
            query = HomeThemeModel.objects.all().first()
            if query is None:
                # Serializing None would return an empty payload with code 0.
                return Response({"code": 2, "msg": "请联系管理员初始化主题"})
            serializer = HomeThemeModelSerializers(query)
            return Response({"code": 0, "msg": "success", "data": serializer.data})
        except Exception:
            logger.exception("请联系管理员初始化主题")
            return Response({"code": 2, "msg": "请联系管理员初始化主题"})