# bzy_mqtt_client.py (8.0 KB)
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_bzy is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Equip, Equip_type, MyUser, Alarm_record, BZYdata, BZYstatus
  16. import re
  17. import datetime
  18. import time
  19. def mkdir(path):
  20. # 去除首位空格
  21. path=path.strip()
  22. # 去除尾部 \ 符号
  23. path=path.rstrip("\\")
  24. # 判断路径是否存在
  25. # 存在 True
  26. # 不存在 False
  27. isExists=os.path.exists(path)
  28. # 判断结果
  29. if not isExists:
  30. # 如果不存在则创建目录
  31. # 创建目录操作函数
  32. os.makedirs(path)
  33. print(path+' 创建成功')
  34. return True
  35. else:
  36. # 如果目录存在则不创建,并提示目录已存在
  37. print(path+' 目录已存在')
  38. return False
  39. def variance_of_laplacian(image):
  40. # 拉普拉斯的方差
  41. return cv2.Laplacian(image, cv2.CV_64F).var()
  42. class CJSONEncoder(json.JSONEncoder):
  43. def default(self, obj):
  44. if isinstance(obj, datetime):
  45. return obj.strftime('%Y-%m-%d %H:%M:%S')
  46. elif isinstance(obj, date):
  47. return obj.strftime('%Y-%m-%d')
  48. else:
  49. return json.JSONEncoder.default(self, obj)
  50. # 连接后的操作: 0为成功
  51. def on_connect(client, userdata, flags, rc):
  52. # print("Connected with result code "+str(rc))
  53. client.subscribe("/yfkj/bzy/c2s/#")
  54. client.subscribe("/yfkj/bzy/offline/#")
  55. # *****成功发布******
  56. def on_publish(msg, rc):
  57. if rc == 0:
  58. print("publish success,msg = "+msg)
  59. # 从服务器接受到消息后回调此函数 :
  60. def on_message(client, userdata, msg):
  61. print('\r')
  62. print('=================================================')
  63. print('\r')
  64. print("<-----topic:\n" + msg.topic + ';\n')
  65. print("Message:\n" + str(msg.payload) + "----->\n")
  66. # 从主题中获取imei
  67. # imei = msg.topic[14:len(msg.topic)]
  68. reg = re.compile(r"(?<=/)\d+")
  69. imei = reg.search(msg.topic).group(0)
  70. # imei = re.sub("\D", "", msg.topic)
  71. # nowtime = datetime.datetime.now().strftime('%Y%m%d')
  72. # origin = sys.stdout
  73. # f = open('./logs/'+nowtime+'bzymqtt.txt','a+')
  74. # sys.stdout = f
  75. print("<-----imei:", imei, "----->")
  76. # 判断主题:
  77. if "c2s" in msg.topic:
  78. # 将json字符串解析:
  79. payload = json.loads(msg.payload.decode())
  80. if payload.get("cmd") == "status":
  81. print("<-----uploading data!----->")
  82. extdata = payload.get("ext")
  83. # print("extdata:", extdata)
  84. bzy_exist = Equip.objects.filter(equip_id=imei)
  85. # 设备存在,进一步判断状态表是否存在:
  86. if bzy_exist.exists():
  87. print("<-----this equip is existed!----->")
  88. try:
  89. e_id = Equip.objects.get(equip_id=imei)
  90. except:
  91. print("<-----this equip didn't exist!----->")
  92. try:
  93. # 设备数据表直接储存数据
  94. BZYdata.objects.create(equip_id=e_id, bzy_data=extdata)
  95. print("<-----data update success!----->")
  96. except:
  97. print("<-----data update failed!----->")
  98. # 设备状态表存在、刷新状态表:
  99. if BZYstatus.objects.filter(equip_id=imei).exists():
  100. print("<-----this equip's status is existed!----->")
  101. try:
  102. sta = BZYstatus.objects.get(equip_id=imei)
  103. sta.bzy_status=extdata
  104. sta.is_online = '1'
  105. sta.save()
  106. print("<-----status update success!----->")
  107. except:
  108. print("<-----status update failed!----->")
  109. else:
  110. # 设备状态表不存在、创建状态表:
  111. print("<-----this equip's status is not existed!----->")
  112. try:
  113. BZYstatus.objects.create(equip_id=e_id, bzy_status=extdata, is_online="1")
  114. print("<-----this equip's status table re-create successed!----->")
  115. except:
  116. print("<-----this equip's status table re-create failed!----->")
  117. else:
  118. # 设备不存在,在设备列表中创建:
  119. equip_t = Equip_type.objects.get(type_id=7)
  120. try:
  121. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  122. print("<-----this imei add successed!----->")
  123. try:
  124. # 设备数据表直接储存数据
  125. BZYdata.objects.create(equip_id=e_id, bzy_data=extdata)
  126. print("<-----data update success!----->")
  127. except:
  128. print("<-----data update failed!----->")
  129. try:
  130. BZYstatus.objects.create(equip_id=e_id, bzy_status=extdata, is_online="1")
  131. print("<-----this imei register successed!----->")
  132. except:
  133. print("<-----this imei register failed!----->")
  134. except:
  135. print("<-----this imei add failed!----->")
  136. elif payload.get("cmd") == "netconf":
  137. print("<-----uploading netconf!----->")
  138. mqttsta = payload.get("mqtt")
  139. ftpsta = payload.get("ftp")
  140. netsta = {"mqtt": mqttsta, "ftp": ftpsta}
  141. # print("extdata:", extdata)
  142. # 更新状态表中的参数配置信息;
  143. try:
  144. sta = BZYstatus.objects.get(equip_id=imei)
  145. sta.netconf = netsta
  146. sta.save()
  147. print("<-----status.netconf update success!----->")
  148. except:
  149. print("<-----bzystatus table is not exist,status.netconf upload failed!----->")
  150. # 离线消息:
  151. elif "offline" in msg.topic:
  152. try:
  153. # 将json字符串解析:
  154. payload = json.loads(msg.payload.decode())
  155. if payload.get("cmd") == "offline":
  156. print("<-----离线消息!----->")
  157. print("%s is offline!" % imei)
  158. bzy_exist = Equip.objects.filter(equip_id=imei)
  159. # 设备存在,进一步判断状态表是否存在:
  160. if bzy_exist.exists():
  161. try:
  162. e_id = Equip.objects.get(equip_id=imei)
  163. # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder)
  164. # 更新状态表中未离线状态:
  165. BZYstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  166. # 创建预警记录:
  167. Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}")
  168. print("update offline ok!")
  169. except:
  170. print("update offline failed!")
  171. else:
  172. print("this imei is not exist!")
  173. except:
  174. pass
  175. # sys.stdout = origin
  176. # f.close()
  177. if __name__ == '__main__':
  178. client = mqtt.Client(
  179. client_id="PY_MQTT_CLIENTC_BZY",
  180. clean_session=True,
  181. userdata=None,
  182. # protocol=MQTTv311,# 数据库版本
  183. )
  184. # 必须设置,否则会返回「Connected with result code 4」
  185. client.username_pw_set("admin", "password")
  186. client.on_connect = on_connect
  187. client.on_message = on_message
  188. HOST = "127.0.0.1"
  189. client.connect(HOST, 1883, 60)
  190. client.loop_forever()