# mqtt_gateway.py (2.4 KB)
  1. import os
  2. import sys
  3. import json
  4. import time
  5. import datetime
  6. import threading
  7. import logging
  8. import paho.mqtt.client as mqtt
  9. import django
  10. from pathlib import Path
  # Put the directory two levels above this file on sys.path so that the
  # "kedong", "smartfarming" and "scripts" packages imported below can resolve
  # when this file is run directly as a script.
  local_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  print(local_path)
  sys.path.append(local_path)
  print(sys.path)

  try:
      os.environ.setdefault("DJANGO_SETTINGS_MODULE", "kedong.settings")
      from django.conf import settings
      django.setup()
      # Project imports are done after django.setup(), as in the original order.
      from smartfarming.models.mqtt_models import PubRecord, PublishRecord
      from scripts.tools.utils import DeviceConfig
  except Exception as e:
      # A failed bootstrap is fatal. Report it on stderr and exit with a
      # non-zero status so a shell or process supervisor can see the failure;
      # sys.exit is used because the bare `exit` helper is provided by the
      # `site` module and is not guaranteed to exist.
      print(f"Django bootstrap failed: {e!r}", file=sys.stderr)
      sys.exit(1)

  # Shared module-level state read by the functions below.
  CONFIG = settings.CONFIG
  pub_redis_pool = settings.pub_redis_pool
  QXZ_OFFLINE_CACHE = set()  # cleared on every successful MQTT (re)connect
  dashuju_pub_key = CONFIG.get("dashuju_pub_key")
  logger = logging.getLogger("data_ingestion")
  def thread_dashuju_publish_mqtt(client):
      """Start a background thread that forwards queued messages to MQTT.

      The worker loops forever: it pops one item from ``pub_redis_pool`` under
      ``dashuju_pub_key``, publishes the item's ``payload`` on its ``topic``
      through ``client``, and stores a ``PublishRecord`` row for it.

      Args:
          client: Object exposing ``publish(topic, payload)``.

      Returns:
          The started ``threading.Thread`` (previously nothing was returned).
      """
      def _forward_forever():
          while True:
              try:
                  item = pub_redis_pool.bpop(dashuju_pub_key)
                  if not item:
                      # NOTE(review): assumes bpop can hand back an empty
                      # result (e.g. on timeout) -- confirm against its
                      # implementation.
                      continue
                  topic, payload = item['topic'], item['payload']
                  client.publish(topic, payload)
                  PublishRecord.objects.create(topic=topic, payload=payload)
              except Exception:
                  # Keep the worker alive, but record the traceback and back
                  # off briefly so a persistent outage does not turn this loop
                  # into a busy spin.
                  logger.exception("dashuju publish worker iteration failed")
                  time.sleep(1)

      worker = threading.Thread(target=_forward_forever)
      worker.start()
      return worker
  def connect_mqtt(pub):
      """Connect to the MQTT broker, subscribe to ``pub`` and block forever.

      Reads the module-level names ``HOST``, ``PORT``, ``User`` and ``Passwd``,
      which this file assigns only inside its ``__main__`` block.

      Args:
          pub: Subscription argument passed unchanged to ``client.subscribe``.
      """
      def on_message(client, userdata, msg):
          """Callback invoked when a message is received."""
          try:
              payload = json.loads(msg.payload.decode("utf8", "replace"))
          except Exception as exc:
              # A malformed payload must not break the network loop, but it
              # should leave a trace instead of vanishing silently.
              logger.warning("discarding undecodable MQTT payload: %s", exc)
              return
          logger.info("payload: %s", payload)

      def on_connect(client, userdata, flags, rc):
          """Callback invoked when the broker answers the connection request."""
          if rc != 0:
              # Non-zero rc means the broker refused the connection; there is
              # no session to subscribe on, so leave the cache untouched.
              logger.error("MQTT connection refused, rc=%s", rc)
              return
          # thread_dashuju_publish_mqtt(client) is currently disabled here.
          QXZ_OFFLINE_CACHE.clear()
          client.subscribe(pub)

      client = mqtt.Client()
      client.username_pw_set(User, Passwd)
      client.on_connect = on_connect
      client.on_message = on_message
      client.connect(HOST, PORT, 30)  # third argument is the keepalive
      client.loop_forever()
  if __name__ == "__main__":
      # Script entry point: collect the topics, load the broker settings into
      # the module-level names connect_mqtt() reads, then start listening.
      publics_list = [DeviceConfig.CBD_PUBLICS, DeviceConfig.CBD_OFFLINE_PUBLICS]

      config_dict = settings.CONFIG
      mqtt_section = config_dict['mqtt']
      HOST, PORT = mqtt_section['mqttIp'], mqtt_section['port']
      User, Passwd = mqtt_section["user"], mqtt_section["password"]

      # Pair every topic with 0 (presumably the QoS level expected by
      # client.subscribe -- confirm against the paho-mqtt documentation).
      p_list = [(topic, 0) for topic in publics_list]
      logger.info(f"监听topic: {p_list!s}")
      connect_mqtt(p_list)