bzy_mqtt_transpond.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # -*- coding: utf-8 -*-
  2. # File Name:mqtt_chat_client.py
  3. # Python Version:3.5.1
  4. import os
  5. import django
  6. import sys
  7. BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
  8. sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE",
  10. "yfwlw_pro.settings") # project_name 项目名称
  11. django.setup()
  12. print("<-----python_mqtt_client_bzy is run----->")
  13. import paho.mqtt.client as mqtt
  14. import json
  15. from apps.AppInfoManage.models import Alarm_record, BZYdata, BZYstatus, Equip, Equip_Forward, Equip_type, MyUser
  16. from apps.ReportManage.all_dict import transpont_equip_bzy
  17. import re
  18. import datetime
  19. import time
  20. import requests
  21. class CJSONEncoder(json.JSONEncoder):
  22. def default(self, obj):
  23. if isinstance(obj, datetime):
  24. return obj.strftime('%Y-%m-%d %H:%M:%S')
  25. elif isinstance(obj, date):
  26. return obj.strftime('%Y-%m-%d')
  27. else:
  28. return json.JSONEncoder.default(self, obj)
  29. # 连接后的操作: 0为成功
  30. def on_connect(client, userdata, flags, rc):
  31. # print("Connected with result code "+str(rc))
  32. # for x in transpont_equip_bzy:
  33. # client.subscribe("/yfkj/bzy/c2s/%s"%x)
  34. # client.subscribe("/yfkj/bzy/offline/%s"%x)
  35. client.subscribe("/yfkj/bzy/c2s/#")
  36. client.subscribe("/yfkj/bzy/offline/#")
  37. # *****成功发布******
  38. def on_publish(msg, rc):
  39. if rc == 0:
  40. print("publish success,msg = "+msg)
  41. # 从服务器接受到消息后回调此函数 :
  42. def on_message(client, userdata, msg):
  43. print('\r')
  44. print('=================================================')
  45. print('\r')
  46. print("<-----topic:\n" + msg.topic + ';\n')
  47. print("Message:\n" + str(msg.payload) + "----->\n")
  48. # 从主题中获取imei
  49. # imei = msg.topic[14:len(msg.topic)]
  50. reg = re.compile(r"(?<=/)\d+")
  51. imei = reg.search(msg.topic).group(0)
  52. # imei = re.sub("\D", "", msg.topic)
  53. # nowtime = datetime.datetime.now().strftime('%Y%m%d')
  54. # origin = sys.stdout
  55. # f = open('./logs/'+nowtime+'bzymqtt.txt','a+')
  56. # sys.stdout = f
  57. print("<-----imei:", imei, "----->")
  58. # 判断主题:
  59. if "c2s" in msg.topic:
  60. # 将json字符串解析:
  61. payload = json.loads(msg.payload.decode())
  62. data = {"topic":msg.topic,"payload":payload}
  63. print(data)
  64. try:
  65. if Equip_Forward.objects.filter(equip_id=imei).exists():
  66. equip_id = Equip_Forward.objects.get(equip_id=imei)
  67. time_a = datetime.datetime.now()
  68. data = json.dumps(data, cls=CJSONEncoder)
  69. print("参数类型为:",type(data))
  70. headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
  71. res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers)
  72. print("res_1",res_1)
  73. print("res_1",res_1.text)
  74. print("send success")
  75. time_b = datetime.datetime.now()
  76. print("一共用时:",time_b-time_a)
  77. except Exception as e:
  78. print(e)
  79. # sys.stdout = origin
  80. # f.close()
  81. if __name__ == '__main__':
  82. client = mqtt.Client(
  83. client_id="PY_MQTT_TRANSPOND_BZY",
  84. clean_session=True,
  85. userdata=None,
  86. # protocol=MQTTv311,# 数据库版本
  87. )
  88. # 必须设置,否则会返回「Connected with result code 4」
  89. client.username_pw_set("admin", "password")
  90. client.on_connect = on_connect
  91. client.on_message = on_message
  92. HOST = "127.0.0.1"
  93. client.connect(HOST, 1883, 60)
  94. client.loop_forever()