cbd_mqtt_transpond.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
# -*- coding: utf-8 -*-
# File Name:mqtt_chat_client.py
# Python Version:3.5.1
import os
import django
import sys
# Directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Put the parent of BASE_DIR on sys.path (the original note says this should
# point at the Django project root) so the `apps.*` imports below resolve.
sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
# Select the Django settings module (project name: yfwlw_pro) unless the
# environment already defines one.
os.environ.setdefault("DJANGO_SETTINGS_MODULE",
                      "yfwlw_pro.settings")
# Initialise Django before any model module is imported further down.
django.setup()
print("<-----python_mqtt_client_cbd is run----->")
import paho.mqtt.client as mqtt
import json
from apps.AppInfoManage.models import Equip, Equip_type, CBDdata, CBDstatus, MyUser,Alarm_record, CBDstatus_all,Equip_Forward
from apps.ReportManage.all_dict import transpont_equip_cbd,transpont_equip_cbd_new
import re
import requests
import datetime
import time
import sys
  22. class CJSONEncoder(json.JSONEncoder):
  23. def default(self, obj):
  24. if isinstance(obj, datetime):
  25. return obj.strftime('%Y-%m-%d %H:%M:%S')
  26. elif isinstance(obj, date):
  27. return obj.strftime('%Y-%m-%d')
  28. else:
  29. return json.JSONEncoder.default(self, obj)
  30. # 连接后的操作: 0为成功
  31. def on_connect(client, userdata, flags, rc):
  32. # print("Connected with result code "+str(rc))
  33. # for x in transpont_equip_cbd:
  34. # client.subscribe("/yfkj/cbd/pub/%s"%x)
  35. # client.subscribe("/yfkj/cbd/offline/%s"%x)
  36. # client.subscribe("/tran/cbd/pub/12345789")
  37. # client.subscribe("/tran/cbd/offline/12345789")
  38. # print("---------!---------sub success")
  39. # 订阅所有的意思
  40. # equip_list = Equip_Forward.objects.all()
  41. # for x in equip_list:
  42. client.subscribe("/yfkj/cbd/pub/#")
  43. client.subscribe("/yfkj/cbd/offline/#")
  44. # 发布消息完成回调函数:
  45. def on_publish(msg, rc):
  46. if rc == 0:
  47. print("publish success,msg = " + msg)
  48. # 从服务器接收消息的回调函数 :
  49. def on_message(client, userdata, msg):
  50. # print(on_connect(client,userdata))
  51. # nowtime = datetime.datetime.now().strftime('%Y%m%d')
  52. # origin = sys.stdout
  53. # f = open('logs/'+nowtime+'.txt','a')
  54. # sys.stdout = f
  55. print('\r')
  56. print('=================================================')
  57. print('\r')
  58. print("<-----topic:\n" + msg.topic + ';\n')
  59. print("Message:\n" + str(msg.payload) + "----->\n")
  60. print("datetime:",datetime.datetime.now())
  61. # -----------------------------------------------------------
  62. # http 转发
  63. # 从主题中获取imei
  64. # imei = msg.topic[14:len(msg.topic)]
  65. imei = re.sub("\D", "", msg.topic)
  66. print("<-----imei:", imei, "----->")
  67. payload = json.loads(msg.payload.decode())
  68. if payload.get("cmd") == "warn":
  69. print("------------")
  70. else:
  71. # payload = json.dumps(payload, cls=CJSONEncoder)
  72. # print("type_payload",type(payload))
  73. data = {"topic":msg.topic,"payload":payload}
  74. time_a = datetime.datetime.now()
  75. data = json.dumps(data, cls=CJSONEncoder)
  76. print("参数类型为:",type(data))
  77. try:
  78. if Equip_Forward.objects.filter(equip_id=imei).exists():
  79. equip_id = Equip_Forward.objects.get(equip_id=imei)
  80. # res_1 = requests.post(transpont_equip_cbd[imei], data=data, timeout=3)
  81. if equip_id.equip_data_between == "1":
  82. res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3)
  83. elif equip_id.equip_data_between == "2":
  84. headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
  85. res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers)
  86. print("res_1",res_1)
  87. print("res_1",res_1.text)
  88. print("send success")
  89. time_b = datetime.datetime.now()
  90. print("一共用时:",time_b-time_a)
  91. except Exception as e:
  92. print(e)
  93. # sys.stdout = origin
  94. # f.close()
  95. if __name__ == '__main__':
  96. client = mqtt.Client(
  97. client_id="PY_MQTT_TRANSPOND_CBD",
  98. clean_session=True,
  99. userdata=None,
  100. # protocol=MQTTv311,# 数据库版本
  101. )
  102. # 必须设置,否则会返回「Connected with result code 4」
  103. client.username_pw_set("admin", "password")
  104. #设置连接上服务器回调函数:
  105. client.on_connect = on_connect
  106. #设置接收到服务器消息回调函数:
  107. client.on_message = on_message
  108. # #设置与服务器断开连接回调函数
  109. # client.on_disconnect = on_disconnect
  110. HOST = "127.0.0.1"
  111. client.connect(HOST, 1883, 60)
  112. client.loop_forever()
  113. # logger = logging.getLogger('sourceDns.webdns.views') #刚才在setting.py中配置的logger
  114. # logger.error(client.loop_forever()) #直接将错误写入到日志文件
  115. # # 输入发布的话题名称:
  116. # # user = input("请输入名称:")
  117. # topic = "/yfkj/scd/cmd/2001"
  118. # client.user_data_set(topic)
  119. # client.loop_start()
  120. # while True:
  121. # str = input()
  122. # if str:
  123. # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))