# coding:utf-8 import pymongo import pymysql import pymysql.cursors from django.conf import settings class MySQLTool: def __init__(self): config = settings.USER_CONFIG['mysql'] self.conn = pymysql.connect( host=config['host'], user=config['user'], password=config['pwd'], port=config['port'], database='smartfarming', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) def execute_by_one(self, sql): cursor = self.conn.cursor() cursor.execute(sql) result = cursor.fetchone() cursor.close() if self.conn: self.conn.close() return result class MongoDBTools: def __init__(self, db_name, table_name): config = settings.USER_CONFIG['mongodb'] host, port = config['host'], config['port'] user, password = config['user'], config['pwd'] self.client = pymongo.MongoClient(host=host, port=port, username=user, password=password) self.db_name = db_name self.table_name = table_name self.tb = self.client[self.db_name][self.table_name] def find_one(self, wheres, options=None, is_reverse=True): if not options: options = {} projection = {'_id': 0} projection.update(options) sort = [] if is_reverse: sort.append(('_id', pymongo.DESCENDING)) result = self.tb.find_one(wheres, projection=projection, sort=sort) if not result: result = {} return result def find_many(self, wheres, options=None, is_reverse=True, skip=0, limit=0, sort=None): if not options: options = {} projection = {'_id': 0} projection.update(options) temp_sort = [] if sort: temp_sort.extend(sort) if is_reverse: temp_sort.append(('_id', pymongo.DESCENDING)) result = self.tb.find(wheres, projection=projection, skip=skip, limit=limit, sort=temp_sort) if not result: result = [] return result