| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import os
- import sys
- import json
- import time
- import datetime
- import threading
- import logging
- import paho.mqtt.client as mqtt
- import django
- from pathlib import Path
# Make the directory two levels above this file importable so that the
# Django settings module and the project packages imported below resolve
# when the script is run directly.
local_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(local_path)
sys.path.append(local_path)
print(sys.path)

try:
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kedong.settings")
    from django.conf import settings

    # Project models are imported only after django.setup() has run.
    django.setup()
    from smartfarming.models.mqtt_models import PubRecord, PublishRecord
    from scripts.tools.utils import DeviceConfig
except Exception as e:
    # Bootstrapping failed, so nothing below can work. Report on stderr and
    # exit with a non-zero status so a shell or supervisor can see that the
    # service did not start. sys.exit is used instead of the bare exit()
    # helper, which is injected by the site module for interactive use.
    print(f"failed to initialise Django environment: {e!r}", file=sys.stderr)
    sys.exit(1)

# Project configuration object exposed by the Django settings module.
CONFIG = settings.CONFIG
# Queue-like object used by thread_dashuju_publish_mqtt (via .bpop()).
pub_redis_pool = settings.pub_redis_pool
# Cleared every time the MQTT client (re)connects; see connect_mqtt.
QXZ_OFFLINE_CACHE = set()
# Key handed to pub_redis_pool.bpop(); None when the config has no such entry.
dashuju_pub_key = CONFIG.get("dashuju_pub_key")
logger = logging.getLogger("data_ingestion")
def thread_dashuju_publish_mqtt(client):
    """Start a background thread that forwards queued messages to MQTT.

    The worker loops forever. On each iteration it takes one item from
    ``pub_redis_pool`` using ``dashuju_pub_key``, publishes it through
    ``client`` and stores a ``PublishRecord`` row with the same topic and
    payload. Any failure is logged with its traceback and the loop retries
    after a short pause.

    Args:
        client: Object exposing ``publish(topic, payload)``; presumably a
            connected paho-mqtt client -- confirm against the caller.

    Returns:
        None. The thread is started but no handle to it is returned.
    """
    # Pause between retries so a persistent failure (queue or database
    # unreachable, malformed item) does not turn into a hot loop.
    retry_delay_seconds = 1

    def _func():
        while True:
            try:
                # NOTE(review): bpop is assumed to block until an item is
                # available and to return a mapping with 'topic' and
                # 'payload' keys -- confirm against the pool implementation.
                data = pub_redis_pool.bpop(dashuju_pub_key)
                topic, payload = data['topic'], data['payload']
                client.publish(topic, payload)
                PublishRecord.objects.create(
                    topic=topic,
                    payload=payload
                )
            except Exception:
                # Keep the worker alive, but record the full traceback.
                logger.exception("failed to forward queued message to MQTT")
                time.sleep(retry_delay_seconds)

    t = threading.Thread(target=_func)
    t.start()
def connect_mqtt(pub):
    """Connect to the MQTT broker, subscribe to ``pub`` and block forever.

    Broker address and credentials are read from the module-level names
    ``HOST``, ``PORT``, ``User`` and ``Passwd``. This file assigns them only
    in its ``__main__`` section, so they must exist before this is called.

    Args:
        pub: Subscription argument passed unchanged to ``client.subscribe``;
            the ``__main__`` section passes a list of ``(topic, 0)`` tuples.

    Returns:
        Does not return in normal operation because ``loop_forever`` blocks.
    """

    def on_message(client, userdata, msg):
        """Callback for a received message: decode its JSON payload and log it."""
        try:
            payload = json.loads(msg.payload.decode("utf8", "replace"))
        except Exception:
            # A malformed message must not stop the network loop, but it
            # should leave a trace instead of vanishing silently.
            logger.warning("failed to decode MQTT payload as JSON", exc_info=True)
            return
        logger.info(f"payload: {payload}")

    def on_connect(client, userdata, flags, rc):
        """Callback run when the broker answers the connection request."""
        if rc != 0:
            # A non-zero result code means the broker refused the connection,
            # so there is no session to subscribe on.
            logger.error("MQTT connection refused, rc=%s", rc)
            return
        # Publisher thread is currently disabled:
        # thread_dashuju_publish_mqtt(client)
        QXZ_OFFLINE_CACHE.clear()
        client.subscribe(pub)

    client = mqtt.Client()
    client.username_pw_set(User, Passwd)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(HOST, PORT, 30)  # third argument is the keepalive
    client.loop_forever()
if __name__ == "__main__":
    # Topics to listen on, taken from the project's DeviceConfig.
    subscribed_topics = (
        DeviceConfig.CBD_PUBLICS,
        DeviceConfig.CBD_OFFLINE_PUBLICS,
    )

    # Broker settings. connect_mqtt() reads HOST, PORT, User and Passwd as
    # module-level names, so they have to be assigned before it is called.
    HOST, PORT = settings.CONFIG['mqtt']['mqttIp'], settings.CONFIG['mqtt']['port']
    User, Passwd = settings.CONFIG['mqtt']["user"], settings.CONFIG['mqtt']["password"]

    # Pair every topic with 0 as the second element of its tuple.
    p_list = []
    for topic in subscribed_topics:
        p_list.append((topic, 0))

    logger.info(f"监听topic: {str(p_list)}")
    connect_mqtt(p_list)
|