client.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync/atomic"
  8. "time"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "github.com/google/uuid"
  11. "hnyfkj.com.cn/rtu/linux/baseapp"
  12. )
  13. const MODULE_NAME = "YFKJ_SSH_CLIENT"
  14. var (
  15. Coupler *MQTTCoupler
  16. )
  17. const (
  18. MqttQos1 byte = 1 //// 消息至少送达一次
  19. FastInterval = 1 * time.Second //// 快速检测时间间隔
  20. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  21. )
  22. var (
  23. Version = "0.0.0.1"
  24. coupler *MQTTCoupler
  25. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  26. ErrIMEINotAvailable = errors.New("device imei is not available")
  27. )
  28. type MQTTCoupler struct {
  29. broker, username, password string
  30. client mqtt.Client
  31. clientID string
  32. imei string // 设备唯一标识
  33. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  34. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  35. cwd string // 当前工作目录
  36. ctx context.Context
  37. cancel context.CancelFunc
  38. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  39. }
  40. func init() {
  41. if err := loadAppConfig(); err != nil {
  42. msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
  43. panic(msg)
  44. }
  45. ctx, cancel := context.WithCancel(context.Background())
  46. coupler = &MQTTCoupler{
  47. broker: CfgServers.MQTTSrv.Address,
  48. username: CfgServers.MQTTSrv.Username,
  49. password: CfgServers.MQTTSrv.Password,
  50. clientID: uuid.New().String(),
  51. cwd: "/",
  52. ctx: ctx,
  53. cancel: cancel,
  54. isConnected: atomic.Bool{},
  55. }
  56. }
  57. func main() {
  58. if baseapp.IsArgsParam("-h") {
  59. help()
  60. return
  61. }
  62. if baseapp.IsArgsParam("-v") {
  63. fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
  64. return
  65. }
  66. devIMEI := baseapp.GetArgsParamStr("-c", "")
  67. if devIMEI == "" {
  68. help()
  69. return
  70. }
  71. coupler.imei = devIMEI
  72. if err := Coupler.init2(); err != nil {
  73. fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
  74. return
  75. }
  76. for {
  77. }
  78. }
  79. func help() {
  80. h := `
  81. -h 显示帮助提示
  82. -v 当前程序版本
  83. -c 连接目标设备(IMEI), 例如: -c 869523059113051
  84. `
  85. fmt.Println(h)
  86. }
  87. func (c *MQTTCoupler) init2() error {
  88. template := "/yfkj/device/rpc/imei"
  89. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  90. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  91. opts := mqtt.NewClientOptions().
  92. AddBroker(c.broker).
  93. SetUsername(c.username).SetPassword(c.password).
  94. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  95. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  96. SetOrderMatters(false)
  97. opts.OnConnect = func(client mqtt.Client) {
  98. if !c.isConnected.Swap(true) {
  99. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  100. }
  101. }
  102. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  103. if c.isConnected.Swap(false) {
  104. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  105. }
  106. }
  107. c.client = mqtt.NewClient(opts)
  108. go Coupler.keepOnline()
  109. return nil
  110. }
  111. func (c *MQTTCoupler) keepOnline() {
  112. t := time.NewTimer(FastInterval)
  113. defer t.Stop()
  114. for {
  115. select {
  116. case <-c.ctx.Done():
  117. return
  118. case <-t.C:
  119. t.Reset(c.tick())
  120. } // end select
  121. } // end for
  122. }
  123. func (c *MQTTCoupler) tick() time.Duration {
  124. if c.isConnected.Load() {
  125. return FastInterval
  126. }
  127. if err := c.connect(); err != nil {
  128. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  129. }
  130. return SlowInterval
  131. }
  132. func (c *MQTTCoupler) connect() error {
  133. if c.client.IsConnected() {
  134. return nil
  135. }
  136. token := c.client.Connect()
  137. select {
  138. case <-c.ctx.Done():
  139. return nil
  140. case <-token.Done():
  141. }
  142. return token.Error()
  143. }