| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- # -*- coding: utf-8 -*-
- # File Name:mqtt_chat_client.py
- # Python Version:3.5.1
- import os
- import django
- os.environ.setdefault("DJANGO_SETTINGS_MODULE",
- "second_pro.settings") # project_name 项目名称
- django.setup()
- print("<-----python_mqtt_client_qxz is run----->")
- import paho.mqtt.client as mqtt
- import json
- from apps.AppInfo.models import Equip, Equip_type, MyUser, QXZdata, QXZstatus, QXZdata_New, QXZstatus_New
- import re
- import datetime
- import copy
class CJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serialises ``datetime`` and ``date`` values.

    ``datetime.datetime`` objects are rendered as ``YYYY-MM-DD HH:MM:SS`` and
    ``datetime.date`` objects as ``YYYY-MM-DD``. Every other type is handed to
    the base class, which raises ``TypeError`` for anything it cannot encode.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for *obj*.

        Args:
            obj: the value the standard encoder could not serialise.

        Returns:
            A formatted string for date/datetime values.

        Raises:
            TypeError: if *obj* is of any other unsupported type.
        """
        # The file does ``import datetime`` (the module), so the classes must
        # be referenced through it. datetime.datetime is a subclass of
        # datetime.date, hence it has to be tested first.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')
        return json.JSONEncoder.default(self, obj)
def on_connect(client, userdata, flags, rc):
    """Connection callback: subscribe to every topic under /yfkj/qxz/pub/.

    Args:
        client: the MQTT client the callback fired on; used to subscribe.
        userdata: unused.
        flags: unused.
        rc: connection result code (0 means success); it is not inspected,
            the subscription is requested regardless.
    """
    topic_filter = "/yfkj/qxz/pub/#"
    client.subscribe(topic_filter)
def on_publish(msg, rc):
    """Announce a successful publish on stdout.

    Args:
        msg: text of the published message (concatenated into the output,
            so it must be a ``str``).
        rc: result code; only 0 produces output.
    """
    if rc != 0:
        return
    print("publish success,msg = " + msg)
# Blank record template: keys "e1" .. "e14", each mapped to an empty string.
dict_qxz = {"e{}".format(index): "" for index in range(1, 15)}
# Column names written to the e1..e14 fields of the data and status rows.
_QXZ_FIELD_NAMES = tuple("e{}".format(n) for n in range(1, 15))


def _report_failure(message, exc):
    """Print the failure banner followed by the exception that caused it."""
    print(message)
    print("<-----reason:", repr(exc), "----->")


def _collect_fields(extdata):
    """Return a copy of ``dict_qxz`` filled in from ``extdata['data']``.

    Each entry is stored under its ``eKey`` as ``"eValue#eNum#eKey"``; keys
    that never appear in the message keep the template's empty string.
    """
    fields = copy.deepcopy(dict_qxz)
    for item in extdata['data']:
        fields[item["eKey"]] = item["eValue"] + "#" + item["eNum"] + "#" + item["eKey"]
    return fields


def _register_equip(imei, extdata):
    """Create the Equip row for an unknown IMEI and store its first raw record."""
    # A missing Equip_type(type_id=5) propagates to on_message's handler.
    equip_t = Equip_type.objects.get(type_id=5)
    try:
        equip = Equip.objects.create(equip_id=imei, equip_type=equip_t)
        print("<-----this imei add successed!----->")
    except Exception as exc:
        _report_failure("<-----this imei add failed!----->", exc)
        return
    try:
        print(extdata)
        # Only the raw payload is stored for a freshly registered device.
        QXZdata.objects.create(equip_id=equip, qxz_data=extdata)
        print("<-----data update success!----->")
    except Exception as exc:
        _report_failure("<-----data update failed!----->", exc)


def _store_terminal_data(imei, extdata):
    """Persist one "terminalData" message and refresh the device status."""
    # One query instead of exists() + get(): no window in which the row can
    # vanish between the two calls and leave the equipment object unbound.
    equip = Equip.objects.filter(equip_id=imei).first()
    if equip is None:
        _register_equip(imei, extdata)
        return

    print("<-----this equip is existed!----->")
    try:
        print("-----------数据类型---------")
        fields = _collect_fields(extdata)
        print(fields)
        columns = {name: fields[name] for name in _QXZ_FIELD_NAMES}
    except Exception as exc:
        # A malformed payload must not overwrite the stored status with
        # blank or half-filled values, so stop before the status step.
        _report_failure("<-----data update failed!----->", exc)
        return

    try:
        QXZdata_New.objects.create(equip_id=equip, **columns)
        print("-----------数据类型---------")
        QXZdata.objects.create(equip_id=equip, qxz_data=extdata)
        print("<-----data update success!----->")
    except Exception as exc:
        _report_failure("<-----data update failed!----->", exc)

    # NOTE(review): existence is tested on QXZstatus, the update goes to
    # QXZstatus_New, and the create goes to QXZstatus with e1..e14 columns.
    # This mix is kept exactly as it was; confirm against the models whether
    # all three should target QXZstatus_New.
    if QXZstatus.objects.filter(equip_id=imei).exists():
        print("<-----this equip's status is existed!----->")
        try:
            sta = QXZstatus_New.objects.get(equip_id=imei)
            for name, value in columns.items():
                setattr(sta, name, value)
            sta.save()
            print("<-----status update success!----->")
        except Exception as exc:
            _report_failure("<-----status update failed!----->", exc)
    else:
        print("<-----this equip's status is not existed!----->")
        try:
            QXZstatus.objects.create(equip_id=equip, **columns)
            print("<-----this equip's status table re-create successed!----->")
        except Exception as exc:
            _report_failure("<-----this equip's status table re-create failed!----->", exc)


def _update_interval(imei, extdata):
    """Store the interval carried by a "config" message on the status row."""
    sta = QXZstatus_New.objects.get(equip_id=imei)
    sta.interval = extdata['interval']
    sta.save()


def on_message(client, userdata, msg):
    """Message callback: log the message and persist it by command type.

    The IMEI is taken to be every digit found in the topic. Only topics
    containing "pub" are processed; the JSON payload's "cmd" selects the
    action ("terminalData" stores a record, "config" stores the interval).
    Failures are reported on stdout and never propagate to the caller.

    Args:
        client: unused.
        userdata: unused.
        msg: received message; ``msg.topic`` (str) and ``msg.payload``
            (bytes) are read.
    """
    print('\r')
    print('=================================================')
    print('\r')
    print("<-----topic:\n" + msg.topic + ';\n')
    print("Message:\n" + str(msg.payload) + "----->\n")
    # Raw string: "\D" in a plain literal is an invalid escape sequence.
    imei = re.sub(r"\D", "", msg.topic)
    print("<-----imei:", imei, "----->")

    if "pub" not in msg.topic:
        return
    if not imei:
        # Without digits in the topic there is no device to attribute the
        # message to; do not register an equipment with an empty id.
        print("<-----no imei in topic, message ignored!----->")
        return

    try:
        payload = json.loads(msg.payload.decode())
        cmd = payload.get("cmd")
        if cmd == "terminalData":
            print("<-----uploading data!----->")
            _store_terminal_data(imei, payload.get("ext"))
        elif cmd == "config":
            _update_interval(imei, payload.get("ext"))
    except Exception as exc:
        # Keep the subscriber alive, but say what went wrong instead of
        # silently discarding the error.
        _report_failure("<-----message handling failed!----->", exc)
if __name__ == '__main__':
    HOST = "127.0.0.1"

    # The protocol argument is not passed, so the library default applies.
    client = mqtt.Client(client_id="PY_MQTT_CLIENTC_QXZ1", clean_session=True, userdata=None)

    # Credentials are mandatory: without them the connection comes back as
    # "Connected with result code 4".
    client.username_pw_set("admin", "password")

    # Register the callbacks before connecting.
    client.on_message = on_message
    client.on_connect = on_connect

    client.connect(HOST, 1883, 60)
    client.loop_forever()
|