package main import ( "context" "errors" "fmt" "strings" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "hnyfkj.com.cn/rtu/linux/baseapp" ) const MODULE_NAME = "YFKJ_SSH_CLIENT" var ( Coupler *MQTTCoupler ) const ( MqttQos1 byte = 1 //// 消息至少送达一次 FastInterval = 1 * time.Second //// 快速检测时间间隔 SlowInterval = 5 * time.Second //// 慢速检测时间间隔 ) var ( Version = "0.0.0.1" coupler *MQTTCoupler ErrBrokerAddressEmpty = errors.New("mqtt server address is empty") ErrIMEINotAvailable = errors.New("device imei is not available") ) type MQTTCoupler struct { broker, username, password string client mqtt.Client imei string // 设备唯一标识 subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd cwd string // 当前工作目录 ctx context.Context cancel context.CancelFunc isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务 } func init() { if err := loadAppConfig(); err != nil { msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err) panic(msg) } ctx, cancel := context.WithCancel(context.Background()) coupler = &MQTTCoupler{ broker: CfgServers.MQTTSrv.Address, username: CfgServers.MQTTSrv.Username, password: CfgServers.MQTTSrv.Password, cwd: "/", ctx: ctx, cancel: cancel, isConnected: atomic.Bool{}, } } func main() { if baseapp.IsArgsParam("-h") { help() return } if baseapp.IsArgsParam("-v") { fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime) return } devIMEI := baseapp.GetArgsParamStr("-c", "") if devIMEI == "" { help() return } coupler.imei = devIMEI if err := Coupler.init2(); err != nil { fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err) return } for { } } func help() { h := ` -h 显示帮助提示 -v 当前程序版本 -c 连接目标设备(IMEI), 例如: -c 869523059113051 ` fmt.Println(h) } func (c *MQTTCoupler) init2() error { template := "/yfkj/device/rpc/imei" c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei) c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei) opts := mqtt.NewClientOptions(). AddBroker(c.broker). SetUsername(c.username).SetPassword(c.password). SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true). SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间 SetOrderMatters(false) opts.OnConnect = func(client mqtt.Client) { if !c.isConnected.Swap(true) { baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME) } } opts.OnConnectionLost = func(client mqtt.Client, err error) { if c.isConnected.Swap(false) { baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err) } } c.client = mqtt.NewClient(opts) go Coupler.keepOnline() return nil } func (c *MQTTCoupler) keepOnline() { t := time.NewTimer(FastInterval) defer t.Stop() for { select { case <-c.ctx.Done(): return case <-t.C: t.Reset(c.tick()) } // end select } // end for } func (c *MQTTCoupler) tick() time.Duration { if c.isConnected.Load() { return FastInterval } if err := c.connect(); err != nil { baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err) } return SlowInterval } func (c *MQTTCoupler) connect() error { if c.client.IsConnected() { return nil } token := c.client.Connect() select { case <-c.ctx.Done(): return nil case <-token.Done(): } return token.Error() }