| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- # -*- coding: utf-8 -*-
- # File Name:mqtt_chat_client.py
- # Python Version:3.5.1
- import os
- import django
- import sys
- BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
- sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
- os.environ.setdefault("DJANGO_SETTINGS_MODULE",
- "yfwlw_pro.settings") # project_name 项目名称
- django.setup()
- print("<-----python_mqtt_client_qxz is run----->")
- import paho.mqtt.client as mqtt
- import json
- from apps.AppInfoManage.models import Alarm_record, Equip, Equip_Forward, Equip_type, MyUser, QXZ_Base_Info, QXZdata, QXZstatus, QXZstatus_New, QXZswitchdata, QXZswitchstatus
- from apps.ReportManage.all_dict import transpont_equip_qxz,transpont_equip_qxz_li,transpont_equip_qxz_li,transpont_equip_qxz_params,qxz_dict_li
- import re
- import datetime
- import requests
- import time
- import numpy as np
- class CJSONEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, datetime):
- return obj.strftime('%Y-%m-%d %H:%M:%S')
- elif isinstance(obj, date):
- return obj.strftime('%Y-%m-%d')
- else:
- return json.JSONEncoder.default(self, obj)
- # 连接后的操作: 0为成功
- def on_connect(client, userdata, flags, rc):
- # print("Connected with result code "+str(rc))
- # for x in transpont_equip_qxz:
- # client.subscribe("/yfkj/qxz/pub/%s"%x)
- # equip_list = Equip_Forward.objects.all()
- # for x in equip_list:
- client.subscribe("/yfkj/qxz/pub/#")
- # for y in transpont_equip_qxz_li:
- # client.subscribe("/yfkj/qxz/pub/%s"%y)
- # *****成功发布******
- def on_publish(msg, rc):
- if rc == 0:
- print("publish success,msg = "+msg)
- #李新气象站数据转发
- def equip_qxz_li(imei):
- try:
- qxz_list = QXZstatus_New.objects.get(equip_id=imei)
- qxz_base = QXZ_Base_Info.objects.get(equip_id=imei)
- if qxz_list.is_online == "1":
- is_online = "正常"
- else:
- is_online = "异常"
- info = {"sbid":imei,"time":qxz_list.upl_time.strftime('%Y-%m-%d %H:%M:%S'),"status":is_online,"type":"环境监测"}
- data = [qxz_list.e1,qxz_list.e2,qxz_list.e3,qxz_list.e4,qxz_list.e5,qxz_list.e6,qxz_list.e7,qxz_list.e8,qxz_list.e9,qxz_list.e10,qxz_list.e11,qxz_list.e12,qxz_list.e13,qxz_list.e14,
- qxz_list.e15,qxz_list.e16,qxz_list.e17,qxz_list.e18,
- qxz_list.e19,qxz_list.e20,qxz_list.e21,qxz_list.e22,
- qxz_list.e23,qxz_list.e24,qxz_list.e25,qxz_list.e26,
- qxz_list.e27,qxz_list.e28,qxz_list.e29,qxz_list.e30]
- test = [i for i in data if i != '']
- iotdata = []
- for i in test:
- qxz = i.split("#")
- qxz_title = qxz_dict_li[qxz[1]]
- iotdata.append({
- "name":qxz_title[1],"value":qxz[0],"unit":qxz_title[2],
- })
- iotdata.append({"name":"dl","value":qxz_base.volt,"unit":"%","name":"xhqd","value":qxz_base.rssi,"unit":""})
- data = {"info":info,"iotdata":iotdata}
- data = json.dumps(data)
- # headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
- # res_1 = requests.post(transpont_equip_qxz_li[imei], data=data, timeout=3,headers=headers)
- url = transpont_equip_qxz_li[imei] + "?cyyname=" + transpont_equip_qxz_params[imei] + "&data=" + data
- res_1 = requests.get(url)
- if res_1.status_code == "200":
- data = 1
- else:
- data = 0
- except Exception as e:
- print("错误信息---------》",e)
- data = 0
- return data
- # 从服务器接受到消息后回调此函数 :
- def on_message(client, userdata, msg):
- print('\r')
- print('=================================================')
- print('\r')
- print("<-----topic:\n" + msg.topic + ';\n')
- print("Message:\n" + str(msg.payload) + "----->\n")
- # 从主题中获取imei
- # imei = msg.topic[14:len(msg.topic)]
- imei = re.sub("\D", "", msg.topic)
- print("<-----imei:", imei, "----->")
- try:
- # 判断主题:
- if "pub" in msg.topic:
- # 将json字符串解析:
- payload = json.loads(msg.payload.decode())
- if payload.get("cmd") == "terminalData":
- print("<-----transpond data!----->")
- data = payload.get("ext")
- date = data["data"]
- dats = []
- for x in date:
- dats.append(float(x["eValue"]))
- arr = (np.array(dats) == 0.0).all() #判断要素值是否全部为0
- if -99.99 not in dats and arr == False:
- data = json.dumps(data, cls=CJSONEncoder)
- print("参数类型为:",type(data))
- print("发送数据为:",data)
- try:
- if Equip_Forward.objects.filter(equip_id=imei).exists():
- equip_id = Equip_Forward.objects.get(equip_id=imei)
- if equip_id.equip_data_between == "1":
- res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3)
- elif equip_id.equip_data_between == "2":
- headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
- res_1 = requests.post(equip_id.equip_data_url, data=data, timeout=3,headers=headers)
- except Exception as e:
- print(e)
- if imei in transpont_equip_qxz_li.keys():
- time.sleep(10)
- res_1 = equip_qxz_li(imei)
- if res_1 == 1:
- print("发送成功")
- else:
- print("发送失败")
- # print("res_1",res_1.text)
- print("send success")
- except Exception as e:
- print(e)
- if __name__ == '__main__':
- client = mqtt.Client(
- client_id="PY_MQTT_TRANSPOND_QXZ",
- clean_session=True,
- userdata=None,
- # protocol=MQTTv311,# 数据库版本
- )
- # 必须设置,否则会返回「Connected with result code 4」
- client.username_pw_set("admin", "password")
- client.on_connect = on_connect
- client.on_message = on_message
- HOST = "127.0.0.1"
- client.connect(HOST, 1883, 60)
- client.loop_forever()
|