# -*- coding: utf-8 -*- # File Name:mqtt_chat_client.py # Python Version:3.5.1 import os import django import sys BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录 sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir))) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yfwlw_pro.settings") # project_name 项目名称 django.setup() print("<-----python_mqtt_client_cbd is run----->") import paho.mqtt.client as mqtt import json from apps.AppInfoManage.models import Equip, Equip_type, CBDdata, CBDstatus, MyUser,Alarm_record, CBDstatus_all,Equip_Forward from apps.ReportManage.all_dict import transpont_equip_cbd,transpont_equip_cbd_new import re import requests import datetime import time import sys class CJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj) # 连接后的操作: 0为成功 def on_connect(client, userdata, flags, rc): # print("Connected with result code "+str(rc)) # for x in transpont_equip_cbd: # client.subscribe("/yfkj/cbd/pub/%s"%x) # client.subscribe("/yfkj/cbd/offline/%s"%x) # client.subscribe("/tran/cbd/pub/12345789") # client.subscribe("/tran/cbd/offline/12345789") # print("---------!---------sub success") # 订阅所有的意思 # equip_list = Equip_Forward.objects.all() # for x in equip_list: client.subscribe("/yfkj/cbd/pub/#") client.subscribe("/yfkj/cbd/offline/#") # 发布消息完成回调函数: def on_publish(msg, rc): if rc == 0: print("publish success,msg = " + msg) # 从服务器接收消息的回调函数 : def on_message(client, userdata, msg): # print(on_connect(client,userdata)) # nowtime = datetime.datetime.now().strftime('%Y%m%d') # origin = sys.stdout # f = open('logs/'+nowtime+'.txt','a') # sys.stdout = f print('\r') print('=================================================') print('\r') print("<-----topic:\n" + msg.topic + ';\n') print("Message:\n" + str(msg.payload) + "----->\n") print("datetime:",datetime.datetime.now()) # ----------------------------------------------------------- # http 转发 # 从主题中获取imei # imei = msg.topic[14:len(msg.topic)] imei = re.sub("\D", "", msg.topic) print("<-----imei:", imei, "----->") payload = json.loads(msg.payload.decode()) if payload.get("cmd") == "warn": print("------------") else: # payload = json.dumps(payload, cls=CJSONEncoder) # print("type_payload",type(payload)) data = {"topic":msg.topic,"payload":payload} time_a = datetime.datetime.now() data = json.dumps(data, cls=CJSONEncoder) print("参数类型为:",type(data)) try: if Equip_Forward.objects.filter(equip_id=imei).exists(): equip_id = Equip_Forward.objects.get(equip_id=imei) # res_1 = requests.post(transpont_equip_cbd[imei], data=data, timeout=3) if equip_id.equip_data_between == "1": res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3) elif equip_id.equip_data_between == "2": headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'} res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers) print("res_1",res_1) print("res_1",res_1.text) print("send success") time_b = datetime.datetime.now() print("一共用时:",time_b-time_a) except Exception as e: print(e) # sys.stdout = origin # f.close() if __name__ == '__main__': client = mqtt.Client( client_id="PY_MQTT_TRANSPOND_CBD", clean_session=True, userdata=None, # protocol=MQTTv311,# 数据库版本 ) # 必须设置,否则会返回「Connected with result code 4」 client.username_pw_set("admin", "password") #设置连接上服务器回调函数: client.on_connect = on_connect #设置接收到服务器消息回调函数: client.on_message = on_message # #设置与服务器断开连接回调函数 # client.on_disconnect = on_disconnect HOST = "127.0.0.1" client.connect(HOST, 1883, 60) client.loop_forever() # logger = logging.getLogger('sourceDns.webdns.views') #刚才在setting.py中配置的logger # logger.error(client.loop_forever()) #直接将错误写入到日志文件 # # 输入发布的话题名称: # # user = input("请输入名称:") # topic = "/yfkj/scd/cmd/2001" # client.user_data_set(topic) # client.loop_start() # while True: # str = input() # if str: # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))