scd_mqtt_client.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_scd is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Equip, Equip_type, SCDdata, SCDstatus, MyUser,Alarm_record, RecentSCDdata, SCDstatus_all
  16. import re
  17. import datetime
  18. class CJSONEncoder(json.JSONEncoder):
  19. def default(self, obj):
  20. if isinstance(obj, datetime):
  21. return obj.strftime('%Y-%m-%d %H:%M:%S')
  22. elif isinstance(obj, date):
  23. return obj.strftime('%Y-%m-%d')
  24. else:
  25. return json.JSONEncoder.default(self, obj)
  26. # 连接后的操作: 0为成功
  27. def on_connect(client, userdata, flags, rc):
  28. # print("Connected with result code "+str(rc))
  29. client.subscribe("/yfkj/scd/pub/#")
  30. client.subscribe("/yfkj/scd/offline/#")
  31. # *****成功发布******
  32. def on_publish(msg, rc):
  33. if rc == 0:
  34. print("publish success,msg = "+msg)
  35. # 从服务器接受到消息后回调此函数 :
  36. def on_message(client, userdata, msg):
  37. time_a = datetime.datetime.now()
  38. print('\r')
  39. print('=================================================')
  40. print('\r')
  41. print("<-----topic:\n" + msg.topic + ';\n')
  42. print("Message:\n" + str(msg.payload) + "----->\n")
  43. # 从主题中获取imei
  44. # imei = msg.topic[14:len(msg.topic)]
  45. imei = re.sub("\D", "", msg.topic)
  46. print("<-----imei:", imei, "----->")
  47. if imei == "862285036228686":
  48. pass
  49. else:
  50. # 判断主题:
  51. if "pub" in msg.topic:
  52. # 将json字符串解析:
  53. payload = json.loads(msg.payload.decode())
  54. if payload.get("cmd") == "status":
  55. print("<-----uploading status!----->")
  56. extdata = payload.get("ext")
  57. # print("extdata:", extdata)
  58. scd_exist = Equip.objects.filter(equip_id=imei)
  59. # 设备存在,进一步判断状态表是否存在:
  60. if scd_exist.exists():
  61. # 新建数据记录:
  62. try:
  63. e_id = Equip.objects.get(equip_id=imei)
  64. SCDstatus_all.objects.create(
  65. equip_id=e_id, scd_status=extdata, lng=extdata['lng'], lat=extdata['lat'], is_online = '1', ds=extdata['ds'])
  66. print("<-----status_all upload success!----->")
  67. except:
  68. print("<-----status_all upload failed!----->")
  69. # 设备状态表存在、刷新状态表:
  70. print("<-----this equip is existed!----->")
  71. if SCDstatus.objects.filter(equip_id=imei).exists():
  72. print("<-----this equip's status is existed!----->")
  73. try:
  74. sta = SCDstatus.objects.get(equip_id=imei)
  75. sta.scd_status=extdata
  76. sta.lng=extdata['lng']
  77. sta.lat=extdata['lat']
  78. sta.is_online = '1'
  79. sta.save()
  80. print("<-----status update success!----->")
  81. time_b = datetime.datetime.now()
  82. print("一共用时:",time_b-time_a)
  83. except:
  84. print("<-----status update failed!----->")
  85. else:
  86. # 设备状态表不存在、创建状态表:
  87. print("<-----this equip's status is not existed!----->")
  88. try:
  89. e_id = Equip.objects.get(equip_id=imei)
  90. try:
  91. SCDstatus.objects.create(equip_id=e_id, ds=extdata['ds'],
  92. scd_status=extdata, lng=extdata['lng'], lat=extdata['lat'],
  93. is_online = '1')
  94. print("<-----this equip's status table re-create successed!----->")
  95. except:
  96. print("<-----this equip's status table re-create failed!----->")
  97. except:
  98. print("<-----this equip didn't exist!----->")
  99. else:
  100. # 设备不存在,在设备列表中创建:
  101. try:
  102. # 得到设备类型实例:
  103. equip_t = Equip_type.objects.get(type_id=extdata['dtype'])
  104. try:
  105. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  106. print("<-----this imei add successed!----->")
  107. try:
  108. SCDstatus.objects.create(equip_id=e_id, ds=extdata['ds'],
  109. scd_status=extdata,lng=extdata['lng'], lat=extdata['lat'],
  110. is_online = '1')
  111. print("<-----this imei register successed!----->")
  112. except:
  113. print("<-----this imei register failed!----->")
  114. except:
  115. print("<-----this imei add failed!----->")
  116. except:
  117. print("<-----this imei register failed because this equip type is not exist,!----->")
  118. # rtn="register success!!!"
  119. # print("注册连接回应:"+"/yfkj/scd/cmd/"+payload.get("equip_id"))
  120. # client.publish("/yfkj/scd/cmd/"+payload.get("equip_id"), json.dumps({"msg": rtn}))
  121. elif payload.get("cmd") == "data":
  122. print("<-----uploading data!----->")
  123. extdata = payload.get("ext")
  124. # print("extdata:", extdata)
  125. # 新建数据记录:
  126. try:
  127. e_id = Equip.objects.get(equip_id=imei)
  128. SCDdata.objects.create(
  129. equip_id=e_id, ds=extdata["ds"], scd_data=extdata)
  130. print("<-----data upload success!----->")
  131. except:
  132. print("<-----data upload failed!----->")
  133. # 如果是新设备,表里没有,需要做判断
  134. if RecentSCDdata.objects.filter(equip_id=imei).exists():
  135. # 设备存在,更新数据
  136. try:
  137. Recdata = RecentSCDdata.objects.get(equip_id=imei)
  138. Recdata.ds=extdata["ds"]
  139. Recdata.scd_data=extdata
  140. Recdata.upl_time=datetime.datetime.now()
  141. Recdata.save()
  142. print("<-----RecentSCDdata update success!----->")
  143. except:
  144. print("<-----RecentSCDdata update failed!----->")
  145. else:
  146. # 如果是新设备,则新建数据
  147. try:
  148. e_id = Equip.objects.get(equip_id=imei)
  149. RecentSCDdata.objects.create(
  150. equip_id=e_id, ds=extdata["ds"], scd_data=extdata, upl_time=datetime.datetime.now())
  151. print("<-----RecentSCDdata create success!----->")
  152. except:
  153. print("<-----RecentSCDdata create failed!----->")
  154. # 参数配置信息:
  155. elif payload.get("cmd") == "paramconf":
  156. print("<-----uploading paramconf!----->")
  157. extdata = payload.get("ext")
  158. print("extdata:", extdata)
  159. # 更新状态表中的参数配置信息;
  160. try:
  161. sta = SCDstatus.objects.get(equip_id=imei)
  162. sta.paramconf=extdata
  163. sta.save()
  164. print("<-----status.paramconf update success!----->")
  165. except:
  166. print("<-----scdstatus table is not exist,status.paramconf upload failed!----->")
  167. # 设备异常预警
  168. elif payload.get("cmd") == "warn":
  169. print("<-----uploading warn!----->")
  170. print("%s is warn ! ! !" % imei)
  171. extdata = payload.get("ext")
  172. scd_exist = Equip.objects.filter(equip_id=imei)
  173. if scd_exist.exists():
  174. try:
  175. e_id = Equip.objects.get(equip_id=imei)
  176. # 创建预警记录:
  177. Alarm_record.objects.create(equip_id=e_id, alarm_desc=extdata, e_type="2")
  178. print("update warn ok!")
  179. except:
  180. print("update warn failed!")
  181. else:
  182. print("this imei is not exist!")
  183. # 离线消息:
  184. elif "offline" in msg.topic:
  185. try:
  186. # 将json字符串解析:
  187. payload = json.loads(msg.payload.decode())
  188. if payload.get("cmd") == "offline":
  189. print("<-----离线消息!----->")
  190. print("%s is offline!" % imei)
  191. scd_exist = Equip.objects.filter(equip_id=imei)
  192. # 设备存在,进一步判断状态表是否存在:
  193. if scd_exist.exists():
  194. try:
  195. e_id = Equip.objects.get(equip_id=imei)
  196. # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder)
  197. # 更新状态表中未离线状态:
  198. SCDstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  199. # 增加状态历史的离线消息
  200. sta1 = SCDstatus.objects.get(equip_id=imei)
  201. # sta1.update(is_online = '0',off_time = datetime.datetime.now())
  202. SCDstatus_all.objects.create(equip_id=e_id, ds=sta1.ds, off_time = datetime.datetime.now(),
  203. scd_status=sta1.scd_status,lng=sta1.lng, lat=sta1.lat,
  204. is_online = '0')
  205. # 创建预警记录:
  206. Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}",e_type="2")
  207. print("update offline ok!")
  208. except Exception as e:
  209. print(e)
  210. print("update offline failed!")
  211. else:
  212. print("this imei is not exist!")
  213. except:
  214. pass
  215. if __name__ == '__main__':
  216. client = mqtt.Client(
  217. client_id="PY_MQTT_CLIENTC_SCD",
  218. clean_session=True,
  219. userdata=None,
  220. # protocol=MQTTv311,# 数据库版本
  221. )
  222. # 必须设置,否则会返回「Connected with result code 4」
  223. client.username_pw_set("admin", "password")
  224. client.on_connect = on_connect
  225. client.on_message = on_message
  226. HOST = "127.0.0.1"
  227. client.connect(HOST, 1883, 60)
  228. client.loop_forever()
  229. # # 输入发布的话题名称:
  230. # # user = input("请输入名称:")
  231. # topic = "/yfkj/scd/cmd/2001"
  232. # client.user_data_set(topic)
  233. # client.loop_start()
  234. # while True:
  235. # str = input()
  236. # if str:
  237. # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))