# -*- coding: utf-8 -*- # File Name:mqtt_chat_client.py # Python Version:3.5.1 import os import django import sys BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录 sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir))) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yfwlw_pro.settings") # project_name 项目名称 django.setup() print("<-----python_mqtt_client_qxz is run----->") import paho.mqtt.client as mqtt import json from apps.AppInfoManage.models import Alarm_record, Equip, Equip_Forward, Equip_type, MyUser, QXZ_Base_Info, QXZdata, QXZstatus, QXZstatus_New, QXZswitchdata, QXZswitchstatus from apps.ReportManage.all_dict import transpont_equip_qxz,transpont_equip_qxz_li,transpont_equip_qxz_params,qxz_dict_li import re import datetime import requests import time import numpy as np class CJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj) # 连接后的操作: 0为成功 def on_connect(client, userdata, flags, rc): # print("Connected with result code "+str(rc)) # for x in transpont_equip_qxz: # client.subscribe("/yfkj/qxz/pub/%s"%x) # equip_list = Equip_Forward.objects.all() # for x in equip_list: client.subscribe("/yfkj/qxz/pub/#") # for y in transpont_equip_qxz_li: # client.subscribe("/yfkj/qxz/pub/%s"%y) # *****成功发布****** def on_publish(msg, rc): if rc == 0: print("publish success,msg = "+msg) #李新气象站数据转发 def equip_qxz_li(imei): try: qxz_list = QXZstatus_New.objects.get(equip_id=imei) qxz_base = QXZ_Base_Info.objects.get(equip_id=imei) if qxz_list.is_online == "1": is_online = "正常" else: is_online = "异常" info = {"sbid":imei,"time":qxz_list.upl_time.strftime('%Y-%m-%d %H:%M:%S'),"status":is_online,"type":"环境监测"} data = [qxz_list.e1,qxz_list.e2,qxz_list.e3,qxz_list.e4,qxz_list.e5,qxz_list.e6,qxz_list.e7,qxz_list.e8,qxz_list.e9,qxz_list.e10,qxz_list.e11,qxz_list.e12,qxz_list.e13,qxz_list.e14, qxz_list.e15,qxz_list.e16,qxz_list.e17,qxz_list.e18, qxz_list.e19,qxz_list.e20,qxz_list.e21,qxz_list.e22, qxz_list.e23,qxz_list.e24,qxz_list.e25,qxz_list.e26, qxz_list.e27,qxz_list.e28,qxz_list.e29,qxz_list.e30] test = [i for i in data if i != ''] iotdata = [] for i in test: qxz = i.split("#") qxz_title = qxz_dict_li[qxz[1]] iotdata.append({ "name":qxz_title[1],"value":qxz[0],"unit":qxz_title[2], }) iotdata.append({"name":"dl","value":qxz_base.volt,"unit":"%","name":"xhqd","value":qxz_base.rssi,"unit":""}) data = {"info":info,"iotdata":iotdata} data = json.dumps(data) # headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'} # res_1 = requests.post(transpont_equip_qxz_li[imei], data=data, timeout=3,headers=headers) url = transpont_equip_qxz_li[imei] + "?cyyname=" + transpont_equip_qxz_params[imei] + "&data=" + data res_1 = requests.get(url) if res_1.status_code == "200": data = 1 else: data = 0 except Exception as e: print("错误信息---------》",e) data = 0 return data # 从服务器接受到消息后回调此函数 : def on_message(client, userdata, msg): print('\r') print('=================================================') print('\r') print("<-----topic:\n" + msg.topic + ';\n') print("Message:\n" + str(msg.payload) + "----->\n") # 从主题中获取imei # imei = msg.topic[14:len(msg.topic)] imei = re.sub("\D", "", msg.topic) print("<-----imei:", imei, "----->") try: # 判断主题: if "pub" in msg.topic: # 将json字符串解析: payload = json.loads(msg.payload.decode()) if payload.get("cmd") == "terminalData": print("<-----transpond data!----->") data = payload.get("ext") date = data["data"] dats = [] for x in date: dats.append(float(x["eValue"])) arr = (np.array(dats) == 0.0).all() #判断要素值是否全部为0 if -99.99 not in dats and arr == False: data = json.dumps(data, cls=CJSONEncoder) print("参数类型为:",type(data)) print("发送数据为:",data) nowtime = datetime.datetime.now().strftime('%Y%m%d') origin = sys.stdout f = open('../logs/'+ 'qxz' + nowtime +'.txt','a') sys.stdout = f print("------>>",imei) try: if Equip_Forward.objects.filter(equip_id=imei).exists(): equip_id = Equip_Forward.objects.get(equip_id=imei) if equip_id.equip_data_between == "1": res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3) elif equip_id.equip_data_between == "2": headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'} res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers) print("res_1",res_1) except Exception as e: print(e) sys.stdout = origin f.close() if imei in transpont_equip_qxz_li.keys(): time.sleep(10) res_1 = equip_qxz_li(imei) if res_1 == 1: print("发送成功") else: print("发送失败") # print("res_1",res_1.text) print("send success") except Exception as e: print(e) if __name__ == '__main__': client = mqtt.Client( client_id="PY_MQTT_TRANSPOND_QXZ", clean_session=True, userdata=None, # protocol=MQTTv311,# 数据库版本 ) # 必须设置,否则会返回「Connected with result code 4」 client.username_pw_set("admin", "password") client.on_connect = on_connect client.on_message = on_message HOST = "127.0.0.1" client.connect(HOST, 1883, 60) client.loop_forever()