Переглянути джерело

新增、升级远程运维模块sshd的代码

niujiuru 3 тижнів тому
батько
коміт
28b95b14fa
5 змінених файлів з 226 додано та 6 видалено
  1. 3 3
      Makefile
  2. 128 0
      sshd/readme.txt
  3. 9 2
      sshd/sshd.go
  4. 14 1
      utils/shell/execute.go
  5. 72 0
      utils/shell/executor.go

+ 3 - 3
Makefile

@@ -29,7 +29,7 @@ SETGO_ENV = \
 DATE := $(shell date +%Y%m%d_%H%M%S)
 
 # 编译的目标
-all : camera_test.out hk_takephoto.out dh_takephoto.out air720u_4g.out rtu_linux_modules.out yfkj_sshd.out yfkj_sshd_cli.out
+all : camera_test.out hk_takephoto.out dh_takephoto.out air720u_4g.out rtu_linux_modules.out yfkj_sshd.out yfkj_ssh_client.out
 
 # 通用基础库
 libswapi.a :
@@ -124,13 +124,13 @@ camera_test.out : libswapi.a libhk_takephoto.a libdh_takephoto.a ./tests/camera/
 
 # 远程运维工具程序
 LIB6 = -Wl,-Bstatic -L./swapi -lswapi -L./air530z -lair530z -Wl,-Bdynamic -lc -lm -ldl -lpthread
-yfkj_sshd.out     : libswapi.a libair530z.a ./sshd/server/server.go
+yfkj_sshd.out       : libswapi.a libair530z.a ./sshd/server/server.go
 	mkdir -p ./build
 	$(GO) mod tidy
 	$(SETGO_ENV) CGO_LDFLAGS="$(LIB6)" $(GO_BUILD) $(GO_FLAGS) -o $@ ./sshd/server/server.go
 	@cp $@ ./build/$(basename $@)_$(DATE)$(suffix $@)
 
-yfkj_sshd_cli.out : libswapi.a libair530z.a ./sshd/client/client.go
+yfkj_ssh_client.out : libswapi.a libair530z.a ./sshd/client/client.go
 	mkdir -p ./build
 	$(GO) mod tidy
 	$(SETGO_ENV) CGO_LDFLAGS="$(LIB6)" $(GO_BUILD) $(GO_FLAGS) -o $@ ./sshd/client/client.go

+ 128 - 0
sshd/readme.txt

@@ -1,4 +1,5 @@
 借助MQTT实现远程控制Linux设备的SSH服务端和客户端
+作者:niujiuru 日期:2026-01-20
 
 1, 整体使用JSONRPC2.0 OVER MQTT的技术方案来架构
 
@@ -17,3 +18,130 @@
 5,客户端原理:
 - 客户端启动时,会连接到MQTT Broker,发布主题:/yfkj/device/rpc/imei/cmd,向服务端发送指令请求
 - 客户端启动时,会连接到MQTT Broker,订阅主题:/yfkj/device/rpc/imei/ack,接收服务端的指令结果
+
+6,总体架构图:
+
+┌──────────────────────────────────────────┐
+│              Remote Client               │
+│        (Web / App / CLI 运维平台)        │
+└───────────────┬──────────────────────────┘
+                │ JSON-RPC 2.0
+                │  - shell.execute
+                │  - shell.interrupt
+                │  - ping
+                v
+┌──────────────────────────────────────────┐
+│               MQTT Broker                │
+│        (QoS1 / KeepAlive / Reconnect)    │
+└───────────────┬──────────────────────────┘
+                │ MQTT Message
+                │ Topic: /yfkj/device/rpc/{imei}/cmd
+                v
+┌──────────────────────────────────────────┐
+│            sshd / MQTTCoupler            │
+│                                          │
+│  Transport & RPC Adapter Layer           │
+│  --------------------------------------- │
+│  - MQTT 连接管理                         │
+│  - JSON-RPC 解析 / 校验                  │
+│  - 方法分发 (Method Dispatch)            │
+│                                          │
+│  RPC → 本地语义映射                       │
+│  --------------------------------------- │
+│  shell.execute   → Executor.Exec()       │
+│  shell.interrupt → Executor.Interrupt()  │
+│                                          │
+└───────────────┬──────────────────────────┘
+                │ 串行 / 单 Session
+                v
+┌──────────────────────────────────────────┐
+│             shell.Executor               │
+│                                          │
+│  Session State Layer  (≈ SSH Session)    │
+│  --------------------------------------- │
+│  State:                                  │
+│    - cwd  : 当前工作目录                  │
+│    - pg   : 当前前台进程组                │
+│                                          │
+│  Built-in Commands                       │
+│  --------------------------------------- │
+│    - cd                                  │
+│    - pwd                                 │
+│                                          │
+│  Control Interface                       │
+│  --------------------------------------- │
+│    Exec()       : 启动命令               │
+│    Interrupt()  : Ctrl+C (SIGINT)        │
+│                                          │
+└───────────────┬──────────────────────────┘
+                │ 单次命令执行
+                v
+┌──────────────────────────────────────────┐
+│            executeInternal               │
+│                                          │
+│  Process Lifecycle Layer                 │
+│  --------------------------------------- │
+│  - fork/exec                             │
+│  - 新进程组 (setpgid)                    │
+│  - stdout / stderr 限流                  │
+│  - 超时控制 (context timeout)            │
+│  - SIGTERM → SIGKILL                     │
+│                                          │
+└───────────────┬──────────────────────────┘
+                │
+                v
+┌──────────────────────────────────────────┐
+│               Linux Kernel               │
+│                                          │
+│  - Process Group                         │
+│  - Signal Delivery                       │
+│  - Exit Status                           │
+└──────────────────────────────────────────┘
+
+7, 取消设计图:Ctrl+C
+
+Remote Client
+   │
+   │ shell.execute
+   v
+┌───────────────┐
+│ sshd          │
+└──────┬────────┘
+       │
+       v
+┌───────────────────────────┐
+│ shell.Executor            │
+│                           │
+│  Exec():                  │
+│   - 设置 p.Dir = cwd      │
+│   - 注册前台进程组 pg      │
+│                           │
+└──────┬────────────────────┘
+       │
+       v
+┌───────────────────────────┐
+│ executeInternal           │
+│                           │
+│  - Start cmd              │
+│  - setpgid                │
+│  - onStart(pg)            │◄──────┐
+│                           │       │
+│  - wait / timeout         │       │
+└──────┬────────────────────┘       │
+       │                            │
+       v                            │
+   Command Running                  │
+                                    │
+Remote Client Ctrl+C                │
+   │                                │
+   │ shell.interrupt                │
+   v                                │
+┌───────────────┐                   │
+│ sshd          │                   │
+└──────┬────────┘                   │
+       │                            │
+       v                            │
+┌───────────────────────────┐       │
+│ Executor.Interrupt()      │       │
+│  → SIGINT to -pgid        │───────┘
+└───────────────────────────┘

+ 9 - 2
sshd/sshd.go

@@ -43,6 +43,8 @@ type MQTTCoupler struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
+	executor *shell.Executor // 本地执行器, 单实例-串行执行指令
+
 	isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
 
 	// 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
@@ -67,6 +69,7 @@ func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
 		pubTopic:         "",
 		ctx:              ctx,
 		cancel:           cancel,
+		executor:         shell.NewExecutor(),
 		isConnected:      atomic.Bool{},
 		registerRpcMeths: &singletask.OnceTask{},
 	}
@@ -194,7 +197,7 @@ func (c *MQTTCoupler) instRPCMethods() {
 }
 
 func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
-	go c.execOneCmd(msg)
+	c.execOneCmd(msg)
 }
 
 func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
@@ -220,8 +223,12 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 			resp = jsonrpc2.BuildError(req, -32700, err.Error())
 			goto retp
 		}
-		result, err := shell.Execute(params)
+		result, err := c.executor.Exec(params)
 		resp = buildResp(req, result, err)
+	// Call-3:中断本地shell的执行,等价Ctrl+C
+	case "shell.interrupt":
+		err := c.executor.Interrupt()
+		resp = buildResp(req, "interrupted", err)
 	// Call-?:无效, 远端调用了还不支持的-方法
 	default:
 		resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")

+ 14 - 1
utils/shell/execute.go

@@ -33,6 +33,7 @@ var (
 type ExecuteParams struct {
 	Cmd     string `json:"cmd"`               // 命令
 	Timeout int    `json:"timeout,omitempty"` // 超时(秒)
+	Dir     string `json:"-"`                 // 工作目录
 }
 
 type ExecuteResult struct {
@@ -123,7 +124,7 @@ func (pg *processGroup) terminate() error {
 		ErrExecutorLostControl, pg.pgid)
 }
 
-func Execute(p ExecuteParams) (*ExecuteResult, error) {
+func executeInternal(p ExecuteParams, onStart func(pg *processGroup)) (*ExecuteResult, error) {
 	swp := shellwords.NewParser()
 	swp.ParseEnv = true      // 展开 "环境变量"
 	swp.ParseBacktick = true // 展开 `...`命令
@@ -143,6 +144,10 @@ func Execute(p ExecuteParams) (*ExecuteResult, error) {
 
 	cmd := exec.Command(argv[0], argv[1:]...)
 
+	if p.Dir != "" { // 设置工作目录
+		cmd.Dir = p.Dir
+	}
+
 	cmd.SysProcAttr = &syscall.SysProcAttr{ // 新的进程组
 		Setpgid:   true,
 		Pdeathsig: syscall.SIGKILL,
@@ -161,6 +166,10 @@ func Execute(p ExecuteParams) (*ExecuteResult, error) {
 		pgid: cmd.Process.Pid, // 进程组ID就是主进程的PID
 	}
 
+	if onStart != nil {
+		onStart(processInfo)
+	}
+
 	done := make(chan error, 1)
 	go func() {
 		done <- cmd.Wait()
@@ -199,3 +208,7 @@ func Execute(p ExecuteParams) (*ExecuteResult, error) {
 		ExitCode: exitCode,
 	}, finalErr
 }
+
+func Execute(p ExecuteParams) (*ExecuteResult, error) {
+	return executeInternal(p, nil)
+}

+ 72 - 0
utils/shell/executor.go

@@ -0,0 +1,72 @@
+package shell
+
+import (
+	"path/filepath"
+	"strings"
+	"syscall"
+)
+
+type Executor struct {
+	cwd string        // 当前工作目录
+	pg  *processGroup // 当前前台进程
+}
+
+func NewExecutor() *Executor {
+	return &Executor{cwd: "/"}
+}
+
+func (e *Executor) Exec(p ExecuteParams) (*ExecuteResult, error) {
+	if isCD(p.Cmd) {
+		dir, err := resolveCD(e.cwd, p.Cmd)
+		if err != nil {
+			return &ExecuteResult{
+				Stderr:   err.Error() + "\n",
+				ExitCode: 1,
+			}, nil
+		}
+		e.cwd = dir
+		return &ExecuteResult{ExitCode: 0}, nil
+	}
+
+	if strings.TrimSpace(p.Cmd) == "pwd" {
+		return &ExecuteResult{
+			Stdout:   e.cwd + "\n",
+			ExitCode: 0,
+		}, nil
+	}
+
+	p.Dir = e.cwd
+
+	defer func() {
+		e.pg = nil // 命令结束
+	}()
+
+	return executeInternal(p, func(pg *processGroup) { e.pg = pg })
+}
+
+func isCD(cmd string) bool {
+	s := strings.TrimSpace(cmd)
+	return s == "cd" || strings.HasPrefix(s, "cd ")
+}
+
+func resolveCD(cwd, cmd string) (string, error) {
+	fields := strings.Fields(cmd)
+	if len(fields) == 1 { // 只有"cd", 没有参数
+		return "/", nil
+	}
+
+	path := fields[1]
+	if filepath.IsAbs(path) { // 绝对路径
+		return filepath.Clean(path), nil
+	}
+
+	return filepath.Clean(filepath.Join(cwd, path)), nil // 相对路径
+}
+
+func (e *Executor) Interrupt() error {
+	if e.pg == nil {
+		return nil
+	}
+
+	return syscall.Kill(-e.pg.pgid, syscall.SIGINT) // 等价于 Ctrl+C
+}