| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- package sshd
- import (
- "context"
- "errors"
- "fmt"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- "hnyfkj.com.cn/rtu/linux/netmgrd"
- "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
- "hnyfkj.com.cn/rtu/linux/utils/shell"
- "hnyfkj.com.cn/rtu/linux/utils/singletask"
- )
- const MODULE_NAME = "YFKJ_SSHD"
- var (
- coupler *MQTTCoupler
- )
// Timing and QoS settings used by the MQTT link and the executor reaper.
const (
	// MqttQos1 is MQTT QoS level 1 (at-least-once delivery).
	MqttQos1 = byte(1)

	// FastInterval is the watchdog period while the broker link is up.
	FastInterval = time.Second
	// SlowInterval is the watchdog period after a connection attempt.
	SlowInterval = 5 * time.Second

	// ExecutorCheckInterval is how often the reaper scans for stale executors.
	ExecutorCheckInterval = 2 * time.Second
	// ExecutorTimeout is how long an idle executor may go without any
	// request before the reaper removes it.
	ExecutorTimeout = 6 * time.Second
)

// Sentinel errors reported while initializing the module.
var (
	// ErrBrokerAddressEmpty is reported when no broker address was supplied.
	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
	// ErrIMEINotAvailable is reported when the device IMEI cannot be determined.
	ErrIMEINotAvailable = errors.New("device imei is not available")
)
- type MQTTCoupler struct {
- ctx context.Context
- cancel context.CancelFunc
- broker, username, password string
- client mqtt.Client /// MQTT客户端
- isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
- imei string // 设备唯一标识
- subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
- pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
- ///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令
- executorMap map[string]*clientExecutor
- executorMapMu sync.Mutex
- // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
- registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
- }
// executorState is the lifecycle state of a clientExecutor.
type executorState int

const (
	// execIdle: nothing is running; the executor may be reclaimed.
	execIdle executorState = iota
	// execRunning: a command is executing; the executor must not be reclaimed.
	execRunning
	// execClosing: the executor is being shut down and accepts no new commands.
	execClosing
)
- type clientExecutor struct {
- id string /////////////////// 客户端唯一ID
- executor *shell.Executor /////////////////// 本地的执行器
- mu sync.Mutex /////////////////// 同ID串行执行
- lastPing time.Time /////////////////// 用于超时回收
- state executorState /////////////////// 执行器的状态
- }
- func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
- if mqttBroker == "" {
- baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
- return false
- }
- ctx, cancel := context.WithCancel(context.Background())
- coupler = &MQTTCoupler{
- ctx: ctx,
- cancel: cancel,
- broker: mqttBroker,
- username: mqttUsername,
- password: mqttPassword,
- executorMap: make(map[string]*clientExecutor),
- registerRpcMeths: &singletask.OnceTask{},
- }
- if err := coupler.init2(); err != nil {
- baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
- return false
- }
- go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
- go coupler.keepOnline()
- return true
- }
- func ModuleExit() {
- if coupler != nil {
- coupler.cancel()
- }
- }
- func (c *MQTTCoupler) init2() error {
- imeiBytes, _ := os.ReadFile("/var/device_imei.txt")
- c.imei = strings.TrimSpace(string(imeiBytes))
- if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
- return ErrIMEINotAvailable
- }
- baseapp.Logger.Infof("[%s] ☺✔设备IMEI: %s", MODULE_NAME, c.imei)
- template := "/yfkj/device/rpc/imei"
- c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
- c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
- opts := mqtt.NewClientOptions().
- AddBroker(c.broker).
- SetUsername(c.username).SetPassword(c.password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
- opts.OnConnect = func(client mqtt.Client) {
- if !c.isConnected.Swap(true) {
- baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
- c.registerRpcMeths.Run(c.instRPCMethods, true) // 注册本地的RPC方法, 供远端调用, 单实例运行
- }
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- if c.isConnected.Swap(false) {
- baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
- }
- }
- c.client = mqtt.NewClient(opts)
- return nil
- }
- func (c *MQTTCoupler) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- t.Reset(c.tick())
- } // end select
- } // end for
- }
- func (c *MQTTCoupler) tick() time.Duration {
- if c.isConnected.Load() {
- return FastInterval
- }
- if err := c.connect(); err != nil {
- baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
- }
- return SlowInterval
- }
- func (c *MQTTCoupler) connect() error {
- if c.client.IsConnected() {
- return nil
- }
- token := c.client.Connect()
- select {
- case <-c.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (c *MQTTCoupler) instRPCMethods() {
- t := time.NewTicker(time.Second)
- defer t.Stop()
- for {
- if !c.isConnected.Load() || c.ctx.Err() != nil {
- return
- }
- token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
- select {
- case <-c.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() == nil {
- baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
- break
- }
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- continue
- }
- }
- }
- func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
- go c.execOneCmd(msg)
- }
- func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
- str := string(msg.Payload())
- baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
- var resp *jsonrpc2.Response // 预先定义一个空的应答
- var clientID string // 该客户端的|唯一标识|
- var ce *clientExecutor // 该客户端的本地执行器
- var exists bool // 判断执行器是否已存在
- req, err := jsonrpc2.ParseRequest(str)
- if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
- resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
- goto retp
- }
- clientID, err = extractClientID(req.Params)
- if err != nil {
- resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
- goto retp
- }
- c.executorMapMu.Lock()
- ce, exists = c.executorMap[clientID]
- if !exists {
- if len(c.executorMap) >= 3 {
- c.executorMapMu.Unlock()
- resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)")
- goto retp
- }
- ce = &clientExecutor{
- id: clientID,
- executor: shell.NewExecutor(),
- state: execIdle,
- }
- c.executorMap[clientID] = ce
- baseapp.Logger.Infof("[%s] 客户端 %s 登录成功", MODULE_NAME, clientID)
- }
- c.executorMapMu.Unlock()
- ce.mu.Lock()
- ce.lastPing = time.Now()
- ce.mu.Unlock()
- switch req.Method {
- // Call-1: 心跳, 链路检测,"ping-pong"测试
- case "executor.ping":
- resp = buildResp(req, "pong", nil)
- goto retp
- // Call-2:在本地shell中执行远程下发的指令
- case "executor.exec":
- ce.mu.Lock()
- params, err := extractShellExecuteParams(req.Params)
- if err != nil {
- ce.mu.Unlock()
- resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
- goto retp
- }
- if ce.state == execClosing {
- ce.mu.Unlock()
- resp = jsonrpc2.BuildError(req, -32001, "executor closed")
- goto retp
- }
- if ce.state == execRunning {
- ce.mu.Unlock()
- resp = jsonrpc2.BuildError(req, -32002, "executor busy")
- goto retp
- }
- cmd := params.Cmd
- if strings.ContainsAny(cmd, "&") {
- ce.mu.Unlock()
- resp = jsonrpc2.BuildError(req, -32003, "prohibit the startup of background tasks")
- goto retp
- }
- if strings.ContainsAny(cmd, "|>;") || strings.Contains(cmd, "\n") {
- safeCmd := strings.ReplaceAll(cmd, "'", "'\\''")
- cmd = fmt.Sprintf("sh -c '%s'", safeCmd) // 包装成 shell 命令, 支持管道等高级功能
- params.Cmd = cmd
- }
- hostID := params.HostFingerprint
- if hostID == "" {
- hostID = "unknown"
- }
- ce.state = execRunning
- ce.mu.Unlock()
- start := time.Now()
- if true { //////// 记录执行日志-执行前
- baseapp.Logger.Infof("[%s][▷ EXEC] host=%s, cmd=%q, timeout=%ds",
- MODULE_NAME, hostID, params.Cmd, params.Timeout)
- }
- result, err := ce.executor.Exec(params) // 本地执行用户指令
- cost := time.Since(start)
- if err != nil { // 记录失败日志-执行后
- baseapp.Logger.Warnf("[%s][✖ EXEC] host=%s, cmd=%q, err=%v, cost=%v",
- MODULE_NAME, hostID, params.Cmd, err, cost)
- } else { ///////// 记录成功日志-执行后
- baseapp.Logger.Infof("[%s][✔ EXEC] host=%s, cmd=%q, cost=%v",
- MODULE_NAME, hostID, params.Cmd, cost)
- }
- ce.mu.Lock()
- if ce.state != execClosing {
- ce.state = execIdle
- ce.lastPing = time.Now()
- }
- ce.mu.Unlock()
- resp = buildResp(req, result, err)
- goto retp
- // Call-3:中断本地shell的执行,等价Ctrl+C
- case "executor.interrupt":
- ce.mu.Lock()
- running := (ce.state == execRunning)
- ce.mu.Unlock()
- if !running {
- resp = jsonrpc2.BuildError(req, -32004, "no running command")
- goto retp
- }
- err := ce.executor.Interrupt()
- resp = buildResp(req, "interrupted", err)
- goto retp
- // Call-4:客户端安全退出, 释放本地的执行器
- case "executor.close":
- err := ce.handleClose()
- c.executorMapMu.Lock()
- delete(c.executorMap, clientID)
- baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID)
- c.executorMapMu.Unlock()
- resp = buildResp(req, "closed", err)
- goto retp
- // Call-?:无效, 远端调用了还不支持的-方法
- default:
- resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
- goto retp
- }
- retp:
- text, err := resp.String()
- if err != nil {
- baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
- return
- }
- token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
- select {
- case <-c.ctx.Done():
- return
- case <-token.Done():
- }
- if err := token.Error(); err != nil {
- baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
- }
- baseapp.Logger.Debugf("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
- }
- func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-ticker.C:
- c.executorMapMu.Lock()
- for id, ce := range c.executorMap {
- ce.mu.Lock()
- expired := time.Since(ce.lastPing) > timeout
- idle := (ce.state == execIdle)
- ce.mu.Unlock()
- if expired && idle { // 超时且状态空闲时则回收
- ce.handleClose() //// 该函数不能阻塞, 否则锁
- delete(c.executorMap, id)
- baseapp.Logger.Infof("[%s] 客户端 %s 超时移除", MODULE_NAME, id)
- } // end if
- } // end for2
- c.executorMapMu.Unlock()
- } // end select
- } ////// end for1
- }
- func (ce *clientExecutor) handleClose() error {
- needInterrupt := false
- ce.mu.Lock()
- switch ce.state {
- case execIdle:
- ce.state = execClosing
- case execRunning:
- ce.state = execClosing
- needInterrupt = true
- case execClosing:
- ce.mu.Unlock()
- return nil
- }
- ce.mu.Unlock()
- var err error
- if needInterrupt {
- err = ce.executor.Interrupt() // 发送"Ctrl+C"
- }
- return err
- }
|