client.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "strings"
  7. "sync/atomic"
  8. "time"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "hnyfkj.com.cn/rtu/linux/baseapp"
  11. )
  12. const MODULE_NAME = "YFKJ_SSH_CLIENT"
  13. var (
  14. Coupler *MQTTCoupler
  15. )
  16. const (
  17. MqttQos1 byte = 1 //// 消息至少送达一次
  18. FastInterval = 1 * time.Second //// 快速检测时间间隔
  19. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  20. )
  21. var (
  22. Version = "0.0.0.1"
  23. coupler *MQTTCoupler
  24. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  25. ErrIMEINotAvailable = errors.New("device imei is not available")
  26. )
  27. type MQTTCoupler struct {
  28. broker, username, password string
  29. client mqtt.Client
  30. imei string // 设备唯一标识
  31. subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
  32. pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
  33. cwd string // 当前工作目录
  34. ctx context.Context
  35. cancel context.CancelFunc
  36. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  37. }
  38. func init() {
  39. if err := loadAppConfig(); err != nil {
  40. msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
  41. panic(msg)
  42. }
  43. ctx, cancel := context.WithCancel(context.Background())
  44. coupler = &MQTTCoupler{
  45. broker: CfgServers.MQTTSrv.Address,
  46. username: CfgServers.MQTTSrv.Username,
  47. password: CfgServers.MQTTSrv.Password,
  48. cwd: "/",
  49. ctx: ctx,
  50. cancel: cancel,
  51. isConnected: atomic.Bool{},
  52. }
  53. }
  54. func main() {
  55. if baseapp.IsArgsParam("-h") {
  56. help()
  57. return
  58. }
  59. if baseapp.IsArgsParam("-v") {
  60. fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
  61. return
  62. }
  63. devIMEI := baseapp.GetArgsParamStr("-c", "")
  64. if devIMEI == "" {
  65. help()
  66. return
  67. }
  68. coupler.imei = devIMEI
  69. if err := Coupler.init2(); err != nil {
  70. fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
  71. return
  72. }
  73. for {
  74. }
  75. }
  76. func help() {
  77. h := `
  78. -h 显示帮助提示
  79. -v 当前程序版本
  80. -c 连接目标设备(IMEI), 例如: -c 869523059113051
  81. `
  82. fmt.Println(h)
  83. }
  84. func (c *MQTTCoupler) init2() error {
  85. template := "/yfkj/device/rpc/imei"
  86. c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  87. c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  88. opts := mqtt.NewClientOptions().
  89. AddBroker(c.broker).
  90. SetUsername(c.username).SetPassword(c.password).
  91. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  92. SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
  93. SetOrderMatters(false)
  94. opts.OnConnect = func(client mqtt.Client) {
  95. if !c.isConnected.Swap(true) {
  96. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  97. }
  98. }
  99. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  100. if c.isConnected.Swap(false) {
  101. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  102. }
  103. }
  104. c.client = mqtt.NewClient(opts)
  105. go Coupler.keepOnline()
  106. return nil
  107. }
  108. func (c *MQTTCoupler) keepOnline() {
  109. t := time.NewTimer(FastInterval)
  110. defer t.Stop()
  111. for {
  112. select {
  113. case <-c.ctx.Done():
  114. return
  115. case <-t.C:
  116. t.Reset(c.tick())
  117. } // end select
  118. } // end for
  119. }
  120. func (c *MQTTCoupler) tick() time.Duration {
  121. if c.isConnected.Load() {
  122. return FastInterval
  123. }
  124. if err := c.connect(); err != nil {
  125. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  126. }
  127. return SlowInterval
  128. }
  129. func (c *MQTTCoupler) connect() error {
  130. if c.client.IsConnected() {
  131. return nil
  132. }
  133. token := c.client.Connect()
  134. select {
  135. case <-c.ctx.Done():
  136. return nil
  137. case <-token.Done():
  138. }
  139. return token.Error()
  140. }