# -*- coding: utf-8 -*-
# File Name:mqtt_chat_client.py
# Python Version:3.5.1
"""MQTT bridge that stores YBQ device uploads in the Django database.

The script subscribes to the ``yfkj/xycb/c2s/#`` and ``yfkj/xycb/offline/#``
topics.  For every message it extracts the device IMEI from the topic and
then, depending on the payload's ``cmd`` field, creates/refreshes the
``Equip`` / ``YBQstatus`` / ``YBQdata`` rows, stores the uploaded parameter
configuration, or marks the device offline and records an alarm.
"""
import os
import django
import sys

# Locate the Django project root (the parent of this file's directory).
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
# Settings module of the Django project.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yfwlw_pro.settings")
# Must run before the model import below.
django.setup()
print("<-----python_mqtt_client_ybq is run----->")

import paho.mqtt.client as mqtt
import json
from apps.AppInfoManage.models import Equip, Equip_type, SCDdata, SCDstatus, MyUser, Alarm_record, RecentSCDdata, YBQdata, YBQstatus
import re
import datetime
import copy

# First run of digits that directly follows a "/" in the topic: the IMEI.
_IMEI_RE = re.compile(r"(?<=/)\d+")


class CJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serialises ``datetime`` and ``date`` objects."""

    def default(self, obj):
        """Return a JSON-serialisable form of *obj*.

        ``datetime.datetime`` becomes ``'%Y-%m-%d %H:%M:%S'`` and
        ``datetime.date`` becomes ``'%Y-%m-%d'``; every other type is
        delegated to the base class, which raises ``TypeError``.
        """
        # ``datetime`` is imported as a module here, so the classes have to be
        # qualified.  datetime.datetime is a subclass of datetime.date and
        # must therefore be tested first.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')
        return json.JSONEncoder.default(self, obj)


def on_connect(client, userdata, flags, rc):
    """Subscribe to the device topics once connected (``rc`` 0 = success)."""
    client.subscribe("yfkj/xycb/c2s/#")
    client.subscribe("yfkj/xycb/offline/#")


def on_publish(msg, rc):
    """Print a confirmation when a publish succeeded (``rc == 0``)."""
    if rc == 0:
        # str() keeps the concatenation from raising for non-string messages.
        print("publish success,msg = " + str(msg))


def _next_equip_code():
    """Return the next 5-digit, zero-padded equipment code for type "2".

    The code is the ``equip_code`` of the first ``YBQstatus`` row with
    ``sex_type="2"`` plus one, or ``"00001"`` when there is no such row or
    its code cannot be read as an integer.
    """
    # NOTE(review): ``first()`` follows the queryset/model ordering; if that
    # ordering is not "highest code first" this can hand out a code that is
    # already in use -- confirm against the YBQstatus model.
    try:
        first_row = YBQstatus.objects.filter(sex_type="2").first()
        current = int(first_row.equip_code) if first_row is not None else 0
    except Exception as e:
        print("错误信息======>>", e)
        current = 0
    return str(current + 1).zfill(5)


def _create_status_rows(equip, extdata, equip_code, ok_msg, fail_msg):
    """Create the ``YBQstatus`` row and the first ``YBQdata`` row of *equip*.

    ``equip_code`` is only stored when it is not ``None``.  ``ok_msg`` or
    ``fail_msg`` is printed depending on the outcome.
    """
    code_field = {} if equip_code is None else {"equip_code": equip_code}
    try:
        YBQstatus.objects.create(
            equip_id=equip,
            ds=extdata['ds'],
            sex_type=extdata['proj'][-1],
            ybq_status=extdata,
            lng=extdata['lng'],
            lat=extdata['lat'],
            is_online='1',
            **code_field
        )
        YBQdata.objects.create(equip_id=equip, ybq_data=extdata, **code_field)
        print(ok_msg)
    except Exception as e:
        print(fail_msg, e)


def _refresh_status(imei, extdata, with_code):
    """Update the existing status row of *imei* and append a data row.

    With ``with_code`` set, the stored ``equip_code`` is written back into
    ``extdata['dev_num']`` and copied onto the new ``YBQdata`` row.
    """
    try:
        equip = Equip.objects.get(equip_id=imei)
        sta = YBQstatus.objects.get(equip_id=imei)
        record = {"equip_id": equip, "ybq_data": extdata}
        if with_code:
            extdata['dev_num'] = sta.equip_code
            record["equip_code"] = sta.equip_code
        sta.ybq_status = extdata
        sta.lng = extdata['lng']
        sta.lat = extdata['lat']
        sta.is_online = '1'
        sta.save()
        YBQdata.objects.create(**record)
        print("<-----status update success!----->")
    except Exception as e:
        print("<-----status update failed!----->", e)


def _recreate_status(imei, extdata, equip_code):
    """Create the missing status row for an already registered device."""
    print("<-----this equip's status is not existed!----->")
    try:
        equip = Equip.objects.get(equip_id=imei)
    except Exception as e:
        print("<-----this equip didn't exist!----->", e)
        return
    _create_status_rows(
        equip, extdata, equip_code,
        "<-----this equip's status table re-create successed!----->",
        "<-----this equip's status table re-create failed!----->",
    )


def _register_device(imei, extdata, equip_code):
    """Create the ``Equip`` row (equipment type 4) and its status rows."""
    try:
        equip_t = Equip_type.objects.get(type_id=4)
    except Exception as e:
        print("<-----this imei register failed because this equip type is not exist,!----->", e)
        return
    try:
        equip = Equip.objects.create(equip_id=imei, equip_type=equip_t)
    except Exception as e:
        print("<-----this imei add failed!----->", e)
        return
    print("<-----this imei add successed!----->")
    _create_status_rows(
        equip, extdata, equip_code,
        "<-----this imei register successed!----->",
        "<-----this imei register failed!----->",
    )


def _publish_equip_code(client, imei):
    """Publish the stored equipment code to ``yfkj/xycb/s2c/<imei>``."""
    try:
        code = YBQstatus.objects.get(equip_id_id=imei).equip_code
    except Exception as e:
        # No status row (e.g. the creation above failed): nothing to publish.
        print("<-----equip code lookup failed, nothing published!----->", e)
        return
    topic = 'yfkj/xycb/s2c/%s' % imei
    client.publish(topic, json.dumps({"dev_num": code}, ensure_ascii=False))


def _handle_coded_upload(client, imei, extdata, reported_code):
    """Store a data upload of a device whose ``proj`` ends in "2".

    These devices carry a server-assigned equipment code (``dev_num``).
    ``reported_code`` is the ``dev_num`` the device sent, captured before
    ``extdata`` is modified.
    """
    if Equip.objects.filter(equip_id=imei).exists():
        print("<-----this equip is existed!----->")
        if YBQstatus.objects.filter(equip_id=imei).exists():
            print("<-----this equip's status is existed!----->")
            _refresh_status(imei, extdata, with_code=True)
        else:
            new_code = _next_equip_code()
            extdata['dev_num'] = new_code
            _recreate_status(imei, extdata, new_code)
        # NOTE(review): the original nesting of this check was lost; it is
        # placed so that it runs for both status branches -- confirm.
        # Tell the device its code when it differs from what it reported.
        if reported_code != extdata.get('dev_num'):
            _publish_equip_code(client, imei)
    else:
        # Unknown device: register it with a freshly assigned code.
        new_code = _next_equip_code()
        extdata['dev_num'] = new_code
        _register_device(imei, extdata, new_code)
        if reported_code == "99999":
            _publish_equip_code(client, imei)
        # NOTE(review): original nesting of this debug print is ambiguous.
        print("code==========>>", reported_code)


def _handle_plain_upload(imei, extdata):
    """Store a data upload of a device whose ``proj`` ends in "1".

    No equipment code is assigned or stored for these devices.
    """
    if Equip.objects.filter(equip_id=imei).exists():
        print("<-----this equip is existed!----->")
        if YBQstatus.objects.filter(equip_id=imei).exists():
            print("<-----this equip's status is existed!----->")
            _refresh_status(imei, extdata, with_code=False)
        else:
            _recreate_status(imei, extdata, None)
    else:
        _register_device(imei, extdata, None)


def _handle_data_upload(client, imei, extdata):
    """Dispatch a ``cmd == "data"`` upload on the last character of ``proj``."""
    print("<-----uploading data and status!----->")
    if not isinstance(extdata, dict):
        print("<-----invalid ext data, message ignored!----->")
        return
    try:
        kind = extdata['proj'][-1]
    except (KeyError, IndexError, TypeError) as e:
        print("<-----invalid ext data, message ignored!----->", e)
        return
    # Remember what the device reported; extdata['dev_num'] is overwritten
    # with the server-side code further down.
    reported_code = extdata.get('dev_num')
    if kind == "2":
        _handle_coded_upload(client, imei, extdata, reported_code)
    elif kind == "1":
        _handle_plain_upload(imei, extdata)


def _handle_mqttconf(imei, extdata):
    """Store the uploaded parameter configuration on the status row."""
    print("<-----uploading paramconf!----->")
    print("extdata:", extdata)
    try:
        sta = YBQstatus.objects.get(equip_id=imei)
        sta.serverconf = extdata
        sta.save()
        print("<-----status.paramconf update success!----->")
    except Exception as e:
        print("<-----ybqstatus table is not exist,status.paramconf upload failed!----->", e)


def _handle_c2s(client, imei, msg):
    """Parse a client-to-server message and dispatch on its ``cmd`` field."""
    text = msg.payload.decode("utf8", "replace")
    print("------!!!", text)
    print("------type", type(text))
    try:
        payload = json.loads(text)
    except ValueError:
        payload = {}
    # Valid JSON that is not an object (list, number, ...) has no "cmd".
    if not isinstance(payload, dict):
        payload = {}
    cmd = payload.get("cmd")
    if cmd == "data":
        _handle_data_upload(client, imei, payload.get("ext"))
    elif cmd == "mqttconf":
        _handle_mqttconf(imei, payload.get("ext"))


def _handle_offline(imei, msg):
    """Mark the device offline and create an alarm record for it."""
    try:
        payload = json.loads(msg.payload.decode())
    except ValueError as e:
        # Covers both invalid JSON and undecodable bytes
        # (UnicodeDecodeError is a ValueError subclass).
        print("<-----invalid offline payload, message ignored!----->", e)
        return
    if not isinstance(payload, dict) or payload.get("cmd") != "offline":
        return
    print("<-----离线消息!----->")
    print("%s is offline!" % imei)
    if not Equip.objects.filter(equip_id=imei).exists():
        print("this imei is not exist!")
        return
    try:
        equip = Equip.objects.get(equip_id=imei)
        YBQstatus.objects.filter(equip_id=imei).update(
            is_online='0', off_time=datetime.datetime.now())
        Alarm_record.objects.create(
            equip_id=equip,
            alarm_desc="{'status':0,'type':'offline'}",
            e_type="4",
        )
        print("update offline ok!")
    except Exception as e:
        print("update offline failed!", e)


def on_message(client, userdata, msg):
    """Handle one incoming MQTT message.

    The IMEI is the first run of digits following a "/" in the topic.
    Topics containing ``c2s`` carry data / configuration uploads, topics
    containing ``offline`` carry offline notifications.  Messages whose
    topic has no IMEI are ignored.
    """
    print('\r')
    print('=================================================')
    print('\r')
    print("<-----topic:\n" + msg.topic + ';\n')
    print("Message:\n" + str(msg.payload) + "----->\n")

    found = _IMEI_RE.search(msg.topic)
    if found is None:
        print("<-----no imei in topic, message ignored!----->")
        return
    imei = found.group(0)
    print("<-----imei:", imei, "----->")

    if "c2s" in msg.topic:
        _handle_c2s(client, imei, msg)
    elif "offline" in msg.topic:
        _handle_offline(imei, msg)


if __name__ == '__main__':
    client = mqtt.Client(
        client_id="PY_MQTT_CLIENTC_YBQ",
        clean_session=True,
        userdata=None,
        # protocol=MQTTv311,
    )
    # Credentials are required, otherwise the broker answers
    # "Connected with result code 4".
    client.username_pw_set("admin", "password")
    client.on_connect = on_connect
    client.on_message = on_message
    HOST = "127.0.0.1"
    client.connect(HOST, 1883, 60)
    client.loop_forever()

    # Interactive publisher kept for reference:
    # topic = "/yfkj/ybq/cmd/2001"
    # client.user_data_set(topic)
    # client.loop_start()
    # while True:
    #     str = input()
    #     if str:
    #         client.publish("/yfkj/ybq/cmd/2001", json.dumps({"topic": topic, "cmd": str}))