ybq_mqtt_client.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_ybq is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Equip, Equip_type, SCDdata, SCDstatus, MyUser, Alarm_record, RecentSCDdata, YBQdata, YBQstatus
  16. import re
  17. import datetime
  18. import copy
  19. class CJSONEncoder(json.JSONEncoder):
  20. def default(self, obj):
  21. if isinstance(obj, datetime):
  22. return obj.strftime('%Y-%m-%d %H:%M:%S')
  23. elif isinstance(obj, date):
  24. return obj.strftime('%Y-%m-%d')
  25. else:
  26. return json.JSONEncoder.default(self, obj)
  27. # 连接后的操作: 0为成功
  28. def on_connect(client, userdata, flags, rc):
  29. # print("Connected with result code "+str(rc))
  30. client.subscribe("yfkj/xycb/c2s/#")
  31. client.subscribe("yfkj/xycb/offline/#")
  32. # *****成功发布******
  33. def on_publish(msg, rc):
  34. if rc == 0:
  35. print("publish success,msg = "+msg)
  36. # 从服务器接受到消息后回调此函数 :
  37. def on_message(client, userdata, msg):
  38. print('\r')
  39. print('=================================================')
  40. print('\r')
  41. print("<-----topic:\n" + msg.topic + ';\n')
  42. print("Message:\n" + str(msg.payload) + "----->\n")
  43. # 从主题中获取imei
  44. # imei = msg.topic[14:len(msg.topic)]
  45. # imei = re.sub("\D", "", msg.topic)
  46. reg = re.compile(r"(?<=/)\d+")
  47. imei = reg.search(msg.topic).group(0)
  48. print("<-----imei:", imei, "----->")
  49. # 判断主题:
  50. if "c2s" in msg.topic:
  51. # 将json字符串解析
  52. print("------!!!",msg.payload.decode("utf8","replace"))
  53. print("------type",type(msg.payload.decode("utf8","replace")))
  54. try:
  55. payload = json.loads(msg.payload.decode("utf8","replace"))
  56. except:
  57. payload = ""
  58. if payload.get("cmd") == "data":
  59. print("<-----uploading data and status!----->")
  60. extdata = payload.get("ext")
  61. extdata_1 = copy.deepcopy(extdata)
  62. # print("extdata:", extdata)
  63. ybq_exist = Equip.objects.filter(equip_id=imei)
  64. # 设备存在,进一步判断状态表是否存在:
  65. if extdata['proj'][-1] == "2":
  66. if ybq_exist.exists():
  67. # 设备状态表存在、刷新状态表:
  68. print("<-----this equip is existed!----->")
  69. if YBQstatus.objects.filter(equip_id=imei).exists():
  70. print("<-----this equip's status is existed!----->")
  71. try:
  72. e_id = Equip.objects.get(equip_id=imei)
  73. sta = YBQstatus.objects.get(equip_id=imei)
  74. extdata['dev_num'] = sta.equip_code
  75. # sta.equip_code = bb
  76. sta.ybq_status=extdata
  77. sta.lng=extdata['lng']
  78. sta.lat=extdata['lat']
  79. sta.is_online = '1'
  80. sta.save()
  81. YBQdata.objects.create(equip_id=e_id,equip_code=sta.equip_code,ybq_data=extdata)
  82. print("<-----status update success!----->")
  83. except:
  84. # print("---------------",e)
  85. print("<-----status update failed!----->")
  86. else:
  87. try:
  88. aa = YBQstatus.objects.first(sex_type="2").equip_code
  89. except:
  90. aa = "0"
  91. bb = int(aa) + 1
  92. bb = ((5-len(str(bb)))*"0"+str(bb))
  93. extdata['dev_num'] = bb
  94. # 设备状态表不存在、创建状态表:
  95. print("<-----this equip's status is not existed!----->")
  96. try:
  97. e_id = Equip.objects.get(equip_id=imei)
  98. try:
  99. YBQstatus.objects.create(equip_id=e_id, ds=extdata['ds'],equip_code=bb,sex_type=extdata['proj'][-1],
  100. ybq_status=extdata, lng=extdata['lng'], lat=extdata['lat'],is_online = '1')
  101. YBQdata.objects.create(equip_id=e_id,equip_code=bb,ybq_data=extdata)
  102. print("<-----this equip's status table re-create successed!----->")
  103. except:
  104. print("<-----this equip's status table re-create failed!----->")
  105. except:
  106. print("<-----this equip didn't exist!----->")
  107. if extdata_1['dev_num'] != extdata['dev_num']:
  108. TASK_TOPIC = 'yfkj/xycb/s2c/%s'%imei
  109. payload_1 = {"dev_num":YBQstatus.objects.get(equip_id_id=imei).equip_code}
  110. # publish(主题:Topic; 消息内容)
  111. # client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
  112. # client = mqtt.Client(client_id, transport='tcp')
  113. # client.connect("120.27.222.26", 1883, 60) # 此处端口默认为1883,通信端口期keepalive默认60
  114. # client.loop_start()
  115. client.publish(TASK_TOPIC, json.dumps(payload_1, ensure_ascii=False))
  116. else:
  117. # 设备不存在,在设备列表中创建:
  118. try:
  119. aa = YBQstatus.objects.filter(sex_type="2").first().equip_code
  120. # print(aa)
  121. except Exception as e:
  122. print("错误信息======>>",e)
  123. aa = "0"
  124. bb = int(aa) + 1
  125. bb = ((5-len(str(bb)))*"0"+str(bb))
  126. extdata['dev_num'] = bb
  127. try:
  128. # 得到设备类型实例:
  129. equip_t = Equip_type.objects.get(type_id=4)
  130. try:
  131. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  132. print("<-----this imei add successed!----->")
  133. try:
  134. YBQstatus.objects.create(equip_id=e_id, ds=extdata['ds'],sex_type=extdata['proj'][-1],
  135. equip_code = bb, ybq_status=extdata, lng=extdata['lng'],lat=extdata['lat'], is_online = '1')
  136. YBQdata.objects.create(equip_id=e_id, equip_code = bb, ybq_data=extdata)
  137. print("<-----this imei register successed!----->")
  138. except:
  139. print("<-----this imei register failed!----->")
  140. except:
  141. print("<-----this imei add failed!----->")
  142. except:
  143. print("<-----this imei register failed because this equip type is not exist,!----->")
  144. if extdata_1['dev_num'] == "99999":
  145. TASK_TOPIC = 'yfkj/xycb/s2c/%s'%imei
  146. payload_1 = {"dev_num":YBQstatus.objects.get(equip_id_id=imei).equip_code}
  147. # publish(主题:Topic; 消息内容)
  148. # client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
  149. # client = mqtt.Client(client_id, transport='tcp')
  150. # client.connect("120.27.222.26", 1883, 60) # 此处端口默认为1883,通信端口期keepalive默认60
  151. # client.loop_start()
  152. client.publish(TASK_TOPIC, json.dumps(payload_1, ensure_ascii=False))
  153. print("code==========>>",extdata_1['dev_num'])
  154. elif extdata['proj'][-1] == "1":
  155. if ybq_exist.exists():
  156. # 设备状态表存在、刷新状态表:
  157. print("<-----this equip is existed!----->")
  158. if YBQstatus.objects.filter(equip_id=imei).exists():
  159. print("<-----this equip's status is existed!----->")
  160. try:
  161. e_id = Equip.objects.get(equip_id=imei)
  162. sta = YBQstatus.objects.get(equip_id=imei)
  163. # sta.equip_code = bb
  164. sta.ybq_status=extdata
  165. sta.lng=extdata['lng']
  166. sta.lat=extdata['lat']
  167. sta.is_online = '1'
  168. sta.save()
  169. YBQdata.objects.create(equip_id=e_id,ybq_data=extdata)
  170. print("<-----status update success!----->")
  171. except:
  172. # print("---------------",e)
  173. print("<-----status update failed!----->")
  174. else:
  175. # 设备状态表不存在、创建状态表:
  176. print("<-----this equip's status is not existed!----->")
  177. try:
  178. e_id = Equip.objects.get(equip_id=imei)
  179. try:
  180. YBQstatus.objects.create(equip_id=e_id, ds=extdata['ds'],sex_type=extdata['proj'][-1],
  181. ybq_status=extdata, lng=extdata['lng'], lat=extdata['lat'],is_online = '1')
  182. YBQdata.objects.create(equip_id=e_id,equip_code=bb,ybq_data=extdata)
  183. print("<-----this equip's status table re-create successed!----->")
  184. except:
  185. print("<-----this equip's status table re-create failed!----->")
  186. except:
  187. print("<-----this equip didn't exist!----->")
  188. else:
  189. # 设备不存在,在设备列表中创建:
  190. try:
  191. # 得到设备类型实例:
  192. equip_t = Equip_type.objects.get(type_id=4)
  193. try:
  194. e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
  195. print("<-----this imei add successed!----->")
  196. try:
  197. YBQstatus.objects.create(equip_id=e_id, ds=extdata['ds'],sex_type=extdata['proj'][-1],
  198. ybq_status=extdata, lng=extdata['lng'],lat=extdata['lat'], is_online = '1')
  199. YBQdata.objects.create(equip_id=e_id, ybq_data=extdata)
  200. print("<-----this imei register successed!----->")
  201. except:
  202. print("<-----this imei register failed!----->")
  203. except:
  204. print("<-----this imei add failed!----->")
  205. except:
  206. print("<-----this imei register failed because this equip type is not exist,!----->")
  207. # 参数配置信息:
  208. elif payload.get("cmd") == "mqttconf":
  209. print("<-----uploading paramconf!----->")
  210. extdata = payload.get("ext")
  211. print("extdata:", extdata)
  212. # 更新状态表中的参数配置信息;
  213. try:
  214. sta = YBQstatus.objects.get(equip_id=imei)
  215. sta.serverconf=extdata
  216. sta.save()
  217. print("<-----status.paramconf update success!----->")
  218. except:
  219. print("<-----ybqstatus table is not exist,status.paramconf upload failed!----->")
  220. # 离线消息:
  221. elif "offline" in msg.topic:
  222. # 将json字符串解析:
  223. payload = json.loads(msg.payload.decode())
  224. if payload.get("cmd") == "offline":
  225. print("<-----离线消息!----->")
  226. print("%s is offline!" % imei)
  227. ybq_exist = Equip.objects.filter(equip_id=imei)
  228. # 设备存在,进一步判断状态表是否存在:
  229. if ybq_exist.exists():
  230. try:
  231. e_id = Equip.objects.get(equip_id=imei)
  232. # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder)
  233. # 更新状态表中未离线状态:
  234. YBQstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
  235. # 创建预警记录:
  236. Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}", e_type="4")
  237. print("update offline ok!")
  238. except:
  239. print("update offline failed!")
  240. else:
  241. print("this imei is not exist!")
  242. if __name__ == '__main__':
  243. client = mqtt.Client(
  244. client_id="PY_MQTT_CLIENTC_YBQ",
  245. clean_session=True,
  246. userdata=None,
  247. # protocol=MQTTv311,# 数据库版本
  248. )
  249. # 必须设置,否则会返回「Connected with result code 4」
  250. client.username_pw_set("admin", "password")
  251. client.on_connect = on_connect
  252. client.on_message = on_message
  253. HOST = "127.0.0.1"
  254. client.connect(HOST, 1883, 60)
  255. client.loop_forever()
  256. # # 输入发布的话题名称:
  257. # # user = input("请输入名称:")
  258. # topic = "/yfkj/ybq/cmd/2001"
  259. # client.user_data_set(topic)
  260. # client.loop_start()
  261. # while True:
  262. # str = input()
  263. # if str:
  264. # client.publish("/yfkj/ybq/cmd/2001", json.dumps({"topic": topic, "cmd": str}))