| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # -*- coding: utf-8 -*-
- # File Name:mqtt_chat_client.py
- # Python Version:3.5.1
- import os
- import django
- import sys
# Directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Append the parent of BASE_DIR to the import path (presumably the Django
# project root, so the settings module and the app imports below resolve).
_PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, os.pardir))
sys.path.append(_PROJECT_ROOT)

# Keep an already-exported DJANGO_SETTINGS_MODULE; otherwise fall back to the
# project's settings module. Django must be set up before models are imported.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yfwlw_pro.settings")
django.setup()

print("<-----python_mqtt_client_bzy is run----->")
- import paho.mqtt.client as mqtt
- import json
- from apps.AppInfoManage.models import Alarm_record, BZYdata, BZYstatus, Equip, Equip_Forward, Equip_type, MyUser
- from apps.ReportManage.all_dict import transpont_equip_bzy
- import re
- import datetime
- import time
- import requests
class CJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serializes ``datetime`` and ``date`` values.

    ``datetime.datetime`` objects are rendered as ``YYYY-MM-DD HH:MM:SS`` and
    ``datetime.date`` objects as ``YYYY-MM-DD``. Every other type is handed to
    the base encoder, which raises ``TypeError`` for unsupported objects.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*."""
        # The file does ``import datetime``, so the classes have to be reached
        # through the module: handing the bare module to isinstance() raises
        # TypeError. datetime.datetime is tested first because it is a
        # subclass of datetime.date.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, datetime.date):
            return obj.strftime('%Y-%m-%d')
        return json.JSONEncoder.default(self, obj)
# Callback run once the broker has answered the connection request
# (result code 0 means success).
def on_connect(client, userdata, flags, rc):
    """Subscribe to the BZY uplink and offline topics after a successful connect.

    Args:
        client: The MQTT client the callback was invoked for; only its
            ``subscribe`` method is used.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the connection succeeded.
    """
    # A non-zero result code means the connection was not established, so
    # there is nothing to subscribe on; report it instead.
    if rc != 0:
        print("MQTT connect failed, result code " + str(rc))
        return

    # The "#" wildcard covers every device below each prefix.
    for topic in ("/yfkj/bzy/c2s/#", "/yfkj/bzy/offline/#"):
        client.subscribe(topic)
# Reports a successful publish.
def on_publish(msg, rc):
    """Print a confirmation line when a publish succeeded.

    Args:
        msg: The published message (or its identifier); any type is accepted.
        rc: Result code; only 0 produces output.
    """
    # NOTE(review): nothing in the visible code registers this function, and
    # paho-mqtt invokes ``client.on_publish`` as (client, userdata, mid), so
    # this two-argument form cannot be assigned to it as-is -- confirm usage.
    if rc == 0:
        # str() keeps the report working for non-string values (for example
        # an integer message id), which plain concatenation would reject.
        print("publish success,msg = " + str(msg))
# First run of digits that directly follows a "/" in a topic (the device IMEI).
_IMEI_PATTERN = re.compile(r"(?<=/)\d+")


# Callback run for every message received from the broker.
def on_message(client, userdata, msg):
    """Log an incoming message and forward device uplinks over HTTP.

    The IMEI is taken from the topic. For topics containing ``c2s`` the
    payload is parsed as JSON and, when an ``Equip_Forward`` row exists for
    the IMEI, POSTed as ``{"topic": ..., "payload": ...}`` to that row's
    ``equip_data_url``. Other topics are only logged.

    Malformed topics, undecodable or non-JSON payloads and HTTP transport
    errors are reported and the message is dropped, so one bad message does
    not raise out of the callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; ``msg.topic`` (str) and ``msg.payload``
            (bytes) are read.
    """
    print('\r')
    print('=================================================')
    print('\r')
    print("<-----topic:\n" + msg.topic + ';\n')
    print("Message:\n" + str(msg.payload) + "----->\n")

    # The subscriptions use the "#" wildcard, so a topic without a numeric
    # segment can arrive; ignore it instead of failing on a missing match.
    found = _IMEI_PATTERN.search(msg.topic)
    if found is None:
        print("<-----no imei in topic, message ignored----->")
        return
    imei = found.group(0)
    print("<-----imei:", imei, "----->")

    # Only uplink ("c2s") messages are forwarded.
    if "c2s" not in msg.topic:
        return

    # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses.
    try:
        payload = json.loads(msg.payload.decode())
    except ValueError as exc:
        print("invalid payload, message ignored:", exc)
        return
    data = {"topic": msg.topic, "payload": payload}
    print(data)

    # One query instead of exists() followed by get(): no race between the
    # two, and duplicate rows no longer raise (the first row is used).
    forward = Equip_Forward.objects.filter(equip_id=imei).first()
    if forward is None:
        return

    time_a = datetime.datetime.now()
    body = json.dumps(data, cls=CJSONEncoder)
    print("参数类型为:", type(body))
    headers = {"Content-Type": "application/json; charset=UTF-8", 'Connection': 'close'}
    # A slow or unreachable receiver must not raise out of the callback.
    try:
        res_1 = requests.post(forward.equip_data_url, data=body, timeout=3, headers=headers)
    except requests.RequestException as exc:
        print("send failed:", exc)
        return
    print("res_1", res_1)
    print("res_1", res_1.text)
    print("send success")
    time_b = datetime.datetime.now()
    print("一共用时:", time_b - time_a)
if __name__ == '__main__':
    # Broker address used by this forwarder.
    HOST = "127.0.0.1"

    # The protocol version is not passed, so the library default applies.
    client = mqtt.Client(
        client_id="PY_MQTT_TRANSPOND_BZY",
        clean_session=True,
        userdata=None,
    )

    # Register the callbacks before connecting.
    client.on_message = on_message
    client.on_connect = on_connect

    # Credentials must be set, otherwise the broker answers with
    # "Connected with result code 4".
    client.username_pw_set("admin", "password")

    client.connect(HOST, port=1883, keepalive=60)
    # Blocks and processes network traffic until the client disconnects.
    client.loop_forever()
|