| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
- )
// Tunables for the MQTT link.
const (
	// MqttQos1 is the QoS level passed to Subscribe and Publish:
	// a message is delivered at least once.
	MqttQos1 byte = 1

	// FastInterval is the short polling interval; it is the initial timer
	// delay and the delay used while the broker connection is up.
	FastInterval = time.Second

	// SlowInterval is the long polling interval, used after a connection
	// attempt has been made.
	SlowInterval = 5 * time.Second
)
- type MQTTCoupler struct {
- ctx context.Context
- cancel context.CancelFunc
- broker, username, password string
- client mqtt.Client
- clientID string
- machineID string
- isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
- imei string // 设备唯一标识
- subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
- pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
- cwd string // 当前工作目录
- // 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
- cmdMu sync.Mutex // 串行执行的锁
- pending map[int]chan *jsonrpc2.Response // 等待命令结果
- pendingMu sync.Mutex // 等待结果的锁
- interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
- }
- func (c *MQTTCoupler) init2() error {
- template := "/yfkj/device/rpc/imei"
- c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
- c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
- opts := mqtt.NewClientOptions().
- AddBroker(c.broker).
- SetUsername(c.username).SetPassword(c.password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false)
- opts.OnConnect = func(client mqtt.Client) {
- if !c.isConnected.Swap(true) {
- fmt.Printf("[%s] MQTT Broker连接成功\n", MODULE_NAME)
- }
- go func() { // 订阅应答主题
- token := c.client.Subscribe(c.subTopic, MqttQos1, c.onCmdAck)
- select {
- case <-c.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- return
- }
- }()
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- if c.isConnected.Swap(false) {
- fmt.Printf("[%s] MQTT Broker连接丢失: %v!\n", MODULE_NAME, err)
- }
- }
- c.pending = make(map[int]chan *jsonrpc2.Response)
- c.client = mqtt.NewClient(opts)
- go c.keepOnline()
- return nil
- }
- func (c *MQTTCoupler) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- t.Reset(c.tick())
- } // end select
- } // end for
- }
- func (c *MQTTCoupler) tick() time.Duration {
- if c.isConnected.Load() {
- return FastInterval
- }
- if err := c.connect(); err != nil {
- fmt.Printf("[%s] MQTT Broker连接失败: %v!!\n", MODULE_NAME, err)
- }
- return SlowInterval
- }
- func (c *MQTTCoupler) connect() error {
- if c.client.IsConnected() {
- return nil
- }
- token := c.client.Connect()
- select {
- case <-c.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (*jsonrpc2.Response, error) {
- zero := &jsonrpc2.Response{}
- if c.needSerialize(method) {
- c.cmdMu.Lock()
- defer c.cmdMu.Unlock()
- }
- req, err := jsonrpc2.BuildRequest(method, params, id...)
- if err != nil {
- return zero, err
- }
- reqID := *req.ID
- b, err := json.Marshal(req)
- if err != nil {
- return zero, err
- }
- ch := make(chan *jsonrpc2.Response, 1)
- c.pendingMu.Lock()
- c.pending[reqID] = ch
- c.pendingMu.Unlock()
- defer func() {
- c.pendingMu.Lock()
- delete(c.pending, reqID)
- c.pendingMu.Unlock()
- }()
- token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
- select {
- case <-c.ctx.Done():
- return zero, c.ctx.Err()
- case <-token.Done():
- }
- if token.Error() != nil {
- return zero, token.Error()
- }
- if c.isCtrlCommand(method) { // 控制指令, 等待结果或超时
- var timer *time.Timer
- var timeout <-chan time.Time
- timer = time.NewTimer(3 * time.Second)
- timeout = timer.C
- defer timer.Stop()
- select {
- case <-c.ctx.Done():
- return zero, c.ctx.Err()
- case resp := <-ch:
- return resp, nil
- case <-timeout:
- return zero, fmt.Errorf("command timeout")
- }
- } else { // 用户指令, 等待结果或用户主动中断取消: Ctrl+C
- select {
- case <-c.ctx.Done():
- return zero, c.ctx.Err()
- case resp := <-ch:
- return resp, nil
- case <-c.interrupted:
- return zero, fmt.Errorf("command interrupted by user")
- }
- }
- }
- func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
- p := msg.Payload()
- resp, err := jsonrpc2.ParseResponse(p)
- if err != nil { //////////// 解析应答失败,忽略不管
- return
- }
- if resp.ID == nil { // 通知类消息, 设计上不应该出现
- return
- }
- respID := *resp.ID
- c.pendingMu.Lock()
- ch, ok := c.pending[respID]
- c.pendingMu.Unlock()
- if !ok { /////////////// 未找到对应的请求, 忽略不管
- return
- }
- select {
- case ch <- resp:
- default:
- }
- }
|