package sshd import ( "context" "errors" "fmt" "os" "strings" "sync" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "hnyfkj.com.cn/rtu/linux/baseapp" "hnyfkj.com.cn/rtu/linux/netmgrd" "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2" "hnyfkj.com.cn/rtu/linux/utils/shell" "hnyfkj.com.cn/rtu/linux/utils/singletask" ) const MODULE_NAME = "YFKJ_SSHD" var ( coupler *MQTTCoupler ) const ( MqttQos1 byte = 1 //// 消息至少送达一次 FastInterval = 1 * time.Second //// 快速检测时间间隔 SlowInterval = 5 * time.Second //// 慢速检测时间间隔 ExecutorCheckInterval = 2 * time.Second // 执行器回收检测 ExecutorTimeout = 6 * time.Second // 执行器超时时间 ) var ( ErrBrokerAddressEmpty = errors.New("mqtt server address is empty") ErrIMEINotAvailable = errors.New("device imei is not available") ) type MQTTCoupler struct { ctx context.Context cancel context.CancelFunc broker, username, password string client mqtt.Client /// MQTT客户端 isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务 imei string // 设备唯一标识 subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack ///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令 executorMap map[string]*clientExecutor executorMapMu sync.Mutex // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令 registerRpcMeths *singletask.OnceTask // 注册方法, 单实例 } type executorState int const ( execIdle executorState = iota // 空闲状态时, 可安全回收 execRunning // 正在执行时, 不允许回收 execClosing // 表明执行器正在关闭中.. ) type clientExecutor struct { id string /////////////////// 客户端唯一ID executor *shell.Executor /////////////////// 本地的执行器 mu sync.Mutex /////////////////// 同ID串行执行 lastPing time.Time /////////////////// 用于超时回收 state executorState /////////////////// 执行器的状态 } func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool { if mqttBroker == "" { baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty) return false } ctx, cancel := context.WithCancel(context.Background()) coupler = &MQTTCoupler{ ctx: ctx, cancel: cancel, broker: mqttBroker, username: mqttUsername, password: mqttPassword, executorMap: make(map[string]*clientExecutor), registerRpcMeths: &singletask.OnceTask{}, } if err := coupler.init2(); err != nil { baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err) return false } go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout) go coupler.keepOnline() return true } func ModuleExit() { if coupler != nil { coupler.cancel() } } func (c *MQTTCoupler) init2() error { imeiBytes, _ := os.ReadFile("/var/device_imei.txt") c.imei = strings.TrimSpace(string(imeiBytes)) if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" { return ErrIMEINotAvailable } baseapp.Logger.Infof("[%s] ☺✔设备IMEI: %s", MODULE_NAME, c.imei) template := "/yfkj/device/rpc/imei" c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei) c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei) opts := mqtt.NewClientOptions(). AddBroker(c.broker). SetUsername(c.username).SetPassword(c.password). SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true). SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间 SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false) opts.OnConnect = func(client mqtt.Client) { if !c.isConnected.Swap(true) { baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME) c.registerRpcMeths.Run(c.instRPCMethods, true) // 注册本地的RPC方法, 供远端调用, 单实例运行 } } opts.OnConnectionLost = func(client mqtt.Client, err error) { if c.isConnected.Swap(false) { baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err) } } c.client = mqtt.NewClient(opts) return nil } func (c *MQTTCoupler) keepOnline() { t := time.NewTimer(FastInterval) defer t.Stop() for { select { case <-c.ctx.Done(): return case <-t.C: t.Reset(c.tick()) } // end select } // end for } func (c *MQTTCoupler) tick() time.Duration { if c.isConnected.Load() { return FastInterval } if err := c.connect(); err != nil { baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err) } return SlowInterval } func (c *MQTTCoupler) connect() error { if c.client.IsConnected() { return nil } token := c.client.Connect() select { case <-c.ctx.Done(): return nil case <-token.Done(): } return token.Error() } func (c *MQTTCoupler) instRPCMethods() { t := time.NewTicker(time.Second) defer t.Stop() for { if !c.isConnected.Load() || c.ctx.Err() != nil { return } token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests) select { case <-c.ctx.Done(): return case <-token.Done(): } if token.Error() == nil { baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME) break } select { case <-c.ctx.Done(): return case <-t.C: continue } } } func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) { go c.execOneCmd(msg) } func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) { str := string(msg.Payload()) baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, str) var resp *jsonrpc2.Response // 预先定义一个空的应答 var clientID string // 该客户端的|唯一标识| var ce *clientExecutor // 该客户端的本地执行器 var exists bool // 判断执行器是否已存在 req, err := jsonrpc2.ParseRequest(str) if err != nil || req.ID == nil /* 不接受通知类型的消息 */ { resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "") goto retp } clientID, err = extractClientID(req.Params) if err != nil { resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error()) goto retp } c.executorMapMu.Lock() ce, exists = c.executorMap[clientID] if !exists { if len(c.executorMap) >= 3 { c.executorMapMu.Unlock() resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)") goto retp } ce = &clientExecutor{ id: clientID, executor: shell.NewExecutor(), state: execIdle, } c.executorMap[clientID] = ce baseapp.Logger.Infof("[%s] 客户端 %s 登录成功", MODULE_NAME, clientID) } c.executorMapMu.Unlock() ce.mu.Lock() ce.lastPing = time.Now() ce.mu.Unlock() switch req.Method { // Call-1: 心跳, 链路检测,"ping-pong"测试 case "executor.ping": resp = buildResp(req, "pong", nil) goto retp // Call-2:在本地shell中执行远程下发的指令 case "executor.exec": ce.mu.Lock() params, err := extractShellExecuteParams(req.Params) if err != nil { ce.mu.Unlock() resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error()) goto retp } if ce.state == execClosing { ce.mu.Unlock() resp = jsonrpc2.BuildError(req, -32001, "executor closed") goto retp } if ce.state == execRunning { ce.mu.Unlock() resp = jsonrpc2.BuildError(req, -32002, "executor busy") goto retp } cmd := params.Cmd if strings.ContainsAny(cmd, "&") { ce.mu.Unlock() resp = jsonrpc2.BuildError(req, -32003, "prohibit the startup of background tasks") goto retp } if strings.ContainsAny(cmd, "|>;") || strings.Contains(cmd, "\n") { safeCmd := strings.ReplaceAll(cmd, "'", "'\\''") cmd = fmt.Sprintf("sh -c '%s'", safeCmd) // 包装成 shell 命令, 支持管道等高级功能 params.Cmd = cmd } hostID := params.HostFingerprint if hostID == "" { hostID = "unknown" } ce.state = execRunning ce.mu.Unlock() start := time.Now() if true { //////// 记录执行日志-执行前 baseapp.Logger.Infof("[%s][▷ EXEC] host=%s, cmd=%q, timeout=%ds", MODULE_NAME, hostID, params.Cmd, params.Timeout) } result, err := ce.executor.Exec(params) // 本地执行用户指令 cost := time.Since(start) if err != nil { // 记录失败日志-执行后 baseapp.Logger.Warnf("[%s][✖ EXEC] host=%s, cmd=%q, err=%v, cost=%v", MODULE_NAME, hostID, params.Cmd, err, cost) } else { ///////// 记录成功日志-执行后 baseapp.Logger.Infof("[%s][✔ EXEC] host=%s, cmd=%q, cost=%v", MODULE_NAME, hostID, params.Cmd, cost) } ce.mu.Lock() if ce.state != execClosing { ce.state = execIdle ce.lastPing = time.Now() } ce.mu.Unlock() resp = buildResp(req, result, err) goto retp // Call-3:中断本地shell的执行,等价Ctrl+C case "executor.interrupt": ce.mu.Lock() running := (ce.state == execRunning) ce.mu.Unlock() if !running { resp = jsonrpc2.BuildError(req, -32004, "no running command") goto retp } err := ce.executor.Interrupt() resp = buildResp(req, "interrupted", err) goto retp // Call-4:客户端安全退出, 释放本地的执行器 case "executor.close": err := ce.handleClose() c.executorMapMu.Lock() delete(c.executorMap, clientID) baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID) c.executorMapMu.Unlock() resp = buildResp(req, "closed", err) goto retp // Call-?:无效, 远端调用了还不支持的-方法 default: resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "") goto retp } retp: text, err := resp.String() if err != nil { baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err) return } token := c.client.Publish(c.pubTopic, MqttQos1, false, text) select { case <-c.ctx.Done(): return case <-token.Done(): } if err := token.Error(); err != nil { baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err) } baseapp.Logger.Debugf("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text) } func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-c.ctx.Done(): return case <-ticker.C: c.executorMapMu.Lock() for id, ce := range c.executorMap { ce.mu.Lock() expired := time.Since(ce.lastPing) > timeout idle := (ce.state == execIdle) ce.mu.Unlock() if expired && idle { // 超时且状态空闲时则回收 ce.handleClose() //// 该函数不能阻塞, 否则锁 delete(c.executorMap, id) baseapp.Logger.Infof("[%s] 客户端 %s 超时移除", MODULE_NAME, id) } // end if } // end for2 c.executorMapMu.Unlock() } // end select } ////// end for1 } func (ce *clientExecutor) handleClose() error { needInterrupt := false ce.mu.Lock() switch ce.state { case execIdle: ce.state = execClosing case execRunning: ce.state = execClosing needInterrupt = true case execClosing: ce.mu.Unlock() return nil } ce.mu.Unlock() var err error if needInterrupt { err = ce.executor.Interrupt() // 发送"Ctrl+C" } return err }