qxz_mqtt_client.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  7. "second_pro.settings") # project_name 项目名称
  8. django.setup()
  9. print("<-----python_mqtt_client_qxz is run----->")
  10. import paho.mqtt.client as mqtt
  11. import json
  12. from apps.AppInfo.models import Equip, Equip_type, MyUser, QXZdata, QXZstatus, QXZdata_New, QXZstatus_New
  13. import re
  14. import datetime
  15. import copy
  16. class CJSONEncoder(json.JSONEncoder):
  17. def default(self, obj):
  18. if isinstance(obj, datetime):
  19. return obj.strftime('%Y-%m-%d %H:%M:%S')
  20. elif isinstance(obj, date):
  21. return obj.strftime('%Y-%m-%d')
  22. else:
  23. return json.JSONEncoder.default(self, obj)
  24. # 连接后的操作: 0为成功
  25. def on_connect(client, userdata, flags, rc):
  26. # print("Connected with result code "+str(rc))
  27. client.subscribe("/yfkj/qxz/pub/#")
  28. # *****成功发布******
  29. def on_publish(msg, rc):
  30. if rc == 0:
  31. print("publish success,msg = "+msg)
  32. dict_qxz = {"e1":"","e2":"","e3":"","e4":"","e5":"","e6":"","e7":"","e8":"","e9":"","e10":"","e11":"","e12":"","e13":"","e14":""}
  33. # 从服务器接受到消息后回调此函数 :
  34. def on_message(client, userdata, msg):
  35. print('\r')
  36. print('=================================================')
  37. print('\r')
  38. print("<-----topic:\n" + msg.topic + ';\n')
  39. print("Message:\n" + str(msg.payload) + "----->\n")
  40. # 从主题中获取imei
  41. # imei = msg.topic[14:len(msg.topic)]
  42. imei = re.sub("\D", "", msg.topic)
  43. print("<-----imei:", imei, "----->")
  44. try:
  45. # 判断主题:
  46. if "pub" in msg.topic:
  47. # 将json字符串解析:
  48. payload = json.loads(msg.payload.decode())
  49. qxz_exist = Equip.objects.filter(equip_id=imei)
  50. if payload.get("cmd") == "terminalData":
  51. print("<-----uploading data!----->")
  52. extdata = payload.get("ext")
  53. # print("extdata:", extdata)
  54. # 设备存在,进一步判断状态表是否存在:
  55. if qxz_exist.exists():
  56. print("<-----this equip is existed!----->")
  57. try:
  58. e_id = Equip.objects.get(equip_id=imei)
  59. except:
  60. print("<-----this equip didn't exist!----->")
  61. try:
  62. # 设备数据表直接储存数据
  63. print("-----------数据类型---------")
  64. dict_a = copy.deepcopy(dict_qxz)
  65. for i in extdata['data']:
  66. dict_a[i["eKey"]] = i["eValue"] + "#" + i["eNum"] + "#" + i["eKey"]
  67. print(dict_a)
  68. QXZdata_New.objects.create(
  69. equip_id=e_id,
  70. e1 = dict_a["e1"],
  71. e2 = dict_a["e2"],
  72. e3 = dict_a["e3"],
  73. e4 = dict_a["e4"],
  74. e5 = dict_a["e5"],
  75. e6 = dict_a["e6"],
  76. e7 = dict_a["e7"],
  77. e8 = dict_a["e8"],
  78. e9 = dict_a["e9"],
  79. e10 = dict_a["e10"],
  80. e11 = dict_a["e11"],
  81. e12 = dict_a["e12"],
  82. e13 = dict_a["e13"],
  83. e14 = dict_a["e14"]
  84. )
  85. print("-----------数据类型---------")
  86. QXZdata.objects.create(equip_id=e_id, qxz_data=extdata)
  87. print("<-----data update success!----->")
  88. except:
  89. print("<-----data update failed!----->")
  90. # 设备状态表存在、刷新状态表:
  91. if QXZstatus.objects.filter(equip_id=imei).exists():
  92. print("<-----this equip's status is existed!----->")
  93. try:
  94. sta = QXZstatus_New.objects.get(equip_id=imei)
  95. sta.e1=dict_a["e1"]
  96. sta.e2=dict_a["e2"]
  97. sta.e3=dict_a["e3"]
  98. sta.e4=dict_a["e4"]
  99. sta.e5=dict_a["e5"]
  100. sta.e6=dict_a["e6"]
  101. sta.e7=dict_a["e7"]
  102. sta.e8=dict_a["e8"]
  103. sta.e9=dict_a["e9"]
  104. sta.e10=dict_a["e10"]
  105. sta.e11=dict_a["e11"]
  106. sta.e12=dict_a["e12"]
  107. sta.e13=dict_a["e13"]
  108. sta.e14=dict_a["e14"]
  109. sta.save()
  110. print("<-----status update success!----->")
  111. except:
  112. print("<-----status update failed!----->")
  113. else:
  114. # 设备状态表不存在、创建状态表:
  115. print("<-----this equip's status is not existed!----->")
  116. try:
  117. QXZstatus.objects.create(
  118. equip_id=e_id,
  119. e1 = dict_a["e1"],
  120. e2 = dict_a["e2"],
  121. e3 = dict_a["e3"],
  122. e4 = dict_a["e4"],
  123. e5 = dict_a["e5"],
  124. e6 = dict_a["e6"],
  125. e7 = dict_a["e7"],
  126. e8 = dict_a["e8"],
  127. e9 = dict_a["e9"],
  128. e10 = dict_a["e10"],
  129. e11 = dict_a["e11"],
  130. e12 = dict_a["e12"],
  131. e13 = dict_a["e13"],
  132. e14 = dict_a["e14"]
  133. )
  134. print("<-----this equip's status table re-create successed!----->")
  135. except:
  136. print("<-----this equip's status table re-create failed!----->")
  137. else:
  138. # 设备不存在,在设备列表中创建:
  139. equip_t = Equip_type.objects.get(type_id=5)
  140. try:
  141. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  142. print("<-----this imei add successed!----->")
  143. try:
  144. print(extdata)
  145. # 设备数据表直接储存数据
  146. QXZdata.objects.create(equip_id=e_id, qxz_data=extdata)
  147. print("<-----data update success!----->")
  148. except:
  149. print("<-----data update failed!----->")
  150. # try:
  151. # QXZstatus.objects.create(equip_id=e_id, qxz_status=extdata)
  152. # print("<-----this imei register successed!----->")
  153. # except:
  154. # print("<-----this imei register failed!----->")
  155. except:
  156. print("<-----this imei add failed!----->")
  157. elif payload.get("cmd") == "config":
  158. sta = QXZstatus_New.objects.get(equip_id=imei)
  159. extdata = payload.get("ext")
  160. interval = extdata['interval']
  161. sta.interval = interval
  162. sta.save()
  163. except:
  164. pass
  165. if __name__ == '__main__':
  166. client = mqtt.Client(
  167. client_id="PY_MQTT_CLIENTC_QXZ1",
  168. clean_session=True,
  169. userdata=None,
  170. # protocol=MQTTv311,# 数据库版本
  171. )
  172. # 必须设置,否则会返回「Connected with result code 4」
  173. client.username_pw_set("admin", "password")
  174. client.on_connect = on_connect
  175. client.on_message = on_message
  176. HOST = "127.0.0.1"
  177. client.connect(HOST, 1883, 60)
  178. client.loop_forever()