import os import sys import json import time import datetime import threading import logging import paho.mqtt.client as mqtt import django from pathlib import Path local_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) print(local_path) sys.path.append(local_path) print(sys.path) try: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kedong.settings") from django.conf import settings django.setup() from smartfarming.models.mqtt_models import PubRecord, PublishRecord from scripts.tools.utils import DeviceConfig except Exception as e: print(e) exit(0) CONFIG = settings.CONFIG pub_redis_pool = settings.pub_redis_pool QXZ_OFFLINE_CACHE = set() dashuju_pub_key = CONFIG.get("dashuju_pub_key") logger = logging.getLogger("data_ingestion") def thread_dashuju_publish_mqtt(client): def _func(): while True: try: data = pub_redis_pool.bpop(dashuju_pub_key) topic, payload = data['topic'], data['payload'] client.publish(topic, payload) PublishRecord.objects.create( topic=topic, payload=payload ) except Exception as e: print(e) t = threading.Thread(target=_func) t.start() def connect_mqtt(pub): def on_message(client, userdata, msg): """接收到消息的回调方法""" try: payload = json.loads(msg.payload.decode("utf8", "replace")) logger.info(f"payload: {payload}") except Exception as e: payload = {} def on_connect(client, userdata, flags, rc): '''mqtt 连接成功后的回调''' # thread_dashuju_publish_mqtt(client) QXZ_OFFLINE_CACHE.clear() client.subscribe(pub) client = mqtt.Client() client.username_pw_set(User, Passwd) client.on_connect = on_connect client.on_message = on_message client.connect(HOST, PORT, 30) client.loop_forever() if __name__ == "__main__": publics_list = [ DeviceConfig.CBD_PUBLICS, DeviceConfig.CBD_OFFLINE_PUBLICS ] config_dict = settings.CONFIG HOST = config_dict['mqtt']['mqttIp'] PORT = config_dict['mqtt']['port'] User = config_dict['mqtt']["user"] Passwd = config_dict['mqtt']["password"] p_list = [(p, 0) for p in publics_list] logger.info(f"监听topic: {str(p_list)}") connect_mqtt(p_list)