qxz_mqtt_transpond.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_qxz is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Alarm_record, Equip, Equip_Forward, Equip_type, MyUser, QXZ_Base_Info, QXZdata, QXZstatus, QXZstatus_New, QXZswitchdata, QXZswitchstatus
  16. from apps.ReportManage.all_dict import transpont_equip_qxz,transpont_equip_qxz_li,transpont_equip_qxz_li,transpont_equip_qxz_params,qxz_dict_li
  17. import re
  18. import datetime
  19. import requests
  20. import time
  21. import numpy as np
  22. class CJSONEncoder(json.JSONEncoder):
  23. def default(self, obj):
  24. if isinstance(obj, datetime):
  25. return obj.strftime('%Y-%m-%d %H:%M:%S')
  26. elif isinstance(obj, date):
  27. return obj.strftime('%Y-%m-%d')
  28. else:
  29. return json.JSONEncoder.default(self, obj)
  30. # 连接后的操作: 0为成功
  31. def on_connect(client, userdata, flags, rc):
  32. # print("Connected with result code "+str(rc))
  33. # for x in transpont_equip_qxz:
  34. # client.subscribe("/yfkj/qxz/pub/%s"%x)
  35. # equip_list = Equip_Forward.objects.all()
  36. # for x in equip_list:
  37. client.subscribe("/yfkj/qxz/pub/#")
  38. # for y in transpont_equip_qxz_li:
  39. # client.subscribe("/yfkj/qxz/pub/%s"%y)
  40. # *****成功发布******
  41. def on_publish(msg, rc):
  42. if rc == 0:
  43. print("publish success,msg = "+msg)
  44. #李新气象站数据转发
  45. def equip_qxz_li(imei):
  46. try:
  47. qxz_list = QXZstatus_New.objects.get(equip_id=imei)
  48. qxz_base = QXZ_Base_Info.objects.get(equip_id=imei)
  49. if qxz_list.is_online == "1":
  50. is_online = "正常"
  51. else:
  52. is_online = "异常"
  53. info = {"sbid":imei,"time":qxz_list.upl_time.strftime('%Y-%m-%d %H:%M:%S'),"status":is_online,"type":"环境监测"}
  54. data = [qxz_list.e1,qxz_list.e2,qxz_list.e3,qxz_list.e4,qxz_list.e5,qxz_list.e6,qxz_list.e7,qxz_list.e8,qxz_list.e9,qxz_list.e10,qxz_list.e11,qxz_list.e12,qxz_list.e13,qxz_list.e14,
  55. qxz_list.e15,qxz_list.e16,qxz_list.e17,qxz_list.e18,
  56. qxz_list.e19,qxz_list.e20,qxz_list.e21,qxz_list.e22,
  57. qxz_list.e23,qxz_list.e24,qxz_list.e25,qxz_list.e26,
  58. qxz_list.e27,qxz_list.e28,qxz_list.e29,qxz_list.e30]
  59. test = [i for i in data if i != '']
  60. iotdata = []
  61. for i in test:
  62. qxz = i.split("#")
  63. qxz_title = qxz_dict_li[qxz[1]]
  64. iotdata.append({
  65. "name":qxz_title[1],"value":qxz[0],"unit":qxz_title[2],
  66. })
  67. iotdata.append({"name":"dl","value":qxz_base.volt,"unit":"%","name":"xhqd","value":qxz_base.rssi,"unit":""})
  68. data = {"info":info,"iotdata":iotdata}
  69. data = json.dumps(data)
  70. # headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
  71. # res_1 = requests.post(transpont_equip_qxz_li[imei], data=data, timeout=3,headers=headers)
  72. url = transpont_equip_qxz_li[imei] + "?cyyname=" + transpont_equip_qxz_params[imei] + "&data=" + data
  73. res_1 = requests.get(url)
  74. if res_1.status_code == "200":
  75. data = 1
  76. else:
  77. data = 0
  78. except Exception as e:
  79. print("错误信息---------》",e)
  80. data = 0
  81. return data
  82. # 从服务器接受到消息后回调此函数 :
  83. def on_message(client, userdata, msg):
  84. print('\r')
  85. print('=================================================')
  86. print('\r')
  87. print("<-----topic:\n" + msg.topic + ';\n')
  88. print("Message:\n" + str(msg.payload) + "----->\n")
  89. # 从主题中获取imei
  90. # imei = msg.topic[14:len(msg.topic)]
  91. imei = re.sub("\D", "", msg.topic)
  92. print("<-----imei:", imei, "----->")
  93. try:
  94. # 判断主题:
  95. if "pub" in msg.topic:
  96. # 将json字符串解析:
  97. payload = json.loads(msg.payload.decode())
  98. if payload.get("cmd") == "terminalData":
  99. print("<-----transpond data!----->")
  100. data = payload.get("ext")
  101. date = data["data"]
  102. dats = []
  103. for x in date:
  104. dats.append(float(x["eValue"]))
  105. arr = (np.array(dats) == 0.0).all() #判断要素值是否全部为0
  106. if -99.99 not in dats and arr == False:
  107. data = json.dumps(data, cls=CJSONEncoder)
  108. print("参数类型为:",type(data))
  109. print("发送数据为:",data)
  110. try:
  111. if Equip_Forward.objects.filter(equip_id=imei).exists():
  112. equip_id = Equip_Forward.objects.get(equip_id=imei)
  113. if equip_id.equip_data_between == "1":
  114. res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3)
  115. elif equip_id.equip_data_between == "2":
  116. headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
  117. res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers)
  118. except Exception as e:
  119. print(e)
  120. if imei in transpont_equip_qxz_li.keys():
  121. time.sleep(10)
  122. res_1 = equip_qxz_li(imei)
  123. if res_1 == 1:
  124. print("发送成功")
  125. else:
  126. print("发送失败")
  127. # print("res_1",res_1.text)
  128. print("send success")
  129. except Exception as e:
  130. print(e)
  131. if __name__ == '__main__':
  132. client = mqtt.Client(
  133. client_id="PY_MQTT_TRANSPOND_QXZ",
  134. clean_session=True,
  135. userdata=None,
  136. # protocol=MQTTv311,# 数据库版本
  137. )
  138. # 必须设置,否则会返回「Connected with result code 4」
  139. client.username_pw_set("admin", "password")
  140. client.on_connect = on_connect
  141. client.on_message = on_message
  142. HOST = "127.0.0.1"
  143. client.connect(HOST, 1883, 60)
  144. client.loop_forever()