| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- # -*- coding: utf-8 -*-
- # File Name:mqtt_chat_client.py
- # Python Version:3.5.1
- import os
- import django
- import sys
- BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 定位到你的django根目录
- sys.path.append(os.path.abspath(os.path.join(BASE_DIR, os.pardir)))
- os.environ.setdefault("DJANGO_SETTINGS_MODULE",
- "yfwlw_pro.settings") # project_name 项目名称
- django.setup()
- print("<-----python_mqtt_client_cbd is run----->")
- import paho.mqtt.client as mqtt
- import json
- from apps.AppInfoManage.models import Equip, Equip_type, CBDdata, CBDstatus, MyUser,Alarm_record, CBDstatus_all, RecentCBDdata
- import re
- import datetime
- import shutil,os
- import threading
- class CJSONEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, datetime):
- return obj.strftime('%Y-%m-%d %H:%M:%S')
- elif isinstance(obj, date):
- return obj.strftime('%Y-%m-%d')
- else:
- return json.JSONEncoder.default(self, obj)
- # 连接后的操作: 0为成功
- def on_connect(client, userdata, flags, rc):
- # print("Connected with result code "+str(rc))
- client.subscribe("/yfkj/cbd/pub/#")
- client.subscribe("/yfkj/cbd/offline/#")
- # 发布消息完成回调函数:
- def on_publish(msg, rc):
- if rc == 0:
- print("publish success,msg = " + msg)
- def msg_thread(msg):
- print('\r')
- print('=================================================')
- print('\r')
- print("<-----topic:\n" + msg.topic + ';\n')
- print("Message:\n" + str(msg.payload) + "----->\n")
- # 从主题中获取imei
- # imei = msg.topic[14:len(msg.topic)]
- imei = re.sub("\D", "", msg.topic)
- print("<-----imei:", imei, "----->")
- # 判断主题:
- if "pub" in msg.topic:
- # 将json字符串解析:
- payload = json.loads(msg.payload.decode())
- if payload.get("cmd") == "status":
- print("<-----uploading status!----->")
- extdata = payload.get("ext")
- # print("extdata:", extdata)
- cbd_exist = Equip.objects.filter(equip_id=imei)
- # 设备存在,进一步判断状态表是否存在:
- if cbd_exist.exists():
- try:
- e_id = Equip.objects.get(equip_id=imei)
- CBDstatus_all.objects.create(
- equip_id=e_id, cbd_status=extdata, lng=extdata['lng'], lat=extdata['lat'],is_online = '1')
- print("<-----status_all upload success!----->")
- except:
- print("<-----status_all upload failed!----->")
- # 设备状态表存在、刷新状态表:
- print("<-----this equip is existed!----->")
- if CBDstatus.objects.filter(equip_id=imei).exists():
- print("<-----this equip's status is existed!----->")
- try:
- sta = CBDstatus.objects.get(equip_id=imei)
- sta.cbd_status=extdata
- sta.lng=extdata['lng']
- sta.lat=extdata['lat']
- sta.is_online = '1'
- sta.save()
- print("<-----status update success!----->")
- except:
- print("<-----status update failed!----->")
- else:
- # 设备状态表不存在、创建状态表:
- print("<-----this equip's status is not existed!----->")
- try:
- e_id = Equip.objects.get(equip_id=imei)
- try:
- CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata,
- lng=extdata['lng'], lat=extdata['lat'],
- is_online = '1'
- )
- print("<-----this equip's status table re-create successed!----->")
- except:
- print("<-----this equip's status table re-create failed!----->")
- except:
- print("<-----this equip didn't exist!----->")
- else:
- # 设备不存在,在设备列表中创建:
- try:
- # 得到设备类型实例:
- equip_t = Equip_type.objects.get(type_id=extdata['dtype'])
- try:
- e_id = Equip.objects.create(equip_id=imei, equip_type=equip_t)
- print("<-----this imei add successed!----->")
- try:
- dver = extdata['dver']
- if dver.startswith("5") and dver.endswith("HK"):
- etype = "4.0测报灯"
- elif dver.startswith("4") and dver.endswith("HK"):
- etype = "3.0测报灯"
- elif dver.startswith("4") and dver.endswith("re"):
- etype = "3.0测报灯"
- elif dver.startswith("1") or dver.startswith("2") or dver.startswith("3"):
- etype = "2.0测报灯"
- elif dver.endswith("ZP"):
- etype = "1.0测报灯"
- elif dver.startswith("4") and dver.endswith("GK"):
- etype = "高空测报灯"
- elif dver.endswith("TC"):
- etype = "糖醋测报灯"
- elif dver.startswith("5") and dver.endswith("gk"):
- etype = "简易高空测报灯"
- CBDstatus.objects.create(equip_id=e_id, cbd_status=extdata,
- lng=extdata['lng'], lat=extdata['lat'],etype=etype,
- is_online = '1'
- )
- print("<-----this imei register successed!----->")
- except:
- print("<-----this imei register failed!----->")
- except:
- print("<-----this imei add failed!----->")
- except:
- print("<-----register failed because this equip type is not existed!----->")
-
- # rtn="register success!!!"
- # print("注册连接回应:"+"/yfkj/scd/cmd/"+payload.get("equip_id"))
- # client.publish("/yfkj/scd/cmd/"+payload.get("equip_id"), json.dumps({"msg": rtn}))
- # 上传数据:
- elif payload.get("cmd") == "data":
- print("<-----uploading data!----->")
- extdata = payload.get("ext")
- # print("extdata:", extdata)
- # 新建数据记录:
- try:
- e_id = Equip.objects.get(equip_id=imei)
- CBDdata.objects.create(equip_id=e_id, cbd_data=extdata)
- print("<-----data upload success!----->")
- except:
- print("<-----data upload failed!----->")
- # 如果是新设备,表里没有,需要做判断
- if RecentCBDdata.objects.filter(equip_id=imei).exists():
- # 设备存在,更新数据
- try:
- Recdata = RecentCBDdata.objects.get(equip_id=imei)
- Recdata.cbd_data=extdata
- Recdata.upl_time=datetime.datetime.now()
- Recdata.save()
- print("<-----RecentCBDdata update success!----->")
- except:
- print("<-----RecentCBDdata update failed!----->")
- else:
- # 如果是新设备,则新建数据
- try:
- print("<-----this imei add successed!----->")
- e_id = Equip.objects.get(equip_id=imei)
- etype = CBDstatus.objects.get(equip_id=e_id).etype
- RecentCBDdata.objects.create(
- equip_id=e_id, cbd_data=extdata, etype=etype, upl_time=datetime.datetime.now())
- print("<-----RecentCBDdata create success!----->")
- except:
- print("<-----RecentCBDdata create failed!----->")
- # 参数配置信息:
- elif payload.get("cmd") == "paramconf":
- print("<-----uploading paramconf!----->")
- extdata = payload.get("ext")
- # print("extdata:", extdata)
- # 更新状态表中的参数配置信息;
- try:
- sta = CBDstatus.objects.get(equip_id=imei)
- sta.paramconf=extdata
- sta.save()
- print("<-----status.paramconf update success!----->")
- except:
- print("<-----cbdstatus table is not exist,status.paramconf upload failed!----->")
-
- # 设备不能拍照应急处理:
- elif payload.get("cmd") == "picture":
- try:
- nowtime = datetime.datetime.now().strftime('%Y%m%d')
- pic_name = datetime.datetime.now().strftime('%H%M%S')
-
- path = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime
- isExists=os.path.exists(path)
- # 判断结果
- if not isExists:
- # 如果不存在则创建目录
- # 创建目录操作函数
- os.makedirs(path)
- print(path+' 创建成功')
- else:
- # 如果目录存在则不创建,并提示目录已存在
- print(path+' 目录已存在')
-
- dest = "/data/yfwlw/pyftp/ftp_file/ykm_cbd/" + imei + "/" + nowtime + "/" + pic_name + ".jpg"
-
- shutil.copy("/data/yfwlw/logs/aaaa.jpg",dest)
- print("picture------------------success")
- except:
- print("picture-------------------failed")
- # 设备异常预警
- elif payload.get("cmd") == "warn":
- print("<-----uploading warn!----->")
- print("%s is warn ! ! !" % imei)
- extdata = payload.get("ext")
- cbd_exist = Equip.objects.filter(equip_id=imei)
- # 设备存在,进一步判断状态表是否存在:
- if cbd_exist.exists():
- try:
- e_id = Equip.objects.get(equip_id=imei)
- # 创建预警记录:
- Alarm_record.objects.create(equip_id=e_id, alarm_desc=extdata, e_type="3")
- print("update warn ok!")
- except:
- print("update warn failed!")
- else:
- print("this imei is not exist!")
-
- # 4.0带RE上报RTU信息
- elif payload.get("cmd") == "rtu":
- print("<-----uploading rtu!----->")
- extdata = payload.get("ext")
- cbd_exist = Equip.objects.filter(equip_id=imei)
- # 设备存在,进一步判断状态表是否存在:
- if cbd_exist.exists():
- print("=======>>",extdata["version"])
- if extdata["version"] == "":
- pass
- else:
- try:
- sta = CBDstatus.objects.get(equip_id=imei)
- sta.rtuinfo=extdata
- sta.save()
- print("update rtu_info success!")
- except:
- print("update rtu_info failed!")
- else:
- print("this imei is not exist!")
-
- # 修改imei的板子上报原始imei
- elif payload.get("cmd") == "inquire" or payload.get("cmd") == "imei":
- print("<-----uploading inquire!----->")
- extdata = payload.get("ext")
- cbd_exist = Equip.objects.filter(equip_id=imei)
- # 设备存在,进一步判断状态表是否存在:
- if cbd_exist.exists():
- print("=======>>",extdata)
- try:
- print("上报设备号为",extdata["status"])
- sta = CBDstatus.objects.get(equip_id=imei)
- sta.old_eid=extdata["status"]
- sta.save()
- print("update rtu_info success!")
- except:
- print("update rtu_info failed!")
- else:
- print("this imei is not exist!")
-
- # 离线消息:
- elif "offline" in msg.topic:
- try:
- # 将json字符串解析:
- payload = json.loads(msg.payload.decode())
- if payload.get("cmd") == "offline":
- print("<-----离线消息!----->")
- print("%s is offline!" % imei)
- cbd_exist = Equip.objects.filter(equip_id=imei)
- # 设备存在,进一步判断状态表是否存在:
- if cbd_exist.exists():
- try:
- e_id = Equip.objects.get(equip_id=imei)
- # now_time = json.dumps(datetime.datetime.now(), cls=CJSONEncoder)
- # 更新状态表中未离线状态:
- CBDstatus.objects.filter(equip_id=imei).update(is_online = '0',off_time = datetime.datetime.now())
- # 增加状态历史的离线消息
- sta1 = CBDstatus.objects.get(equip_id=imei)
- # sta1.update(is_online = '0',off_time = datetime.datetime.now())
- CBDstatus_all.objects.create(equip_id=e_id, off_time = datetime.datetime.now(),
- cbd_status=sta1.cbd_status,lng=sta1.lng, lat=sta1.lat,
- is_online = '0')
- # 创建预警记录:
- Alarm_record.objects.create(equip_id=e_id, alarm_desc="{'status':0,'type':'offline'}", e_type="3")
- print("update offline ok!")
- except:
- print("update offline failed!")
- else:
- print("this imei is not exist!")
- except:
- pass
- # 从服务器接收消息的回调函数 :
- def on_message(client, userdata, msg):
- t = threading.Thread(target=msg_thread,args=(msg,))
- #打印出当前线程的名称和id
- # print(threading.currentThread().name)
- # t.setDaemon(True)
- t.start()
- return
- if __name__ == '__main__':
- client = mqtt.Client(
- client_id="PY_MQTT_CLIENT_CBD",
- clean_session=True,
- userdata=None,
- # protocol=MQTTv311,# 数据库版本
- )
- # 必须设置,否则会返回「Connected with result code 4」
- client.username_pw_set("admin", "password")
- #设置连接上服务器回调函数:
- client.on_connect = on_connect
- #设置接收到服务器消息回调函数:
- client.on_message = on_message
- # #设置与服务器断开连接回调函数
- # client.on_disconnect = on_disconnect
- HOST = "127.0.0.1"
- client.connect(HOST, 1883, 60)
- client.loop_forever()
- # # 输入发布的话题名称:
- # # user = input("请输入名称:")
- # topic = "/yfkj/scd/cmd/2001"
- # client.user_data_set(topic)
- # client.loop_start()
- # while True:
- # str = input()
- # if str:
- # client.publish("/yfkj/scd/cmd/2001", json.dumps({"topic": topic, "cmd": str}))
|