# -*- coding: utf-8 -*- # File Name:mqtt_chat_client.py # Python Version:3.5.1 import os import django import sys BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录 sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir))) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yfwlw_pro.settings") # project_name 项目名称 django.setup() print("<-----python_mqtt_client_cbd is run----->") import paho.mqtt.client as mqtt import json from apps.AppInfoManage.models import Equip, Equip_type, CBDdata, CBDstatus, MyUser,Alarm_record, CBDstatus_all, RecentCBDdata import re import datetime import shutil,os import threading class CJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj) # 连接后的操作: 0为成功 def on_connect(client, userdata, flags, rc): # print("Connected with result code "+str(rc)) client.subscribe("/yfkj/cbd/pub/#") client.subscribe("/yfkj/cbd/offline/#") # 发布消息完成回调函数: def on_publish(msg, rc): if rc == 0: print("publish success,msg = " + msg) def msg_thread(msg): print('\r') print('=================================================') print('\r') print("<-----topic:\n" + msg.topic + ';\n') print("Message:\n" + str(msg.payload) + "----->\n") # 从主题中获取imei # imei = msg.topic[14:len(msg.topic)] imei = re.sub("\D", "", msg.topic) print("<-----imei:", imei, "----->") # 判断主题: if "pub" in msg.topic: # 将json字符串解析: payload = json.loads(msg.payload.decode()) if payload.get("cmd") == "status": print("<-----uploading status!----->") extdata = payload.get("ext") # print("extdata:", extdata) cbd_exist = Equip.objects.filter(equip_id=imei) # 设备存在,进一步判断状态表是否存在: if cbd_exist.exists(): try: e_id = Equip.objects.get(equip_id=imei) CBDstatus_all.objects.create( equip_id=e_id, cbd_status=extdata, lng=extdata['lng'], lat=extdata['lat'],is_online = '1') print("<-----status_all upload success!----->") except: print("<-----status_all upload failed!----->") # 设备状态表存在、刷新状态表: print("<-----this equip is existed!----->") if CBDstatus.objects.filter(equip_id=imei).exists(): print("<-----this equip's status is existed!----->") try: sta = CBDstatus.objects.get(equip_id=imei) sta.cbd_status=extdata sta.lng=extdata['lng'] sta.lat=extdata['lat'] sta.is_online = '1' sta.save() print("<-----status update success!----->") except: print("<-----status update failed!----->") else: # 设备状态表不存在、创建状态表: print("<-----this equip's status is not existed!----->") try: e_id = Equip.objects.get(equip_id=imei) try: CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata, lng=extdata['lng'], lat=extdata['lat'], is_online = '1' ) print("<-----this equip's status table re-create successed!----->") except: print("<-----this equip's status table re-create failed!----->") except: print("<-----this equip didn't exist!----->") else: # 设备不存在,在设备列表中创建: try: # 得到设备类型实例: equip_t = Equip_type.objects.get(type_id=extdata['dtype']) try: e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t) print("<-----this imei add successed!----->") try: dver = extdata['dver'] if dver.startswith("5") and dver.endswith("HK"): etype = "4.0测报灯" elif dver.startswith("4") and dver.endswith("HK"): etype = "3.0测报灯" elif dver.startswith("4") and dver.endswith("re"): etype = "3.0测报灯" elif dver.startswith("1") or dver.startswith("2") or dver.startswith("3"): etype = "2.0测报灯" elif dver.endswith("ZP"): etype = "1.0测报灯" elif dver.startswith("4") and dver.endswith("GK"): etype = "高空测报灯" elif dver.endswith("TC"): etype = "糖醋测报灯" elif dver.startswith("5") and dver.endswith("gk"): etype = "简易高空测报灯" CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata, lng=extdata['lng'], lat=extdata['lat'],etype=etype, is_online = '1' ) print("<-----this imei register successed!----->") except: print("<-----this imei register failed!----->") except: print("<-----this imei add failed!----->") except: print("<-----register failed because this equip type is not existed!----->") # rtn="register success!!!" # print("注册连接回应:"+"/yfkj/scd/cmd/"+payload.get("equip_id")) # client.publish("/yfkj/scd/cmd/"+payload.get("equip_id"), json.dumps({"msg": rtn})) # 上传数据: elif payload.get("cmd") == "data": print("<-----uploading data!----->") extdata = payload.get("ext") # print("extdata:", extdata) # 新建数据记录: try: e_id = Equip.objects.get(equip_id=imei) CBDdata.objects.create(equip_id=e_id, cbd_data=extdata) print("<-----data upload success!----->") except: print("<-----data upload failed!----->") # 如果是新设备,表里没有,需要做判断 if RecentCBDdata.objects.filter(equip_id=imei).exists(): # 设备存在,更新数据 try: Recdata = RecentCBDdata.objects.get(equip_id=imei) Recdata.cbd_data=extdata Recdata.upl_time=datetime.datetime.now() Recdata.save() print("<-----RecentCBDdata update success!----->") except: print("<-----RecentCBDdata update failed!----->") else: # 如果是新设备,则新建数据 try: print("<-----this imei add successed!----->") e_id = Equip.objects.get(equip_id=imei) etype = CBDstatus.objects.get(equip_id=e_id).etype RecentCBDdata.objects.create( equip_id=e_id, cbd_data=extdata, etype=etype, upl_time=datetime.datetime.now()) print("<-----RecentCBDdata create success!----->") except: print("<-----RecentCBDdata create failed!----->") # 参数配置信息: elif payload.get("cmd") == "paramconf": print("<-----uploading paramconf!----->") extdata = payload.get("ext") # print("extdata:", extdata) # 更新状态表中的参数配置信息; try: sta = CBDstatus.objects.get(equip_id=imei) sta.paramconf=extdata sta.save() print("<-----status.paramconf update success!----->") except: print("<-----cbdstatus table is not exist,status.paramconf upload failed!----->") # 设备不能拍照应急处理: elif payload.get("cmd") == "picture": try: nowtime = datetime.datetime.now().strftime('%Y%m%d') pic_name = datetime.datetime.now().strftime('%H%M%S') path = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime isExists=os.path.exists(path) # 判断结果 if not isExists: # 如果不存在则创建目录 # 创建目录操作函数 os.makedirs(path) print(path+' 创建成功') else: # 如果目录存在则不创建,并提示目录已存在 print(path+' 目录已存在') dest = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime + "/" + pic_name + ".jpg" shutil.copy("/data/yfwlw/logs/aaaa.jpg",dest) print("picture------------------success") except: print("picture-------------------failed") # 设备异常预警 elif payload.get("cmd") == "warn": print("<-----uploading warn!----->") print("%s is warn ! ! !" % imei) extdata = payload.get("ext") cbd_exist = Equip.objects.filter(equip_id=imei) # 设备存在,进一步判断状态表是否存在: if cbd_exist.exists(): try: e_id = Equip.objects.get(equip_id=imei) # 创建预警记录: Alarm_record.objects.create(equip_id=e_id, alarm_desc=extdata, e_type="3") print("update warn ok!") except: print("update warn failed!") else: print("this imei is not exist!") # 4.0带RE上报RTU信息 elif payload.get("cmd") == "rtu": print("<-----uploading rtu!----->") extdata = payload.get("ext") cbd_exist = Equip.objects.filter(equip_id=imei) # 设备存在,进一步判断状态表是否存在: if cbd_exist.exists(): print("=======>>",extdata["version"]) if extdata["version"] == "": pass else: try: sta = CBDstatus.objects.get(equip_id=imei) sta.rtuinfo=extdata sta.save() print("update rtu_info success!") except: print("update rtu_info failed!") else: print("this imei is not exist!") # 修改imei的板子上报原始imei elif payload.get("cmd") == "inquire" or payload.get("cmd") == "imei": print("<-----uploading inquire!----->") extdata = payload.get("ext") cbd_exist = Equip.objects.filter(equip_id=imei) # 设备存在,进一步判断状态表是否存在: if cbd_exist.exists(): print("=======>>",extdata) try: print("上报设备号为",extdata["status"]) sta = CBDstatus.objects.get(equip_id=imei) sta.old_eid=extdata["status"] sta.save() print("update rtu_info success!") except: print("update rtu_info failed!") else: print("this imei is not exist!") # 离线消息: elif "offline" in msg.topic: try: # 将json字符串解析: payload = json.loads(msg.payload.decode()) if payload.get("cmd") == "offline": print("<-----离线消息!----->") print("%s is offline!" % imei) cbd_exist = Equip.objects.filter(equip_id=imei) # 设备存在,进一步判断状态表是否存在: if cbd_exist.exists(): try: e_id = Equip.objects.get(equip_id=imei) # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder) # 更新状态表中未离线状态: CBDstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now()) # 增加状态历史的离线消息 sta1 = CBDstatus.objects.get(equip_id=imei) # sta1.update(is_online = '0',off_time = datetime.datetime.now()) CBDstatus_all.objects.create(equip_id=e_id, off_time = datetime.datetime.now(), cbd_status=sta1.cbd_status,lng=sta1.lng, lat=sta1.lat, is_online = '0') # 创建预警记录: Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}", e_type="3") print("update offline ok!") except: print("update offline failed!") else: print("this imei is not exist!") except: pass # 从服务器接收消息的回调函数 : def on_message(client, userdata, msg): t = threading.Thread(target=msg_thread,args=(msg,)) #打印出当前线程的名称和id # print(threading.currentThread().name) # t.setDaemon(True) t.start() return if __name__ == '__main__': client = mqtt.Client( client_id="PY_MQTT_CLIENT_CBD", clean_session=True, userdata=None, # protocol=MQTTv311,# 数据库版本 ) # 必须设置,否则会返回「Connected with result code 4」 client.username_pw_set("admin", "password") #设置连接上服务器回调函数: client.on_connect = on_connect #设置接收到服务器消息回调函数: client.on_message = on_message # #设置与服务器断开连接回调函数 # client.on_disconnect = on_disconnect HOST = "127.0.0.1" client.connect(HOST, 1883, 60) client.loop_forever() # # 输入发布的话题名称: # # user = input("请输入名称:") # topic = "/yfkj/scd/cmd/2001" # client.user_data_set(topic) # client.loop_start() # while True: # str = input() # if str: # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))