from rest_framework.views import APIView from rest_framework.response import Response import sqlite3 import logging import json province_dict = { "11": '北京市', "12": '天津市', "13": '河北省', "14": '山西省', "15": '内蒙古自治区', "21": '辽宁省', "22": '吉林省', "23": '黑龙江省', "31": '上海市', "32": '江苏省', "33": '浙江省', "34": '安徽省', "35": '福建省', "36": '江西省', "37": '山东省', "41": '河南省', "42": '湖北省', "43": '湖南省', "44": '广东省', "45": '广西壮族自治区', "46": '海南省', "50": '重庆市', "51": '四川省', "52": '贵州省', "53": '云南省', "54": '西藏自治区', "61": '陕西省', "62": '甘肃省', "63": '青海省', "64": '宁夏回族自治区', "65": '新疆维吾尔自治区' } logging.basicConfig(level=logging.DEBUG, filename='/data/logs/app.log', filemode='w', format='%(asctime)s - %(levelname)s - %(message)s') class GetWeather(APIView): def post(self, request): # 对外提供天气接口,需要提供省,市,区(县) try: db = "/data/weather/weather.db" data = request.data cityid = data.get("cityid", "") province = data.get("province") city = data.get("city") district = data.get("district") day_type = data.get("day_type") conn = sqlite3.connect(db) cursor = conn.cursor() username = data.get("username") password = data.get("password") # sql = """ # select * from user where username = ? and password = ? # """ # cursor.execute(sql, (username, password,)) # user = cursor.fetchone() user_info = { "admin001": "why123456", "admin001": " yf@mtg", "fujinsuyuan": "yf@fj" } if not (user_info.get(username) == password): return Response({"msg":"认证失败", "code": 400}) table = "day_data" if day_type == "1" else "serven_day_data" table = "day_data" if day_type == "1" else "server_day_data" if cityid: logging.warning(f"{cityid}: 使用cityid查询") sql = f"select content from {table} where cityid = '{cityid}'" cursor.execute(sql) result = cursor.fetchone() if result: temp = result[0].replace("'", '"') result = json.loads(temp) return Response({"content": result, "msg": "success", "code": 200}) else: return Response({"msg": "请联系管理员排查", "code": 500}) else: logging.warning("使用province,city,distinct查询") sql = f"select content from {table} where province = '{province}' and city ='{city}' and district ='{district}'" cursor.execute(sql) result = cursor.fetchone() if not result: # 省市正确 sql = f"select content from {table} where province= '{province}' and city = '{city}'" cursor.execute(sql) result = cursor.fetchone() if not result: # 省正确 sql = f"select content from {table} where province = '{province}'" cursor.execute(sql) result = cursor.fetchone() if result: try: temp = result[0].replace("'", '"') result = json.loads(temp) return Response({"content": result, "msg": "success", "code": 200}) except Exception as e: logging.info(e) return Response({"msg": "请联系管理员", "code": "50001"}) else: return Response({"content": "", "msg": "success", "code": 500}) except Exception as e: logging.info(e) return Response({"msg": "请联系管理员", "code": "50001"}) class TestAPI(APIView): def post(self, request): return Response({"test": 111})