cbd_mqtt_client.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_cbd is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Equip, Equip_type, CBDdata, CBDstatus, MyUser,Alarm_record, CBDstatus_all, RecentCBDdata
  16. import re
  17. import datetime
  18. import shutil,os
  19. import threading
  20. class CJSONEncoder(json.JSONEncoder):
  21. def default(self, obj):
  22. if isinstance(obj, datetime):
  23. return obj.strftime('%Y-%m-%d %H:%M:%S')
  24. elif isinstance(obj, date):
  25. return obj.strftime('%Y-%m-%d')
  26. else:
  27. return json.JSONEncoder.default(self, obj)
  28. # 连接后的操作: 0为成功
  29. def on_connect(client, userdata, flags, rc):
  30. # print("Connected with result code "+str(rc))
  31. client.subscribe("/yfkj/cbd/pub/#")
  32. client.subscribe("/yfkj/cbd/offline/#")
  33. # 发布消息完成回调函数:
  34. def on_publish(msg, rc):
  35. if rc == 0:
  36. print("publish success,msg = " + msg)
  37. def msg_thread(msg):
  38. print('\r')
  39. print('=================================================')
  40. print('\r')
  41. print("<-----topic:\n" + msg.topic + ';\n')
  42. print("Message:\n" + str(msg.payload) + "----->\n")
  43. # 从主题中获取imei
  44. # imei = msg.topic[14:len(msg.topic)]
  45. imei = re.sub("\D", "", msg.topic)
  46. print("<-----imei:", imei, "----->")
  47. # 判断主题:
  48. if "pub" in msg.topic:
  49. # 将json字符串解析:
  50. payload = json.loads(msg.payload.decode())
  51. if payload.get("cmd") == "status":
  52. print("<-----uploading status!----->")
  53. extdata = payload.get("ext")
  54. # print("extdata:", extdata)
  55. cbd_exist = Equip.objects.filter(equip_id=imei)
  56. # 设备存在,进一步判断状态表是否存在:
  57. if cbd_exist.exists():
  58. try:
  59. e_id = Equip.objects.get(equip_id=imei)
  60. CBDstatus_all.objects.create(
  61. equip_id=e_id, cbd_status=extdata, lng=extdata['lng'], lat=extdata['lat'],is_online = '1')
  62. print("<-----status_all upload success!----->")
  63. except:
  64. print("<-----status_all upload failed!----->")
  65. # 设备状态表存在、刷新状态表:
  66. print("<-----this equip is existed!----->")
  67. if CBDstatus.objects.filter(equip_id=imei).exists():
  68. print("<-----this equip's status is existed!----->")
  69. try:
  70. sta = CBDstatus.objects.get(equip_id=imei)
  71. sta.cbd_status=extdata
  72. sta.lng=extdata['lng']
  73. sta.lat=extdata['lat']
  74. sta.is_online = '1'
  75. sta.save()
  76. print("<-----status update success!----->")
  77. except:
  78. print("<-----status update failed!----->")
  79. else:
  80. # 设备状态表不存在、创建状态表:
  81. print("<-----this equip's status is not existed!----->")
  82. try:
  83. e_id = Equip.objects.get(equip_id=imei)
  84. try:
  85. CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata,
  86. lng=extdata['lng'], lat=extdata['lat'],
  87. is_online = '1'
  88. )
  89. print("<-----this equip's status table re-create successed!----->")
  90. except:
  91. print("<-----this equip's status table re-create failed!----->")
  92. except:
  93. print("<-----this equip didn't exist!----->")
  94. else:
  95. # 设备不存在,在设备列表中创建:
  96. try:
  97. # 得到设备类型实例:
  98. equip_t = Equip_type.objects.get(type_id=extdata['dtype'])
  99. try:
  100. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  101. print("<-----this imei add successed!----->")
  102. try:
  103. dver = extdata['dver']
  104. if dver.startswith("5") and dver.endswith("HK"):
  105. etype = "4.0测报灯"
  106. elif dver.startswith("4") and dver.endswith("HK"):
  107. etype = "3.0测报灯"
  108. elif dver.startswith("4") and dver.endswith("re"):
  109. etype = "3.0测报灯"
  110. elif dver.startswith("1") or dver.startswith("2") or dver.startswith("3"):
  111. etype = "2.0测报灯"
  112. elif dver.endswith("ZP"):
  113. etype = "1.0测报灯"
  114. elif dver.startswith("4") and dver.endswith("GK"):
  115. etype = "高空测报灯"
  116. elif dver.endswith("TC"):
  117. etype = "糖醋测报灯"
  118. elif dver.startswith("5") and dver.endswith("gk"):
  119. etype = "简易高空测报灯"
  120. CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata,
  121. lng=extdata['lng'], lat=extdata['lat'],etype=etype,
  122. is_online = '1'
  123. )
  124. print("<-----this imei register successed!----->")
  125. except:
  126. print("<-----this imei register failed!----->")
  127. except:
  128. print("<-----this imei add failed!----->")
  129. except:
  130. print("<-----register failed because this equip type is not existed!----->")
  131. # rtn="register success!!!"
  132. # print("注册连接回应:"+"/yfkj/scd/cmd/"+payload.get("equip_id"))
  133. # client.publish("/yfkj/scd/cmd/"+payload.get("equip_id"), json.dumps({"msg": rtn}))
  134. # 上传数据:
  135. elif payload.get("cmd") == "data":
  136. print("<-----uploading data!----->")
  137. extdata = payload.get("ext")
  138. # print("extdata:", extdata)
  139. # 新建数据记录:
  140. try:
  141. e_id = Equip.objects.get(equip_id=imei)
  142. CBDdata.objects.create(equip_id=e_id, cbd_data=extdata)
  143. print("<-----data upload success!----->")
  144. except:
  145. print("<-----data upload failed!----->")
  146. # 如果是新设备,表里没有,需要做判断
  147. if RecentCBDdata.objects.filter(equip_id=imei).exists():
  148. # 设备存在,更新数据
  149. try:
  150. Recdata = RecentCBDdata.objects.get(equip_id=imei)
  151. Recdata.cbd_data=extdata
  152. Recdata.upl_time=datetime.datetime.now()
  153. Recdata.save()
  154. print("<-----RecentCBDdata update success!----->")
  155. except:
  156. print("<-----RecentCBDdata update failed!----->")
  157. else:
  158. # 如果是新设备,则新建数据
  159. try:
  160. print("<-----this imei add successed!----->")
  161. e_id = Equip.objects.get(equip_id=imei)
  162. etype = CBDstatus.objects.get(equip_id=e_id).etype
  163. RecentCBDdata.objects.create(
  164. equip_id=e_id, cbd_data=extdata, etype=etype, upl_time=datetime.datetime.now())
  165. print("<-----RecentCBDdata create success!----->")
  166. except:
  167. print("<-----RecentCBDdata create failed!----->")
  168. # 参数配置信息:
  169. elif payload.get("cmd") == "paramconf":
  170. print("<-----uploading paramconf!----->")
  171. extdata = payload.get("ext")
  172. # print("extdata:", extdata)
  173. # 更新状态表中的参数配置信息;
  174. try:
  175. sta = CBDstatus.objects.get(equip_id=imei)
  176. sta.paramconf=extdata
  177. sta.save()
  178. print("<-----status.paramconf update success!----->")
  179. except:
  180. print("<-----cbdstatus table is not exist,status.paramconf upload failed!----->")
  181. # 设备不能拍照应急处理:
  182. elif payload.get("cmd") == "picture":
  183. try:
  184. nowtime = datetime.datetime.now().strftime('%Y%m%d')
  185. pic_name = datetime.datetime.now().strftime('%H%M%S')
  186. path = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime
  187. isExists=os.path.exists(path)
  188. # 判断结果
  189. if not isExists:
  190. # 如果不存在则创建目录
  191. # 创建目录操作函数
  192. os.makedirs(path)
  193. print(path+' 创建成功')
  194. else:
  195. # 如果目录存在则不创建,并提示目录已存在
  196. print(path+' 目录已存在')
  197. dest = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime + "/" + pic_name + ".jpg"
  198. shutil.copy("/data/yfwlw/logs/aaaa.jpg",dest)
  199. print("picture------------------success")
  200. except:
  201. print("picture-------------------failed")
  202. # 设备异常预警
  203. elif payload.get("cmd") == "warn":
  204. print("<-----uploading warn!----->")
  205. print("%s is warn ! ! !" % imei)
  206. extdata = payload.get("ext")
  207. cbd_exist = Equip.objects.filter(equip_id=imei)
  208. # 设备存在,进一步判断状态表是否存在:
  209. if cbd_exist.exists():
  210. try:
  211. e_id = Equip.objects.get(equip_id=imei)
  212. # 创建预警记录:
  213. Alarm_record.objects.create(equip_id=e_id, alarm_desc=extdata, e_type="3")
  214. print("update warn ok!")
  215. except:
  216. print("update warn failed!")
  217. else:
  218. print("this imei is not exist!")
  219. # 4.0带RE上报RTU信息
  220. elif payload.get("cmd") == "rtu":
  221. print("<-----uploading rtu!----->")
  222. extdata = payload.get("ext")
  223. cbd_exist = Equip.objects.filter(equip_id=imei)
  224. # 设备存在,进一步判断状态表是否存在:
  225. if cbd_exist.exists():
  226. print("=======>>",extdata["version"])
  227. if extdata["version"] == "":
  228. pass
  229. else:
  230. try:
  231. sta = CBDstatus.objects.get(equip_id=imei)
  232. sta.rtuinfo=extdata
  233. sta.save()
  234. print("update rtu_info success!")
  235. except:
  236. print("update rtu_info failed!")
  237. else:
  238. print("this imei is not exist!")
  239. # 修改imei的板子上报原始imei
  240. elif payload.get("cmd") == "inquire" or payload.get("cmd") == "imei":
  241. print("<-----uploading inquire!----->")
  242. extdata = payload.get("ext")
  243. cbd_exist = Equip.objects.filter(equip_id=imei)
  244. # 设备存在,进一步判断状态表是否存在:
  245. if cbd_exist.exists():
  246. print("=======>>",extdata)
  247. try:
  248. print("上报设备号为",extdata["status"])
  249. sta = CBDstatus.objects.get(equip_id=imei)
  250. sta.old_eid=extdata["status"]
  251. sta.save()
  252. print("update rtu_info success!")
  253. except:
  254. print("update rtu_info failed!")
  255. else:
  256. print("this imei is not exist!")
  257. # 离线消息:
  258. elif "offline" in msg.topic:
  259. try:
  260. # 将json字符串解析:
  261. payload = json.loads(msg.payload.decode())
  262. if payload.get("cmd") == "offline":
  263. print("<-----离线消息!----->")
  264. print("%s is offline!" % imei)
  265. cbd_exist = Equip.objects.filter(equip_id=imei)
  266. # 设备存在,进一步判断状态表是否存在:
  267. if cbd_exist.exists():
  268. try:
  269. e_id = Equip.objects.get(equip_id=imei)
  270. # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder)
  271. # 更新状态表中未离线状态:
  272. CBDstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  273. # 增加状态历史的离线消息
  274. sta1 = CBDstatus.objects.get(equip_id=imei)
  275. # sta1.update(is_online = '0',off_time = datetime.datetime.now())
  276. CBDstatus_all.objects.create(equip_id=e_id, off_time = datetime.datetime.now(),
  277. cbd_status=sta1.cbd_status,lng=sta1.lng, lat=sta1.lat,
  278. is_online = '0')
  279. # 创建预警记录:
  280. Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}", e_type="3")
  281. print("update offline ok!")
  282. except:
  283. print("update offline failed!")
  284. else:
  285. print("this imei is not exist!")
  286. except:
  287. pass
  288. # 从服务器接收消息的回调函数 :
  289. def on_message(client, userdata, msg):
  290. t = threading.Thread(target=msg_thread,args=(msg,))
  291. #打印出当前线程的名称和id
  292. # print(threading.currentThread().name)
  293. # t.setDaemon(True)
  294. t.start()
  295. return
  296. if __name__ == '__main__':
  297. client = mqtt.Client(
  298. client_id="PY_MQTT_CLIENT_CBD",
  299. clean_session=True,
  300. userdata=None,
  301. # protocol=MQTTv311,# 数据库版本
  302. )
  303. # 必须设置,否则会返回「Connected with result code 4」
  304. client.username_pw_set("admin", "password")
  305. #设置连接上服务器回调函数:
  306. client.on_connect = on_connect
  307. #设置接收到服务器消息回调函数:
  308. client.on_message = on_message
  309. # #设置与服务器断开连接回调函数
  310. # client.on_disconnect = on_disconnect
  311. HOST = "127.0.0.1"
  312. client.connect(HOST, 1883, 60)
  313. client.loop_forever()
  314. # # 输入发布的话题名称:
  315. # # user = input("请输入名称:")
  316. # topic = "/yfkj/scd/cmd/2001"
  317. # client.user_data_set(topic)
  318. # client.loop_start()
  319. # while True:
  320. # str = input()
  321. # if str:
  322. # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))