Bläddra i källkod

优化修改sshd远程运维模块的client代码,解决bug

niujiuru 2 veckor sedan
förälder
incheckning
4c7424f325
3 ändrade filer med 40 tillägg och 33 borttagningar
  1. 1 1
      sshd/client/client.go
  2. 18 28
      sshd/client/coupler.go
  3. 21 4
      sshd/client/invoker.go

+ 1 - 1
sshd/client/client.go

@@ -56,7 +56,7 @@ func main() {
 		cwd:      "/",
 	}
 
-	if err := coupler.init(); err != nil {
+	if err := coupler.init2(); err != nil {
 		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
 		return
 	}

+ 18 - 28
sshd/client/coupler.go

@@ -35,12 +35,12 @@ type MQTTCoupler struct {
 	pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
 	cwd      string // 当前工作目录
 
-	cmdMu     sync.Mutex                       // 串行执行的锁
-	pending   map[int]chan shell.ExecuteResult // 等待命令结果
-	pendingMu sync.Mutex                       // 等待结果的锁
+	cmdMu     sync.Mutex                     ///// 串行执行的锁
+	pending   map[int]chan jsonrpc2.Response ///// 等待命令结果
+	pendingMu sync.Mutex                     ///// 等待结果的锁
 }
 
-func (c *MQTTCoupler) init() error {
+func (c *MQTTCoupler) init2() error {
 	template := "/yfkj/device/rpc/imei"
 	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
 	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
@@ -75,7 +75,7 @@ func (c *MQTTCoupler) init() error {
 		}
 	}
 
-	c.pending = make(map[int]chan shell.ExecuteResult)
+	c.pending = make(map[int]chan jsonrpc2.Response)
 
 	c.client = mqtt.NewClient(opts)
 	go c.keepOnline()
@@ -124,7 +124,9 @@ func (c *MQTTCoupler) connect() error {
 	return token.Error()
 }
 
-func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (shell.ExecuteResult, error) {
+func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (jsonrpc2.Response, error) {
+	zero := jsonrpc2.Response{}
+
 	if c.needSerialize(method) {
 		c.cmdMu.Lock()
 		defer c.cmdMu.Unlock()
@@ -132,16 +134,16 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (shell.Execute
 
 	req, err := jsonrpc2.BuildRequest(method, params, id...)
 	if err != nil {
-		return shell.ExecuteResult{}, err
+		return zero, err
 	}
 	reqID := *req.ID
 
 	b, err := json.Marshal(req)
 	if err != nil {
-		return shell.ExecuteResult{}, err
+		return zero, err
 	}
 
-	ch := make(chan shell.ExecuteResult, 1)
+	ch := make(chan jsonrpc2.Response, 1)
 
 	c.pendingMu.Lock()
 	c.pending[reqID] = ch
@@ -156,12 +158,12 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (shell.Execute
 
 	select {
 	case <-c.ctx.Done():
-		return shell.ExecuteResult{}, c.ctx.Err()
+		return zero, c.ctx.Err()
 	case <-token.Done():
 	}
 
 	if token.Error() != nil {
-		return shell.ExecuteResult{}, token.Error()
+		return zero, token.Error()
 	}
 
 	var timer *time.Timer
@@ -174,11 +176,11 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (shell.Execute
 
 	select {
 	case <-c.ctx.Done():
-		return shell.ExecuteResult{}, c.ctx.Err()
-	case res := <-ch:
-		return res, nil
+		return zero, c.ctx.Err()
+	case resp := <-ch:
+		return resp, nil
 	case <-timeout:
-		return shell.ExecuteResult{}, fmt.Errorf("command timeout")
+		return zero, fmt.Errorf("command timeout")
 	}
 }
 
@@ -203,20 +205,8 @@ func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
 		return
 	}
 
-	var execResult shell.ExecuteResult
-
-	if resp.Error != nil { ////////////////// 错误应答
-		execResult.ExitCode = int(resp.Error.Code)
-		execResult.Stderr = resp.Error.Message
-	} else if len(resp.Result) > 0 { //////// 正确应答
-		if err := json.Unmarshal(resp.Result, &execResult); err != nil {
-			execResult.ExitCode = 1
-			execResult.Stderr = err.Error()
-		}
-	}
-
 	select {
-	case ch <- execResult:
+	case ch <- resp:
 	default:
 	}
 }

+ 21 - 4
sshd/client/invoker.go

@@ -1,8 +1,10 @@
 package main
 
 import (
+	"encoding/json"
 	"time"
 
+	"hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
 	"hnyfkj.com.cn/rtu/linux/utils/shell"
 )
 
@@ -38,7 +40,7 @@ func (c *MQTTCoupler) needTimeoutEnd(method string) bool {
 }
 
 // 心跳检测
-func (c *MQTTCoupler) Ping() (shell.ExecuteResult, error) {
+func (c *MQTTCoupler) Ping() (jsonrpc2.Response, error) {
 	params := struct {
 		ClientID string `json:"client_id"`
 	}{
@@ -67,11 +69,26 @@ func (c *MQTTCoupler) Exec(
 		params.Timeout = timeout
 	}
 
-	return c.doCmd(rpc_exec, params)
+	exrs := shell.ExecuteResult{}
+	resp, err := c.doCmd(rpc_exec, params)
+	if err != nil {
+		return exrs, err
+	}
+
+	if resp.Error != nil { ////////////////// 错误应答
+		exrs.ExitCode = int(resp.Error.Code)
+		exrs.Stderr = resp.Error.Message
+	} else if len(resp.Result) > 0 { //////// 正确应答
+		if err := json.Unmarshal(resp.Result, &exrs); err != nil {
+			exrs.ExitCode = 1
+			exrs.Stderr = err.Error()
+		}
+	}
+	return exrs, nil
 }
 
 // 中断执行
-func (c *MQTTCoupler) Stop() (shell.ExecuteResult, error) {
+func (c *MQTTCoupler) Stop() (jsonrpc2.Response, error) {
 	params := struct {
 		ClientID string `json:"client_id"`
 	}{
@@ -82,7 +99,7 @@ func (c *MQTTCoupler) Stop() (shell.ExecuteResult, error) {
 }
 
 // 关闭退出
-func (c *MQTTCoupler) Quit() (shell.ExecuteResult, error) {
+func (c *MQTTCoupler) Quit() (jsonrpc2.Response, error) {
 	params := struct {
 		ClientID string `json:"client_id"`
 	}{