coupler.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package main
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "hnyfkj.com.cn/rtu/linux/baseapp"
  10. )
  // MQTT delivery level and the polling intervals used by the keep-online loop.
  const (
  	// MqttQos1 is QoS level 1: a message is delivered at least once.
  	MqttQos1 byte = 1

  	// FastInterval is the short check interval; tick returns it while the
  	// connection flag is set.
  	FastInterval = time.Second

  	// SlowInterval is the long check interval; tick returns it after a
  	// connection attempt.
  	SlowInterval = 5 * time.Second
  )
  16. type MQTTCoupler struct {
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. broker, username, password string
  20. client mqtt.Client
  21. clientID string
  22. isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
  23. imei string // 设备唯一标识
  24. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  25. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  26. cwd string // 当前工作目录
  27. mu sync.Mutex // 串行执行的锁
  28. }
  29. func (c *MQTTCoupler) init() error {
  30. template := "/yfkj/device/rpc/imei"
  31. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  32. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  33. opts := mqtt.NewClientOptions().
  34. AddBroker(c.broker).
  35. SetUsername(c.username).SetPassword(c.password).
  36. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  37. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  38. SetOrderMatters(false)
  39. opts.OnConnect = func(client mqtt.Client) {
  40. if !c.isConnected.Swap(true) {
  41. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  42. }
  43. }
  44. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  45. if c.isConnected.Swap(false) {
  46. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  47. }
  48. }
  49. c.client = mqtt.NewClient(opts)
  50. go coupler.keepOnline()
  51. return nil
  52. }
  53. func (c *MQTTCoupler) keepOnline() {
  54. t := time.NewTimer(FastInterval)
  55. defer t.Stop()
  56. for {
  57. select {
  58. case <-c.ctx.Done():
  59. return
  60. case <-t.C:
  61. t.Reset(c.tick())
  62. } // end select
  63. } // end for
  64. }
  65. func (c *MQTTCoupler) tick() time.Duration {
  66. if c.isConnected.Load() {
  67. return FastInterval
  68. }
  69. if err := c.connect(); err != nil {
  70. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  71. }
  72. return SlowInterval
  73. }
  74. func (c *MQTTCoupler) connect() error {
  75. if c.client.IsConnected() {
  76. return nil
  77. }
  78. token := c.client.Connect()
  79. select {
  80. case <-c.ctx.Done():
  81. return nil
  82. case <-token.Done():
  83. }
  84. return token.Error()
  85. }