coupler.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  12. )
  13. const (
  14. MqttQos1 byte = 1 //// 消息至少送达一次
  15. FastInterval = 1 * time.Second //// 快速检测时间间隔
  16. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  17. )
  18. type MQTTCoupler struct {
  19. ctx context.Context
  20. cancel context.CancelFunc
  21. broker, username, password string
  22. client mqtt.Client
  23. clientID string
  24. machineID string
  25. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  26. imei string // 设备唯一标识
  27. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  28. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  29. cwd string // 当前工作目录
  30. // 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
  31. cmdMu sync.Mutex // 串行执行的锁
  32. pending map[int]chan *jsonrpc2.Response // 等待命令结果
  33. pendingMu sync.Mutex // 等待结果的锁
  34. interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
  35. }
  36. func (c *MQTTCoupler) init2() error {
  37. template := "/yfkj/device/rpc/imei"
  38. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  39. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  40. opts := mqtt.NewClientOptions().
  41. AddBroker(c.broker).
  42. SetUsername(c.username).SetPassword(c.password).
  43. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  44. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  45. SetOrderMatters(false)
  46. opts.OnConnect = func(client mqtt.Client) {
  47. if !c.isConnected.Swap(true) {
  48. fmt.Printf("[%s] MQTT Broker连接成功\n", MODULE_NAME)
  49. }
  50. go func() { // 订阅应答主题
  51. token := c.client.Subscribe(c.subTopic, MqttQos1, c.onCmdAck)
  52. select {
  53. case <-c.ctx.Done():
  54. return
  55. case <-token.Done():
  56. }
  57. if token.Error() != nil {
  58. return
  59. }
  60. }()
  61. }
  62. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  63. if c.isConnected.Swap(false) {
  64. fmt.Printf("[%s] MQTT Broker连接丢失: %v!\n", MODULE_NAME, err)
  65. }
  66. }
  67. c.pending = make(map[int]chan *jsonrpc2.Response)
  68. c.client = mqtt.NewClient(opts)
  69. go c.keepOnline()
  70. return nil
  71. }
  72. func (c *MQTTCoupler) keepOnline() {
  73. t := time.NewTimer(FastInterval)
  74. defer t.Stop()
  75. for {
  76. select {
  77. case <-c.ctx.Done():
  78. return
  79. case <-t.C:
  80. t.Reset(c.tick())
  81. } // end select
  82. } // end for
  83. }
  84. func (c *MQTTCoupler) tick() time.Duration {
  85. if c.isConnected.Load() {
  86. return FastInterval
  87. }
  88. if err := c.connect(); err != nil {
  89. fmt.Printf("[%s] MQTT Broker连接失败: %v!!\n", MODULE_NAME, err)
  90. }
  91. return SlowInterval
  92. }
  93. func (c *MQTTCoupler) connect() error {
  94. if c.client.IsConnected() {
  95. return nil
  96. }
  97. token := c.client.Connect()
  98. select {
  99. case <-c.ctx.Done():
  100. return nil
  101. case <-token.Done():
  102. }
  103. return token.Error()
  104. }
  105. func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (*jsonrpc2.Response, error) {
  106. zero := &jsonrpc2.Response{}
  107. if c.needSerialize(method) {
  108. c.cmdMu.Lock()
  109. defer c.cmdMu.Unlock()
  110. }
  111. req, err := jsonrpc2.BuildRequest(method, params, id...)
  112. if err != nil {
  113. return zero, err
  114. }
  115. reqID := *req.ID
  116. b, err := json.Marshal(req)
  117. if err != nil {
  118. return zero, err
  119. }
  120. ch := make(chan *jsonrpc2.Response, 1)
  121. c.pendingMu.Lock()
  122. c.pending[reqID] = ch
  123. c.pendingMu.Unlock()
  124. defer func() {
  125. c.pendingMu.Lock()
  126. delete(c.pending, reqID)
  127. c.pendingMu.Unlock()
  128. }()
  129. token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
  130. select {
  131. case <-c.ctx.Done():
  132. return zero, c.ctx.Err()
  133. case <-token.Done():
  134. }
  135. if token.Error() != nil {
  136. return zero, token.Error()
  137. }
  138. var timer *time.Timer
  139. var timeout <-chan time.Time
  140. if c.isCtrlCommand(method) {
  141. timer = time.NewTimer(3 * time.Second)
  142. timeout = timer.C
  143. defer timer.Stop()
  144. }
  145. select {
  146. case <-c.ctx.Done():
  147. return zero, c.ctx.Err()
  148. case resp := <-ch:
  149. return resp, nil
  150. case <-timeout:
  151. return zero, fmt.Errorf("command timeout")
  152. case <-c.interrupted:
  153. return zero, fmt.Errorf("command interrupted by user")
  154. }
  155. }
  156. func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
  157. p := msg.Payload()
  158. resp, err := jsonrpc2.ParseResponse(p)
  159. if err != nil { //////////// 解析应答失败,忽略不管
  160. return
  161. }
  162. if resp.ID == nil { // 通知类消息, 设计上不应该出现
  163. return
  164. }
  165. respID := *resp.ID
  166. c.pendingMu.Lock()
  167. ch, ok := c.pending[respID]
  168. c.pendingMu.Unlock()
  169. if !ok { /////////////// 未找到对应的请求, 忽略不管
  170. return
  171. }
  172. select {
  173. case ch <- resp:
  174. default:
  175. }
  176. }