package main import ( "context" "encoding/json" "fmt" "strings" "sync" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2" ) const ( MqttQos1 byte = 1 //// 消息至少送达一次 FastInterval = 1 * time.Second //// 快速检测时间间隔 SlowInterval = 5 * time.Second //// 慢速检测时间间隔 ) type MQTTCoupler struct { ctx context.Context cancel context.CancelFunc broker, username, password string client mqtt.Client clientID string machineID string isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务 imei string // 设备唯一标识 subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd cwd string // 当前工作目录 // 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map" cmdMu sync.Mutex // 串行执行的锁 pending map[int]chan *jsonrpc2.Response // 等待命令结果 pendingMu sync.Mutex // 等待结果的锁 interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号 } func (c *MQTTCoupler) init2() error { template := "/yfkj/device/rpc/imei" c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei) c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei) opts := mqtt.NewClientOptions(). AddBroker(c.broker). SetUsername(c.username).SetPassword(c.password). SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true). SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间 SetOrderMatters(false) opts.OnConnect = func(client mqtt.Client) { if !c.isConnected.Swap(true) { fmt.Printf("[%s] MQTT Broker连接成功\n", MODULE_NAME) } go func() { // 订阅应答主题 token := c.client.Subscribe(c.subTopic, MqttQos1, c.onCmdAck) select { case <-c.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { return } }() } opts.OnConnectionLost = func(client mqtt.Client, err error) { if c.isConnected.Swap(false) { fmt.Printf("[%s] MQTT Broker连接丢失: %v!\n", MODULE_NAME, err) } } c.pending = make(map[int]chan *jsonrpc2.Response) c.client = mqtt.NewClient(opts) go c.keepOnline() return nil } func (c *MQTTCoupler) keepOnline() { t := time.NewTimer(FastInterval) defer t.Stop() for { select { case <-c.ctx.Done(): return case <-t.C: t.Reset(c.tick()) } // end select } // end for } func (c *MQTTCoupler) tick() time.Duration { if c.isConnected.Load() { return FastInterval } if err := c.connect(); err != nil { fmt.Printf("[%s] MQTT Broker连接失败: %v!!\n", MODULE_NAME, err) } return SlowInterval } func (c *MQTTCoupler) connect() error { if c.client.IsConnected() { return nil } token := c.client.Connect() select { case <-c.ctx.Done(): return nil case <-token.Done(): } return token.Error() } func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (*jsonrpc2.Response, error) { zero := &jsonrpc2.Response{} if c.needSerialize(method) { c.cmdMu.Lock() defer c.cmdMu.Unlock() } req, err := jsonrpc2.BuildRequest(method, params, id...) if err != nil { return zero, err } reqID := *req.ID b, err := json.Marshal(req) if err != nil { return zero, err } ch := make(chan *jsonrpc2.Response, 1) c.pendingMu.Lock() c.pending[reqID] = ch c.pendingMu.Unlock() defer func() { c.pendingMu.Lock() delete(c.pending, reqID) c.pendingMu.Unlock() }() token := c.client.Publish(c.pubTopic, MqttQos1, false, b) select { case <-c.ctx.Done(): return zero, c.ctx.Err() case <-token.Done(): } if token.Error() != nil { return zero, token.Error() } if c.isCtrlCommand(method) { // 控制指令, 等待结果或超时 var timer *time.Timer var timeout <-chan time.Time timer = time.NewTimer(3 * time.Second) timeout = timer.C defer timer.Stop() select { case <-c.ctx.Done(): return zero, c.ctx.Err() case resp := <-ch: return resp, nil case <-timeout: return zero, fmt.Errorf("command timeout") } } else { // 用户指令, 等待结果或用户主动中断取消: Ctrl+C select { case <-c.ctx.Done(): return zero, c.ctx.Err() case resp := <-ch: return resp, nil case <-c.interrupted: return zero, fmt.Errorf("command interrupted by user") } } } func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) { p := msg.Payload() resp, err := jsonrpc2.ParseResponse(p) if err != nil { //////////// 解析应答失败,忽略不管 return } if resp.ID == nil { // 通知类消息, 设计上不应该出现 return } respID := *resp.ID c.pendingMu.Lock() ch, ok := c.pending[respID] c.pendingMu.Unlock() if !ok { /////////////// 未找到对应的请求, 忽略不管 return } select { case ch <- resp: default: } }