# -*- coding: utf-8 -*- # File Name:mqtt_chat_client.py # Python Version:3.5.1 import os import django os.environ.setdefault("DJANGO_SETTINGS_MODULE", "second_pro.settings") # project_name 项目名称 django.setup() print("<-----python_mqtt_client_qxz is run----->") import paho.mqtt.client as mqtt import json from apps.AppInfo.models import Equip, Equip_type, MyUser, QXZdata, QXZstatus, QXZdata_New, QXZstatus_New import re import datetime import copy class CJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj) # 连接后的操作: 0为成功 def on_connect(client, userdata, flags, rc): # print("Connected with result code "+str(rc)) client.subscribe("/yfkj/qxz/pub/#") # *****成功发布****** def on_publish(msg, rc): if rc == 0: print("publish success,msg = "+msg) dict_qxz = {"e1":"","e2":"","e3":"","e4":"","e5":"","e6":"","e7":"","e8":"","e9":"","e10":"","e11":"","e12":"","e13":"","e14":""} # 从服务器接受到消息后回调此函数 : def on_message(client, userdata, msg): print('\r') print('=================================================') print('\r') print("<-----topic:\n" + msg.topic + ';\n') print("Message:\n" + str(msg.payload) + "----->\n") # 从主题中获取imei # imei = msg.topic[14:len(msg.topic)] imei = re.sub("\D", "", msg.topic) print("<-----imei:", imei, "----->") try: # 判断主题: if "pub" in msg.topic: # 将json字符串解析: payload = json.loads(msg.payload.decode()) qxz_exist = Equip.objects.filter(equip_id=imei) if payload.get("cmd") == "terminalData": print("<-----uploading data!----->") extdata = payload.get("ext") # print("extdata:", extdata) # 设备存在,进一步判断状态表是否存在: if qxz_exist.exists(): print("<-----this equip is existed!----->") try: e_id = Equip.objects.get(equip_id=imei) except: print("<-----this equip didn't exist!----->") try: # 设备数据表直接储存数据 print("-----------数据类型---------") dict_a = copy.deepcopy(dict_qxz) for i in extdata['data']: dict_a[i["eKey"]] = i["eValue"] + "#" + i["eNum"] + "#" + i["eKey"] print(dict_a) QXZdata_New.objects.create( equip_id=e_id, e1 = dict_a["e1"], e2 = dict_a["e2"], e3 = dict_a["e3"], e4 = dict_a["e4"], e5 = dict_a["e5"], e6 = dict_a["e6"], e7 = dict_a["e7"], e8 = dict_a["e8"], e9 = dict_a["e9"], e10 = dict_a["e10"], e11 = dict_a["e11"], e12 = dict_a["e12"], e13 = dict_a["e13"], e14 = dict_a["e14"] ) print("-----------数据类型---------") QXZdata.objects.create(equip_id=e_id, qxz_data=extdata) print("<-----data update success!----->") except: print("<-----data update failed!----->") # 设备状态表存在、刷新状态表: if QXZstatus.objects.filter(equip_id=imei).exists(): print("<-----this equip's status is existed!----->") try: sta = QXZstatus_New.objects.get(equip_id=imei) sta.e1=dict_a["e1"] sta.e2=dict_a["e2"] sta.e3=dict_a["e3"] sta.e4=dict_a["e4"] sta.e5=dict_a["e5"] sta.e6=dict_a["e6"] sta.e7=dict_a["e7"] sta.e8=dict_a["e8"] sta.e9=dict_a["e9"] sta.e10=dict_a["e10"] sta.e11=dict_a["e11"] sta.e12=dict_a["e12"] sta.e13=dict_a["e13"] sta.e14=dict_a["e14"] sta.save() print("<-----status update success!----->") except: print("<-----status update failed!----->") else: # 设备状态表不存在、创建状态表: print("<-----this equip's status is not existed!----->") try: QXZstatus.objects.create( equip_id=e_id, e1 = dict_a["e1"], e2 = dict_a["e2"], e3 = dict_a["e3"], e4 = dict_a["e4"], e5 = dict_a["e5"], e6 = dict_a["e6"], e7 = dict_a["e7"], e8 = dict_a["e8"], e9 = dict_a["e9"], e10 = dict_a["e10"], e11 = dict_a["e11"], e12 = dict_a["e12"], e13 = dict_a["e13"], e14 = dict_a["e14"] ) print("<-----this equip's status table re-create successed!----->") except: print("<-----this equip's status table re-create failed!----->") else: # 设备不存在,在设备列表中创建: equip_t = Equip_type.objects.get(type_id=5) try: e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t) print("<-----this imei add successed!----->") try: print(extdata) # 设备数据表直接储存数据 QXZdata.objects.create(equip_id=e_id, qxz_data=extdata) print("<-----data update success!----->") except: print("<-----data update failed!----->") # try: # QXZstatus.objects.create(equip_id=e_id, qxz_status=extdata) # print("<-----this imei register successed!----->") # except: # print("<-----this imei register failed!----->") except: print("<-----this imei add failed!----->") elif payload.get("cmd") == "config": sta = QXZstatus_New.objects.get(equip_id=imei) extdata = payload.get("ext") interval = extdata['interval'] sta.interval = interval sta.save() except: pass if __name__ == '__main__': client = mqtt.Client( client_id="PY_MQTT_CLIENTC_QXZ1", clean_session=True, userdata=None, # protocol=MQTTv311,# 数据库版本 ) # 必须设置,否则会返回「Connected with result code 4」 client.username_pw_set("admin", "password") client.on_connect = on_connect client.on_message = on_message HOST = "127.0.0.1" client.connect(HOST, 1883, 60) client.loop_forever()