coupler.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  12. "hnyfkj.com.cn/rtu/linux/utils/shell"
  13. )
  // MQTT delivery level and the polling intervals used by the connection
  // keep-alive loop.
  const (
  	// MqttQos1 is MQTT QoS level 1: a message is delivered at least once.
  	MqttQos1 byte = 1

  	// FastInterval is the short ("fast") check interval.
  	FastInterval = time.Second

  	// SlowInterval is the long ("slow") check interval.
  	SlowInterval = 5 * time.Second
  )
  19. type pendingRequest struct {
  20. ch chan *jsonrpc2.Response
  21. done chan struct{} // 命令请求完成信号: 已应答/超时/被中断
  22. // 命令请求超时/中断后, 使得等待应答的携程能收到结束信号退出
  23. // !避免请求端已经放弃等待, 但应答端仍然在等待结果并写入通道
  24. }
  25. type MQTTCoupler struct {
  26. ctx context.Context
  27. cancel context.CancelFunc
  28. broker, username, password string
  29. client mqtt.Client
  30. clientID string
  31. machineID string
  32. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  33. imei string // 设备唯一标识
  34. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  35. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  36. cwd string // 当前工作目录
  37. // 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
  38. cmdMu sync.Mutex // 串行执行的锁LLLLLL
  39. pending map[int64]*pendingRequest // 等命令的结果RRRRRR
  40. pendingMu sync.Mutex // 等待结果的锁LLLLLL
  41. interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
  42. }
  43. func (c *MQTTCoupler) init2() error {
  44. template := "/yfkj/device/rpc/imei"
  45. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  46. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  47. opts := mqtt.NewClientOptions().
  48. AddBroker(c.broker).
  49. SetUsername(c.username).SetPassword(c.password).
  50. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  51. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  52. SetOrderMatters(false)
  53. opts.OnConnect = func(client mqtt.Client) {
  54. if !c.isConnected.Swap(true) {
  55. fmt.Printf("[%s] MQTT Broker连接成功\n", MODULE_NAME)
  56. }
  57. go func() { // 订阅应答主题
  58. token := c.client.Subscribe(c.subTopic, MqttQos1, c.onCmdAck)
  59. select {
  60. case <-c.ctx.Done():
  61. return
  62. case <-token.Done():
  63. }
  64. if token.Error() != nil {
  65. return
  66. }
  67. }()
  68. }
  69. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  70. if c.isConnected.Swap(false) {
  71. fmt.Printf("[%s] MQTT Broker连接丢失: %v!\n", MODULE_NAME, err)
  72. }
  73. }
  74. c.pending = make(map[int64]*pendingRequest)
  75. c.client = mqtt.NewClient(opts)
  76. go c.keepOnline()
  77. return nil
  78. }
  79. func (c *MQTTCoupler) keepOnline() {
  80. t := time.NewTimer(FastInterval)
  81. defer t.Stop()
  82. for {
  83. select {
  84. case <-c.ctx.Done():
  85. return
  86. case <-t.C:
  87. t.Reset(c.tick())
  88. } // end select
  89. } // end for
  90. }
  91. func (c *MQTTCoupler) tick() time.Duration {
  92. if c.isConnected.Load() {
  93. return FastInterval
  94. }
  95. if err := c.connect(); err != nil {
  96. fmt.Printf("[%s] MQTT Broker连接失败: %v!!\n", MODULE_NAME, err)
  97. }
  98. return SlowInterval
  99. }
  100. func (c *MQTTCoupler) connect() error {
  101. if c.client.IsConnected() {
  102. return nil
  103. }
  104. token := c.client.Connect()
  105. select {
  106. case <-c.ctx.Done():
  107. return nil
  108. case <-token.Done():
  109. }
  110. return token.Error()
  111. }
  112. func (c *MQTTCoupler) doCmd(method string, params any, id ...int64) (*jsonrpc2.Response, error) {
  113. zero := &jsonrpc2.Response{}
  114. if c.needSerialize(method) {
  115. c.cmdMu.Lock()
  116. defer c.cmdMu.Unlock()
  117. }
  118. req, err := jsonrpc2.BuildRequest(method, params, id...)
  119. if err != nil {
  120. return zero, err
  121. }
  122. reqID := *req.ID
  123. b, err := json.Marshal(req)
  124. if err != nil {
  125. return zero, err
  126. }
  127. pr := &pendingRequest{
  128. ch: make(chan *jsonrpc2.Response, 1),
  129. done: make(chan struct{}),
  130. }
  131. c.pendingMu.Lock()
  132. c.pending[reqID] = pr
  133. c.pendingMu.Unlock()
  134. defer func() {
  135. c.pendingMu.Lock()
  136. delete(c.pending, reqID)
  137. c.pendingMu.Unlock()
  138. close(pr.done)
  139. }()
  140. token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
  141. select {
  142. case <-c.ctx.Done():
  143. return zero, c.ctx.Err()
  144. case <-token.Done():
  145. }
  146. if token.Error() != nil {
  147. return zero, token.Error()
  148. }
  149. if c.isCtrlCommand(method) { // 控制指令, 等待结果或超时
  150. timer := time.NewTimer(time.Duration(3) * time.Second)
  151. defer timer.Stop()
  152. select {
  153. case <-c.ctx.Done():
  154. return zero, c.ctx.Err()
  155. case resp := <-pr.ch:
  156. return resp, nil
  157. case <-timer.C:
  158. return zero, fmt.Errorf("command timeout")
  159. }
  160. } else { // 用户指令, 等待结果或用户主动中断取消: Ctrl+C
  161. v, ok := c.getCmdTimeout(params)
  162. if !ok || v <= 0 { // 如果没有指定超时, 则使用默认超时
  163. v = int(shell.DefaultTimeout/time.Second) + 1
  164. }
  165. timer := time.NewTimer(time.Duration(v+1) * time.Second)
  166. defer timer.Stop()
  167. select {
  168. case <-c.ctx.Done():
  169. return zero, c.ctx.Err()
  170. case resp := <-pr.ch:
  171. return resp, nil
  172. case <-timer.C:
  173. return zero, fmt.Errorf("command timeout")
  174. case <-c.interrupted:
  175. return zero, fmt.Errorf("command interrupted by user")
  176. }
  177. }
  178. }
  179. func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
  180. p := msg.Payload()
  181. resp, err := jsonrpc2.ParseResponse(p)
  182. if err != nil {
  183. return
  184. }
  185. if resp.ID == nil { // 通知类消息, 设计上不应该出现
  186. return
  187. }
  188. respID := *resp.ID
  189. c.pendingMu.Lock()
  190. pr, ok := c.pending[respID]
  191. c.pendingMu.Unlock()
  192. if !ok { /////////////// 未找到对应的请求, 忽略不管
  193. return
  194. }
  195. select {
  196. case pr.ch <- resp:
  197. case <-pr.done:
  198. }
  199. }