| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- from django.contrib.auth import authenticate
- from rest_framework.views import APIView
- from smartfarming.models.user import DeviceUser, MongoUserLoginLog, Role, HomeThemeModel
- import time
- import logging
- from kedong.utils import PrAes
- from smartfarming.utils import get_perm_list, get_all_pers, get_captcha
- from rest_framework.response import Response
- from django.contrib.auth.models import User
- from smartfarming.serializers.user_serializers import RoleSerializers, HomeThemeModelSerializers
- from django.core.paginator import Paginator
- from kedong import settings
- from django.db.models import Q
- from kedong.tools import RedisPool
- from smartfarming.models.agriculture import APKLogs
- import logging
- logger = logging.getLogger("myapp")
class RoleAPIView(APIView):
    """List roles whose ``role_status`` is 1, with optional name filter and paging."""

    def post(self, request):
        """Return one page of active roles.

        Request body fields:
            pagenum: page number, defaults to 1.
            pagesize: rows per page, defaults to 10.
            role_name: optional case-insensitive substring to match.

        Returns:
            ``{"code": 0, "data": [...], "count": total}`` on success, or
            ``{"code": 2, "msg": ...}`` when the paging values are not
            numeric or the query fails.
        """
        request_data = request.data
        role_name = request_data.get("role_name")
        try:
            # Parsed inside the try so a non-numeric value produces the normal
            # failure payload instead of an unhandled ValueError/TypeError.
            page_num = int(request_data.get("pagenum", 1))
            page_size = int(request_data.get("pagesize", 10))

            queryset = Role.objects.filter(role_status=1)
            if role_name:
                queryset = queryset.filter(role_name__icontains=role_name)
            queryset = queryset.order_by("-id")

            paginator = Paginator(queryset, page_size)
            page_obj = paginator.get_page(page_num)
            serializer = RoleSerializers(page_obj, many=True)
            # paginator.count is the total row count of the filtered queryset.
            return Response({"code": 0, "data": serializer.data, "count": paginator.count})
        except Exception as e:
            logger.error(f"获取角色列表失败: {e}")
            return Response({"code": 2, "msg": "获取角色列表失败"})
-
class RoleAddAPIView(APIView):
    """Create a new role."""

    def post(self, request):
        """Create a role from the request body.

        Request body fields:
            role_name: required.
            role_perm: required.
            role_message: optional.
            mark: optional.

        Returns:
            ``{"code": 0, "msg": "success"}`` when the row is created, or
            ``{"code": 2, "msg": ...}`` when a required field is missing or
            the insert fails.
        """
        request_data = request.data
        role_name = request_data.get("role_name")
        role_perm = request_data.get("role_perm")
        if not role_name or not role_perm:
            return Response({"code": 2, "msg": "请输入角色名称与权限"})

        # One timestamp for both columns so addtime and uptime cannot differ.
        now = int(time.time())
        try:
            # create() either returns the new object or raises, so failure has
            # to be detected through the exception, not the return value.
            Role.objects.create(
                role_name=role_name,
                role_message=request_data.get("role_message"),
                role_perm=role_perm,
                mark=request_data.get("mark"),
                addtime=now,
                uptime=now,
            )
        except Exception as e:
            logger.error(f"创建角色失败: {e}")
            return Response({"code": 2, "msg": "创建角色失败"})
        return Response({"code": 0, "msg": "success"})
-
class RoleDelAPIView(APIView):
    """Soft-delete a role by setting its ``role_status`` to 0."""

    def post(self, request):
        """Mark the role as deleted unless users are still attached to it.

        Request body fields:
            role_id: id of the role to delete (required).

        Returns:
            ``{"code": 0, "msg": "success"}`` after the status update, or
            ``{"code": 2, "msg": ...}`` when ``role_id`` is missing or a
            DeviceUser row still references the role.
        """
        role_id = request.data.get("role_id")
        if not role_id:
            # Without this guard a missing id turns into an "IS NULL" filter.
            return Response({"code": 2, "msg": "请输入角色ID"})

        # exists() asks the database for a single row instead of loading
        # every matching DeviceUser just to test truthiness.
        if DeviceUser.objects.filter(role_id=role_id).exists():
            return Response({"code": 2, "msg": "该角色下存在用户,请删除该角色下的用户后再删除该角色"})

        Role.objects.filter(id=role_id).update(role_status=0)
        return Response({"code": 0, "msg": "success"})
-
class RoleUpdateAPIView(APIView):
    """Overwrite the editable fields of an existing role."""

    def post(self, request):
        """Update a role's name, message, permissions and mark.

        Request body fields:
            role_id: id of the role to update.
            role_name, role_message, role_perm, mark: new values; each is
                written as received (a missing field is written as None).

        Returns:
            ``{"code": 0, "msg": "success"}`` when a row was matched, or
            ``{"code": 2, "msg": "更新角色失败"}`` when no row matched or the
            update raised.
        """
        request_data = request.data
        role_id = request_data.get("role_id")
        try:
            matched = Role.objects.filter(id=role_id).update(
                role_name=request_data.get("role_name"),
                role_message=request_data.get("role_message"),
                role_perm=request_data.get("role_perm"),
                mark=request_data.get("mark"),
                uptime=int(time.time()),
            )
        except Exception as e:
            logger.error(f"更新角色失败: {e}")
            return Response({"code": 2, "msg": "更新角色失败"})

        # update() returns the number of matched rows; zero means the role id
        # does not exist, which must not be reported as a success.
        if not matched:
            return Response({"code": 2, "msg": "更新角色失败"})
        return Response({"code": 0, "msg": "success"})
-
class APPUserLogin(APIView):
    """Username/password login endpoint without captcha.

    Authentication and permission classes are disabled so the endpoint can be
    called before a session exists.
    """

    permission_classes = []
    authentication_classes = []

    def post(self, request, format=None):
        """Authenticate the caller and open a session.

        Request body fields:
            username, password: credentials passed to ``authenticate``.

        Returns:
            ``{"msg": "success", "code": 0, "data": {...}}`` carrying the
            profile's ``real_name`` and ``mobile``, the submitted username and
            the output of ``PrAes.aesencrypt`` applied to the session key; or
            ``{"msg": "失败", "code": 2}`` when authentication fails or the
            account has no DeviceUser profile.
        """
        request_data = request.data
        username = request_data.get("username")
        password = request_data.get("password")

        user = authenticate(username=username, password=password)
        if not user:
            return Response({"msg": "失败", "code": 2})

        # Resolve the profile before touching the session or the login log, so
        # an account without a DeviceUser row fails cleanly instead of raising
        # DoesNotExist after those side effects have already happened.
        # NOTE(review): unlike LoginAPIView this does not filter on `state`;
        # confirm whether disabled users should be rejected here as well.
        device_user = DeviceUser.objects.filter(uid=user.id).first()
        if device_user is None:
            return Response({"msg": "失败", "code": 2})

        # Prefer the forwarded header and fall back to the socket address.
        ip = request.META.get('HTTP_X_FORWARDED_FOR', "")
        if not ip:
            ip = request.META.get('REMOTE_ADDR', "")

        request.session["user_id"] = user.id
        request.session.save()
        MongoUserLoginLog(uid=user.id, ip=ip, addtime=int(time.time())).save()

        en_text = PrAes.aesencrypt(request.session.session_key)
        return Response({"msg": "success", "code": 0, "data": {
            "real_name": device_user.real_name,
            "mobile": device_user.mobile,
            "username": username,
            "session_key": en_text,
        }})
class LoginAPIView(APIView):
    """Captcha-protected username/password login endpoint.

    Authentication and permission classes are disabled so the endpoint can be
    called before a session exists.
    """

    permission_classes = []
    authentication_classes = []

    def post(self, request, format=None):
        """Validate the captcha, authenticate the user and open a session.

        Request body fields:
            code_str: captcha identifier returned by the captcha endpoint.
            captcha: text typed by the user (compared case-insensitively).
            username, password: credentials passed to ``authenticate``.

        Returns:
            On success ``{"session_key": ..., "status": True, "data": {...},
            "code": 0}`` where ``session_key`` is the output of
            ``PrAes.aesencrypt`` applied to the session key. Otherwise
            ``{"msg": ..., "code": 2}`` describing the captcha, credential or
            account problem.
        """
        request_data = request.data
        code_str = request_data.get("code_str")
        captcha = request_data.get("captcha")
        username = request_data.get("username")
        password = request_data.get("password")

        # Both values are required. The previous `or` let a request with only
        # one of them through and then crashed on `None.lower()`.
        if not (code_str and captcha):
            return Response({"msg": "请输入验证码", "code": 2})

        captcha_low = str(captcha).lower()
        key = f"{code_str}_{captcha_low}"
        redis_tools = RedisPool().get_redis_pool(settings.redis_db["captcha"])
        stored = redis_tools.get(key)
        if not stored:
            return Response({"msg": "验证码已过期或验证码错误", "code": 2})

        # Compare against the value actually stored under the key (the old
        # check compared captcha_low with itself and could never fail). The
        # client may hand back bytes depending on its decode settings.
        if isinstance(stored, bytes):
            stored = stored.decode("utf-8", errors="replace")
        if str(stored).lower() != captcha_low:
            return Response({"msg": "验证码校验失败", "code": 2})

        user = authenticate(username=username, password=password)
        if not user:
            # NOTE(review): the captcha key is only removed after a successful
            # authentication, so one solved captcha can be reused for repeated
            # password attempts until it expires - confirm this is intended.
            return Response({"msg": "用户名或密码错误", "code": 2})

        redis_tools.delete(key)
        device_user = DeviceUser.objects.filter(uid=user.id, state=1).first()
        if not device_user:
            return Response({"msg": "用户不存在或已被禁止登录", "code": 2})

        # Prefer the forwarded header and fall back to the socket address.
        ip = request.META.get('HTTP_X_FORWARDED_FOR', "")
        if not ip:
            ip = request.META.get('REMOTE_ADDR', "")

        now_time = int(time.time())
        # NOTE(review): this only sets the attribute in memory; nothing in
        # this view saves device_user, so login_time is not persisted here.
        device_user.login_time = now_time

        request.session["user_id"] = user.id
        request.session.save()
        MongoUserLoginLog(uid=user.id, ip=ip, addtime=now_time).save()
        en_text = PrAes.aesencrypt(request.session.session_key)

        data = {
            # The "username" returned to the client is the profile's real name.
            "username": device_user.real_name,
            "myuser_type": device_user.user_type,
            "user_login_time": now_time,
            "myuid": user.id,
        }
        return Response({"session_key": en_text, "status": True, "data": data, "code": 0})
-
class CaptchaView(APIView):
    """Issue a new captcha; callable without authentication."""

    permission_classes = []
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        """Generate a captcha and remember it in Redis for 300 seconds.

        The Redis key is ``"<millisecond timestamp>_<captcha text>"`` in lower
        case and the stored value is the captcha text as generated. Only the
        timestamp part is returned to the client as ``code_str``, together
        with the image data from ``get_captcha``.
        """
        # Millisecond timestamp acts as the identifier handed to the client.
        token = str(int(time.time() * 1000))
        captcha_text, image = get_captcha()

        cache_key = f"{token}_{captcha_text}".lower()
        store = RedisPool().get_redis_pool(settings.redis_db["captcha"])
        store.set(cache_key, captcha_text, 300)

        payload = {'code_str': token, 'img_data': image, 'code': 0}
        return Response(payload)
class LoginInfoAPIView(APIView):
    """Return the logged-in user's display name, permissions and app QR code."""

    def post(self, request, format=None):
        """Build the login-info payload for the current user.

        Reads the profile from ``request.myuser`` (presumably attached by the
        project's authentication layer - verify against the middleware).

        Returns:
            ``{"code": 0, "msg": "success", "data": {...}}`` on success,
            ``{"code": 2, "msg": "请为用户分配角色"}`` when ``get_perm_list``
            yields no permissions, or ``{"code": 3, ...}`` on any exception.
        """
        try:
            device_user = request.myuser
            perm_list, mark = get_perm_list(device_user)
            user = User.objects.get(id=device_user.uid)
            if not perm_list:
                return Response({"code": 2, "msg": "请为用户分配角色"})

            # Attach the QR code of the most recently uploaded APK record.
            # first() fetches only that row; the previous truthiness test
            # loaded the whole table before querying again.
            latest_apk = APKLogs.objects.order_by("-upltime").first()
            qr_code = latest_apk.history_qr_code if latest_apk else ""

            real_name = device_user.real_name
            data = {
                # Fall back to the account name when no real name is set.
                "username": real_name if real_name else user.username,
                "children": perm_list,
                "mark": mark,
                "myuser_type": device_user.user_type,
                "user_login_time": int(time.time()),
                "myuid": device_user.uid,
                "qr_code": qr_code,
            }
            return Response({"code": 0, "msg": "success", "data": data})
        except Exception as e:
            logger.error(f"获取权限列表失败: {e}")
            return Response({"code": 3, "msg": "获取权限列表失败"})
-
class UserListAPIView(APIView):
    """List active auth users merged with their DeviceUser profile."""

    def post(self, request):
        """Return one page of users, optionally filtered by a search term.

        Request body fields:
            search: optional substring matched against DeviceUser
                ``real_name`` or ``mobile`` (case-insensitive).
            pagenum: page number, defaults to 1.
            pagesize: rows per page, defaults to 10.

        Only users that have a DeviceUser row with ``state`` 0 or 1 are
        listed; the account named ``yunfei`` is always excluded.

        Returns:
            ``{"code": 0, "msg": "success", "data": [...], "count": total}``
            or ``{"code": 2, "msg": "用户列表失败"}`` on invalid paging values
            or any other error.
        """
        request_data = request.data
        search = request_data.get("search")
        try:
            # Parsed inside the try so non-numeric values produce the normal
            # failure payload instead of an unhandled exception.
            page_num = int(request_data.get("pagenum", 1))
            page_size = int(request_data.get("pagesize", 10))

            users = (
                User.objects.filter(is_active=1)
                .exclude(username='yunfei')
                .values("id", "username", "date_joined")
                .order_by("-date_joined")
            )
            if search:
                matched = DeviceUser.objects.filter(
                    Q(real_name__icontains=search) | Q(mobile__icontains=search)
                )
                uids = [profile.uid for profile in matched]
                # Always apply the filter: a search with no matches must give
                # an empty list, not fall back to listing every user.
                users = users.filter(id__in=uids)

            role_names = {}  # role_id -> role_name, so each role is read once
            user_lst = []
            for user in users:
                joined = user.get("date_joined")
                user["add_time"] = int(joined.timestamp()) if joined else int(time.time())

                profile = (
                    DeviceUser.objects.filter(uid=user.get("id"), state__in=[0, 1])
                    .values("real_name", "mobile", "state", "remark", "role_id")
                    .first()
                )
                if not profile:
                    # NOTE(review): the original filled in default values for
                    # such users but never appended them, so they were never
                    # returned; that exclusion is kept here - confirm intent.
                    continue

                user.update(profile)
                role_id = user.get("role_id")
                if role_id not in role_names:
                    # A missing or dangling role yields an empty name instead
                    # of failing the whole listing.
                    role = Role.objects.filter(id=role_id).first() if role_id else None
                    role_names[role_id] = role.role_name if role else ""
                user["role_name"] = role_names[role_id]
                user_lst.append(user)

            user_tp = user_lst[(page_num - 1) * page_size: page_num * page_size]
            return Response({"code": 0, "msg": "success", "data": user_tp, "count": len(user_lst)})
        except Exception as e:
            logger.error(f"用户列表失败: {e}")
            return Response({"code": 2, "msg": "用户列表失败"})
-
class UserAddAPIView(APIView):
    """Create an auth user together with its DeviceUser profile."""

    def post(self, request):
        """Create the account and the matching profile row.

        Request body fields:
            username: required login name.
            password: password handed to ``create_user``.
            real_name, mobile, role_id, state: stored on the DeviceUser row.

        Returns:
            ``{"code": 0, "msg": "success"}`` when both rows are created, or
            ``{"code": 2, "msg": ...}`` when the username is missing, an
            active user with that name exists, or creation fails.
        """
        try:
            request_data = request.data
            username = request_data.get("username")
            password = request_data.get("password")
            if not username:
                return Response({"code": 2, "msg": "请输入用户名"})

            if User.objects.filter(username=username, is_active=1).exists():
                return Response({"code": 2, "msg": "该用户已存在"})

            user = User.objects.create_user(username=username, password=password)
            try:
                DeviceUser.objects.create(
                    uid=user.id,
                    real_name=request_data.get("real_name"),
                    mobile=request_data.get("mobile"),
                    role_id=request_data.get("role_id"),
                    state=request_data.get("state"),
                )
            except Exception:
                # Do not leave an auth user behind without a profile: it would
                # block the username while being invisible to the user list.
                user.delete()
                raise
            return Response({"code": 0, "msg": "success"})
        except Exception as e:
            logger.error(f"创建用户失败: {e}")
            return Response({"code": 2, "msg": "创建用户失败"})
class UserUpdateAPIView(APIView):
    """Change a user's password and/or DeviceUser profile fields."""

    # Profile columns a client is allowed to change through this endpoint.
    _PROFILE_FIELDS = ("real_name", "mobile", "role_id", "state")

    def post(self, request):
        """Apply the submitted changes to the user identified by ``uid``.

        Request body fields:
            uid: id of the auth user (required).
            password: when non-empty, the new password.
            real_name, mobile, role_id, state: profile values; only keys
                present in the request are written, so omitting a field
                leaves the stored value untouched.

        Returns:
            ``{"code": 0, "msg": "success"}`` on success, or
            ``{"code": 2, "msg": ...}`` when ``uid`` is missing, the user does
            not exist for a password change, or the update fails.
        """
        request_data = request.data
        uid = request_data.get("uid")
        password = request_data.get("password")
        try:
            if not uid:
                return Response({"code": 2, "msg": "请输入用户ID"})

            if password:
                user = User.objects.filter(id=uid).first()
                if user is None:
                    return Response({"code": 2, "msg": "修改用户失败"})
                user.set_password(password)
                user.save()

            # Write only what the client actually sent; previously every
            # omitted field was overwritten with None.
            changes = {
                field: request_data.get(field)
                for field in self._PROFILE_FIELDS
                if field in request_data
            }
            if changes:
                DeviceUser.objects.filter(uid=uid).update(**changes)
            return Response({"code": 0, "msg": "success"})
        except Exception as e:
            logger.error(f"修改用户失败: {e}")
            return Response({"code": 2, "msg": "修改用户失败"})
-
class UserDeleteAPIView(APIView):
    """Delete an auth user and mark its DeviceUser profile with state 4."""

    def post(self, request):
        """Remove the user identified by ``uid``.

        The auth ``User`` row is deleted and the DeviceUser row with the same
        ``uid`` is kept with ``state`` set to 4.

        Returns:
            ``{"code": 0, "msg": "success"}`` on success, or
            ``{"code": 2, "msg": ...}`` when ``uid`` is missing or the
            operation fails.
        """
        uid = request.data.get("uid")
        if not uid:
            return Response({"code": 2, "msg": "请输入用户ID"})
        try:
            User.objects.filter(id=uid).delete()
            DeviceUser.objects.filter(uid=uid).update(state=4)
            return Response({"code": 0, "msg": "success"})
        except Exception as e:
            logger.error(f"删除用户失败: {e}")
            # The message used to say "修改用户失败" (update failed), copied
            # from the update view.
            return Response({"code": 2, "msg": "删除用户失败"})
class PerAPIView(APIView):
    """Expose the permission list produced by ``get_all_pers``."""

    def post(self, request):
        """Return ``{"code": 0, "data": <result of get_all_pers()>}``."""
        return Response({"code": 0, "data": get_all_pers()})
class LoginOutAPIView(APIView):
    """Log the caller out."""

    def post(self, request):
        """Flush the current session and acknowledge with a success payload."""
        request.session.flush()
        payload = {"code": 0, "msg": "success"}
        return Response(payload)
class HomeThemeModelAPIView(APIView):
    """Create or overwrite the single home-theme record (id 1)."""

    def post(self, request):
        """Save the theme settings sent in the request body.

        Request body fields:
            logo_url, title_name, copyright: values stored on the row with
                ``id=1``; the row is created when it does not exist yet.

        Returns:
            ``{"code": 0, "msg": "success"}`` when the row is saved, or
            ``{"code": 2, "msg": "保存主题失败"}`` when saving raises.
        """
        request_data = request.data
        defaults = {
            "logo_url": request_data.get("logo_url"),
            "title_name": request_data.get("title_name"),
            "copyright": request_data.get("copyright"),
        }
        try:
            # update_or_create returns an (object, created) tuple, which is
            # always truthy; failure can only surface as an exception.
            HomeThemeModel.objects.update_or_create(id=1, defaults=defaults)
        except Exception as e:
            logger.error(f"保存主题失败: {e}")
            return Response({"code": 2, "msg": "保存主题失败"})
        return Response({"code": 0, "msg": "success"})
class HomeThemeModelListAPIView(APIView):
    """Return the stored home-theme settings."""

    def post(self, request):
        """Serialize the first HomeThemeModel row.

        Returns:
            ``{"code": 0, "msg": "success", "data": {...}}`` when a theme row
            exists, or ``{"code": 2, "msg": "请联系管理员初始化主题"}`` when no
            row exists or reading it fails.
        """
        try:
            theme = HomeThemeModel.objects.first()
            if theme is None:
                # Serializing None does not raise, so the missing-theme case
                # has to be detected explicitly to return the intended hint.
                return Response({"code": 2, "msg": "请联系管理员初始化主题"})
            serializer = HomeThemeModelSerializers(theme)
            return Response({"code": 0, "msg": "success", "data": serializer.data})
        except Exception as e:
            logger.error(f"获取主题信息失败: {e}")
            return Response({"code": 2, "msg": "请联系管理员初始化主题"})
|