client.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "github.com/google/uuid"
  12. "hnyfkj.com.cn/rtu/linux/baseapp"
  13. "hnyfkj.com.cn/rtu/linux/utils/shell"
  14. )
// MODULE_NAME is the module tag that this file puts in square brackets at the
// start of its log, panic and error messages.
const MODULE_NAME = "YFKJ_SSH_CLIENT"
var (
	// Coupler is an exported package-level *MQTTCoupler handle.
	// NOTE(review): nothing in this file assigns it; init() stores the
	// configured instance in the unexported coupler variable instead. Confirm
	// whether both globals are intended or one is a leftover.
	Coupler *MQTTCoupler
)
  19. const (
  20. MqttQos1 byte = 1 //// 消息至少送达一次
  21. FastInterval = 1 * time.Second //// 快速检测时间间隔
  22. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  23. )
var (
	// Version is the program version string; main prints it for the -v flag.
	Version = "0.0.0.1"

	// coupler is the package-level coupler instance created in init().
	coupler *MQTTCoupler

	// ErrBrokerAddressEmpty reports that the MQTT server address is empty.
	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")

	// ErrIMEINotAvailable reports that the device IMEI is not available.
	ErrIMEINotAvailable = errors.New("device imei is not available")
)
// MQTTCoupler bundles the MQTT broker settings, the MQTT client and the
// per-device state (IMEI, RPC topics, working directory) used by this program.
type MQTTCoupler struct {
	// ctx is watched by keepOnline and connect; cancel is its cancel function.
	ctx    context.Context
	cancel context.CancelFunc

	// broker is the MQTT server address; username and password are the
	// credentials passed to the client options.
	broker, username, password string

	// client is the MQTT client built in init2.
	client mqtt.Client

	// clientID is generated as a UUID string in init().
	clientID string

	// isConnected marks whether the MQTT broker is currently connected; it is
	// flipped by the OnConnect / OnConnectionLost callbacks.
	isConnected atomic.Bool

	// imei is the unique identifier of the target device.
	imei string

	// subTopic is the reply topic to subscribe to: /yfkj/device/rpc/imei/ack
	subTopic string

	// pubTopic is the command topic to publish to: /yfkj/device/rpc/imei/cmd
	pubTopic string

	// cwd is the current working directory.
	cwd string

	// mu is the lock used to serialize execution.
	mu sync.Mutex
}
  43. func init() {
  44. if err := loadAppConfig(); err != nil {
  45. msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
  46. panic(msg)
  47. }
  48. ctx, cancel := context.WithCancel(context.Background())
  49. coupler = &MQTTCoupler{
  50. ctx: ctx,
  51. cancel: cancel,
  52. broker: CfgServers.MQTTSrv.Address,
  53. username: CfgServers.MQTTSrv.Username,
  54. password: CfgServers.MQTTSrv.Password,
  55. clientID: uuid.New().String(),
  56. cwd: "/",
  57. }
  58. }
  59. func main() {
  60. if baseapp.IsArgsParam("-h") {
  61. help()
  62. return
  63. }
  64. if baseapp.IsArgsParam("-v") {
  65. fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
  66. return
  67. }
  68. devIMEI := baseapp.GetArgsParamStr("-c", "")
  69. if devIMEI == "" {
  70. help()
  71. return
  72. }
  73. coupler.imei = devIMEI
  74. if err := Coupler.init2(); err != nil {
  75. fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
  76. return
  77. }
  78. for {
  79. }
  80. }
// help prints the command-line usage text (the -h, -v and -c flags) to
// standard output.
func help() {
	h := `
-h 显示帮助提示
-v 当前程序版本
-c 连接目标设备(IMEI), 例如: -c 869523059113051
`
	fmt.Println(h)
}
  89. func (c *MQTTCoupler) init2() error {
  90. template := "/yfkj/device/rpc/imei"
  91. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  92. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  93. opts := mqtt.NewClientOptions().
  94. AddBroker(c.broker).
  95. SetUsername(c.username).SetPassword(c.password).
  96. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  97. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  98. SetOrderMatters(false)
  99. opts.OnConnect = func(client mqtt.Client) {
  100. if !c.isConnected.Swap(true) {
  101. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  102. }
  103. }
  104. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  105. if c.isConnected.Swap(false) {
  106. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  107. }
  108. }
  109. c.client = mqtt.NewClient(opts)
  110. go Coupler.keepOnline()
  111. return nil
  112. }
  113. func (c *MQTTCoupler) keepOnline() {
  114. t := time.NewTimer(FastInterval)
  115. defer t.Stop()
  116. for {
  117. select {
  118. case <-c.ctx.Done():
  119. return
  120. case <-t.C:
  121. t.Reset(c.tick())
  122. } // end select
  123. } // end for
  124. }
  125. func (c *MQTTCoupler) tick() time.Duration {
  126. if c.isConnected.Load() {
  127. return FastInterval
  128. }
  129. if err := c.connect(); err != nil {
  130. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  131. }
  132. return SlowInterval
  133. }
  134. func (c *MQTTCoupler) connect() error {
  135. if c.client.IsConnected() {
  136. return nil
  137. }
  138. token := c.client.Connect()
  139. select {
  140. case <-c.ctx.Done():
  141. return nil
  142. case <-token.Done():
  143. }
  144. return token.Error()
  145. }
  146. func (c *MQTTCoupler) ExecuteShell(cmd string) shell.ExecuteResult {
  147. c.mu.Lock()
  148. defer c.mu.Unlock()
  149. return shell.ExecuteResult{}
  150. }