coupler.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  12. "hnyfkj.com.cn/rtu/linux/utils/shell"
  13. )
  14. const (
  15. MqttQos1 byte = 1 //// 消息至少送达一次
  16. FastInterval = 1 * time.Second //// 快速检测时间间隔
  17. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  18. )
  19. type MQTTCoupler struct {
  20. ctx context.Context
  21. cancel context.CancelFunc
  22. broker, username, password string
  23. client mqtt.Client
  24. clientID string
  25. isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
  26. imei string // 设备唯一标识
  27. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  28. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  29. cwd string // 当前工作目录
  30. cmdMu sync.Mutex ///// 串行执行的锁
  31. pending map[int]chan jsonrpc2.Response ///// 等待命令结果
  32. pendingMu sync.Mutex ///// 等待结果的锁
  33. }
  34. func (c *MQTTCoupler) init2() error {
  35. template := "/yfkj/device/rpc/imei"
  36. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  37. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  38. opts := mqtt.NewClientOptions().
  39. AddBroker(c.broker).
  40. SetUsername(c.username).SetPassword(c.password).
  41. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  42. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  43. SetOrderMatters(false)
  44. opts.OnConnect = func(client mqtt.Client) {
  45. if !c.isConnected.Swap(true) {
  46. fmt.Printf("[%s] MQTT Broker连接成功", MODULE_NAME)
  47. }
  48. go func() { // 订阅应答主题
  49. token := c.client.Subscribe(c.subTopic, MqttQos1, c.onCmdAck)
  50. select {
  51. case <-c.ctx.Done():
  52. return
  53. case <-token.Done():
  54. }
  55. if token.Error() != nil {
  56. return
  57. }
  58. }()
  59. }
  60. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  61. if c.isConnected.Swap(false) {
  62. fmt.Printf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  63. }
  64. }
  65. c.pending = make(map[int]chan jsonrpc2.Response)
  66. c.client = mqtt.NewClient(opts)
  67. go c.keepOnline()
  68. return nil
  69. }
  70. func (c *MQTTCoupler) keepOnline() {
  71. t := time.NewTimer(FastInterval)
  72. defer t.Stop()
  73. for {
  74. select {
  75. case <-c.ctx.Done():
  76. return
  77. case <-t.C:
  78. t.Reset(c.tick())
  79. } // end select
  80. } // end for
  81. }
  82. func (c *MQTTCoupler) tick() time.Duration {
  83. if c.isConnected.Load() {
  84. return FastInterval
  85. }
  86. if err := c.connect(); err != nil {
  87. fmt.Printf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  88. }
  89. return SlowInterval
  90. }
  91. func (c *MQTTCoupler) connect() error {
  92. if c.client.IsConnected() {
  93. return nil
  94. }
  95. token := c.client.Connect()
  96. select {
  97. case <-c.ctx.Done():
  98. return nil
  99. case <-token.Done():
  100. }
  101. return token.Error()
  102. }
  103. func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (jsonrpc2.Response, error) {
  104. zero := jsonrpc2.Response{}
  105. if c.needSerialize(method) {
  106. c.cmdMu.Lock()
  107. defer c.cmdMu.Unlock()
  108. }
  109. req, err := jsonrpc2.BuildRequest(method, params, id...)
  110. if err != nil {
  111. return zero, err
  112. }
  113. reqID := *req.ID
  114. b, err := json.Marshal(req)
  115. if err != nil {
  116. return zero, err
  117. }
  118. ch := make(chan jsonrpc2.Response, 1)
  119. c.pendingMu.Lock()
  120. c.pending[reqID] = ch
  121. c.pendingMu.Unlock()
  122. defer func() {
  123. c.pendingMu.Lock()
  124. delete(c.pending, reqID)
  125. c.pendingMu.Unlock()
  126. }()
  127. token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
  128. select {
  129. case <-c.ctx.Done():
  130. return zero, c.ctx.Err()
  131. case <-token.Done():
  132. }
  133. if token.Error() != nil {
  134. return zero, token.Error()
  135. }
  136. var timer *time.Timer
  137. var timeout <-chan time.Time
  138. if c.needTimeoutEnd(method) {
  139. timer = time.NewTimer(shell.DefaultTimeout)
  140. timeout = timer.C
  141. defer timer.Stop()
  142. }
  143. select {
  144. case <-c.ctx.Done():
  145. return zero, c.ctx.Err()
  146. case resp := <-ch:
  147. return resp, nil
  148. case <-timeout:
  149. return zero, fmt.Errorf("command timeout")
  150. }
  151. }
  152. func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
  153. p := msg.Payload()
  154. var resp jsonrpc2.Response
  155. if err := json.Unmarshal(p, &resp); err != nil {
  156. return
  157. }
  158. if resp.ID == nil { // 通知类消息, 设计上不应该出现
  159. return
  160. }
  161. respID := *resp.ID
  162. c.pendingMu.Lock()
  163. ch, ok := c.pending[respID]
  164. c.pendingMu.Unlock()
  165. if !ok { /////////////// 未找到对应的请求, 忽略不管
  166. return
  167. }
  168. select {
  169. case ch <- resp:
  170. default:
  171. }
  172. }