# tccb_mqtt_client.py (8.6 KB)
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_cbd is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Equip, Equip_type, TCCBdata, TCCBstatus, MyUser,Alarm_record
  16. import re
  17. import datetime
  18. import shutil,os
  19. import threading
  20. class CJSONEncoder(json.JSONEncoder):
  21. def default(self, obj):
  22. if isinstance(obj, datetime):
  23. return obj.strftime('%Y-%m-%d %H:%M:%S')
  24. elif isinstance(obj, date):
  25. return obj.strftime('%Y-%m-%d')
  26. else:
  27. return json.JSONEncoder.default(self, obj)
  28. # 连接后的操作: 0为成功
  29. def on_connect(client, userdata, flags, rc):
  30. # print("Connected with result code "+str(rc))
  31. client.subscribe("/yfkj/tccbd/pub/#")
  32. client.subscribe("/yfkj/tccbd/offline/#")
  33. # 发布消息完成回调函数:
  34. def on_publish(msg, rc):
  35. if rc == 0:
  36. print("publish success,msg = " + msg)
  37. def msg_thread(msg):
  38. print('\r')
  39. print('=================================================')
  40. print('\r')
  41. print("<-----topic:\n" + msg.topic + ';\n')
  42. print("Message:\n" + str(msg.payload) + "----->\n")
  43. # 从主题中获取imei
  44. # imei = msg.topic[14:len(msg.topic)]
  45. imei = re.sub("\D", "", msg.topic)
  46. print("<-----imei:", imei, "----->")
  47. # 判断主题:
  48. if "pub" in msg.topic:
  49. # 将json字符串解析:
  50. payload = json.loads(msg.payload.decode())
  51. if payload.get("cmd") == "status":
  52. print("<-----uploading status!----->")
  53. extdata = payload.get("ext")
  54. # print("extdata:", extdata)
  55. cbd_exist = Equip.objects.filter(equip_id=imei)
  56. # 设备存在,进一步判断状态表是否存在:
  57. if cbd_exist.exists():
  58. # 设备状态表存在、刷新状态表:
  59. print("<-----this equip is existed!----->")
  60. e_id = Equip.objects.get(equip_id=imei)
  61. TCCBdata.objects.create(equip_id=e_id, tccb_data=extdata)
  62. if TCCBstatus.objects.filter(equip_id=imei).exists():
  63. print("<-----this equip's status is existed!----->")
  64. try:
  65. sta = TCCBstatus.objects.get(equip_id=imei)
  66. sta.tccb_status=extdata
  67. sta.lng=extdata['lng']
  68. sta.lat=extdata['lat']
  69. sta.is_online = '1'
  70. sta.save()
  71. print("<-----status update success!----->")
  72. except:
  73. print("<-----status update failed!----->")
  74. else:
  75. # 设备状态表不存在、创建状态表:
  76. print("<-----this equip's status is not existed!----->")
  77. try:
  78. e_id = Equip.objects.get(equip_id=imei)
  79. try:
  80. TCCBstatus.objects.create(equip_id=e_id, tccb_status=extdata,
  81. lng=extdata['lng'], lat=extdata['lat'],
  82. is_online = '1'
  83. )
  84. TCCBdata.objects.create(equip_id=e_id, tccb_data=extdata)
  85. print("<-----this equip's status table re-create successed!----->")
  86. except:
  87. print("<-----this equip's status table re-create failed!----->")
  88. except:
  89. print("<-----this equip didn't exist!----->")
  90. else:
  91. # 设备不存在,在设备列表中创建:
  92. try:
  93. # 得到设备类型实例:
  94. equip_t = Equip_type.objects.get(type_id="9")
  95. try:
  96. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  97. print("<-----this imei add successed!----->")
  98. TCCBstatus.objects.create(equip_id=e_id, tccb_status=extdata,
  99. lng=extdata['lng'], lat=extdata['lat'],
  100. is_online = '1'
  101. )
  102. TCCBdata.objects.create(equip_id=e_id, tccb_data=extdata)
  103. except:
  104. print("<-----this imei add failed!----->")
  105. except:
  106. print("<-----register failed because this equip type is not existed!----->")
  107. # 参数配置信息:
  108. elif payload.get("cmd") == "paramconf":
  109. print("<-----uploading paramconf!----->")
  110. extdata = payload.get("ext")
  111. # print("extdata:", extdata)
  112. # 更新状态表中的参数配置信息;
  113. try:
  114. sta = TCCBstatus.objects.get(equip_id=imei)
  115. sta.paramconf=extdata
  116. sta.save()
  117. print("<-----status.paramconf update success!----->")
  118. except:
  119. print("<-----cbdstatus table is not exist,status.paramconf upload failed!----->")
  120. # 设备异常预警
  121. elif payload.get("cmd") == "warn":
  122. print("<-----uploading warn!----->")
  123. print("%s is warn ! ! !" % imei)
  124. extdata = payload.get("ext")
  125. cbd_exist = Equip.objects.filter(equip_id=imei)
  126. # 设备存在,进一步判断状态表是否存在:
  127. if cbd_exist.exists():
  128. try:
  129. e_id = Equip.objects.get(equip_id=imei)
  130. TCCBstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  131. # 创建预警记录:
  132. Alarm_record.objects.create(equip_id=e_id, alarm_desc=extdata, e_type="9")
  133. print("update warn ok!")
  134. except:
  135. print("update warn failed!")
  136. else:
  137. print("this imei is not exist!")
  138. # 离线消息:
  139. elif "offline" in msg.topic:
  140. try:
  141. # 将json字符串解析:
  142. payload = json.loads(msg.payload.decode())
  143. if payload.get("cmd") == "offline":
  144. print("<-----离线消息!----->")
  145. print("%s is offline!" % imei)
  146. cbd_exist = Equip.objects.filter(equip_id=imei)
  147. # 设备存在,进一步判断状态表是否存在:
  148. if cbd_exist.exists():
  149. try:
  150. e_id = Equip.objects.get(equip_id=imei)
  151. TCCBstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  152. # 创建预警记录:
  153. Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}", e_type="3")
  154. print("update offline ok!")
  155. except:
  156. print("update offline failed!")
  157. else:
  158. print("this imei is not exist!")
  159. except:
  160. pass
  161. # 从服务器接收消息的回调函数 :
  162. def on_message(client, userdata, msg):
  163. t = threading.Thread(target=msg_thread,args=(msg,))
  164. #打印出当前线程的名称和id
  165. # print(threading.currentThread().name)
  166. # t.setDaemon(True)
  167. t.start()
  168. return
  169. if __name__ == '__main__':
  170. client = mqtt.Client(
  171. client_id="PY_MQTT_CLIENT_TCCBD",
  172. clean_session=True,
  173. userdata=None,
  174. # protocol=MQTTv311,# 数据库版本
  175. )
  176. # 必须设置,否则会返回「Connected with result code 4」
  177. client.username_pw_set("admin", "password")
  178. #设置连接上服务器回调函数:
  179. client.on_connect = on_connect
  180. #设置接收到服务器消息回调函数:
  181. client.on_message = on_message
  182. # #设置与服务器断开连接回调函数
  183. # client.on_disconnect = on_disconnect
  184. HOST = "127.0.0.1"
  185. client.connect(HOST, 1883, 60)
  186. client.loop_forever()
  187. # # 输入发布的话题名称:
  188. # # user = input("请输入名称:")
  189. # topic = "/yfkj/scd/cmd/2001"
  190. # client.user_data_set(topic)
  191. # client.loop_start()
  192. # while True:
  193. # str = input()
  194. # if str:
  195. # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))