ソースを参照

1, 优化修改sshd远程运维模块代码; 2, 编写sshd客户端代码

niujiuru 2 週間 前
コミット
a7a8c1dbed
4 ファイル変更243 行追加37 行削除
  1. 169 1
      sshd/client/client.go
  2. 0 6
      sshd/client/config.go
  3. 1 0
      sshd/server/server.go
  4. 73 30
      sshd/sshd.go

+ 169 - 1
sshd/client/client.go

@@ -1,6 +1,174 @@
 package main
 
-const MODULE_NAME = "YFKJ_SSHD_CTL"
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"hnyfkj.com.cn/rtu/linux/baseapp"
+)
+
+const MODULE_NAME = "YFKJ_SSH_CLIENT"
+
+var (
+	Coupler *MQTTCoupler
+)
+
+const (
+	MqttQos1     byte = 1               //// 消息至少送达一次
+	FastInterval      = 1 * time.Second //// 快速检测时间间隔
+	SlowInterval      = 5 * time.Second //// 慢速检测时间间隔
+)
+
+var (
+	Version               = "0.0.0.1"
+	coupler               *MQTTCoupler
+	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
+	ErrIMEINotAvailable   = errors.New("device imei is not available")
+)
+
+type MQTTCoupler struct {
+	broker, username, password string
+	client                     mqtt.Client
+
+	imei     string // 设备唯一标识
+	subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
+	pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
+	cwd      string // 当前工作目录
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
+}
+
+func init() {
+	if err := loadAppConfig(); err != nil {
+		msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
+		panic(msg)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	coupler = &MQTTCoupler{
+		broker:      CfgServers.MQTTSrv.Address,
+		username:    CfgServers.MQTTSrv.Username,
+		password:    CfgServers.MQTTSrv.Password,
+		cwd:         "/",
+		ctx:         ctx,
+		cancel:      cancel,
+		isConnected: atomic.Bool{},
+	}
+}
 
 func main() {
+	if baseapp.IsArgsParam("-h") {
+		help()
+		return
+	}
+
+	if baseapp.IsArgsParam("-v") {
+		fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
+		return
+	}
+
+	devIMEI := baseapp.GetArgsParamStr("-c", "")
+	if devIMEI == "" {
+		help()
+		return
+	}
+
+	coupler.imei = devIMEI
+	if err := Coupler.init2(); err != nil {
+		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
+		return
+	}
+
+	for {
+	}
+}
+
+func help() {
+	h := `
+-h 显示帮助提示
+-v 当前程序版本
+-c 连接目标设备(IMEI), 例如: -c 869523059113051
+`
+
+	fmt.Println(h)
+}
+
+func (c *MQTTCoupler) init2() error {
+	template := "/yfkj/device/rpc/imei"
+	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
+	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
+
+	opts := mqtt.NewClientOptions().
+		AddBroker(c.broker).
+		SetUsername(c.username).SetPassword(c.password).
+		SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
+		SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
+		SetOrderMatters(false)
+
+	opts.OnConnect = func(client mqtt.Client) {
+		if !c.isConnected.Swap(true) {
+			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
+		}
+	}
+
+	opts.OnConnectionLost = func(client mqtt.Client, err error) {
+		if c.isConnected.Swap(false) {
+			baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
+		}
+	}
+
+	c.client = mqtt.NewClient(opts)
+	go Coupler.keepOnline()
+
+	return nil
+}
+
+func (c *MQTTCoupler) keepOnline() {
+	t := time.NewTimer(FastInterval)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-t.C:
+			t.Reset(c.tick())
+		} // end select
+	} // end for
+}
+
+func (c *MQTTCoupler) tick() time.Duration {
+	if c.isConnected.Load() {
+		return FastInterval
+	}
+
+	if err := c.connect(); err != nil {
+		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
+	}
+
+	return SlowInterval
+}
+
+func (c *MQTTCoupler) connect() error {
+	if c.client.IsConnected() {
+		return nil
+	}
+
+	token := c.client.Connect()
+	select {
+	case <-c.ctx.Done():
+		return nil
+	case <-token.Done():
+	}
+
+	return token.Error()
 }

+ 0 - 6
sshd/client/config.go

@@ -28,12 +28,6 @@ var (
 	CfgServers = &Config{}
 )
 
-func init() {
-	if err := loadAppConfig(); err != nil {
-		baseapp.Logger.Panicf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
-	}
-}
-
 func loadAppConfig() error {
 	cfgFile := filepath.Join(baseapp.CFG_DIR, "config.ini")
 	cfgIni, err := ini.Load(cfgFile)

+ 1 - 0
sshd/server/server.go

@@ -62,6 +62,7 @@ func main() {
 
 	<-baseapp.IsExit2()
 
+	sshd.ModuleExit()
 	baseapp.Logger.Info("程序退出")
 	baseapp.ExitLogger() // 安全的关闭日志模块
 	baseapp.SafeExit()   // 安全的关闭退出程序

+ 73 - 30
sshd/sshd.go

@@ -19,7 +19,7 @@ import (
 const MODULE_NAME = "YFKJ_SSHD"
 
 var (
-	Coupler *MQTTCoupler
+	coupler *MQTTCoupler
 )
 
 const (
@@ -62,6 +62,7 @@ type executorState int
 const (
 	execIdle    executorState = iota // 空闲状态时, 可安全回收
 	execRunning                      // 正在执行时, 不允许回收
+	execClosing                      // 表明执行器正在关闭中..
 )
 
 type clientExecutor struct {
@@ -80,7 +81,7 @@ func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
 
 	ctx, cancel := context.WithCancel(context.Background())
 
-	Coupler = &MQTTCoupler{
+	coupler = &MQTTCoupler{
 		broker:           mqttBroker,
 		username:         mqttUsername,
 		password:         mqttPassword,
@@ -95,19 +96,19 @@ func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
 		registerRpcMeths: &singletask.OnceTask{},
 	}
 
-	if err := Coupler.init2(); err != nil {
+	if err := coupler.init2(); err != nil {
 		baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
 		return false
 	}
-	go Coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
-	go Coupler.keepOnline()
+	go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
+	go coupler.keepOnline()
 
 	return true
 }
 
 func ModuleExit() {
-	if Coupler != nil {
-		Coupler.cancel()
+	if coupler != nil {
+		coupler.cancel()
 	}
 }
 
@@ -226,9 +227,9 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 	str := string(msg.Payload())
 	baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
 
-	var resp *jsonrpc2.Response // 预定义一个空的应答
-	var clientID string         // 该客户端的唯一标识
-	var ce *clientExecutor      // 该客户端的指令执行器
+	var resp *jsonrpc2.Response // 预定义一个空的应答
+	var clientID string         // 该客户端的|唯一标识|
+	var ce *clientExecutor      // 该客户端的本地执行器
 	var exists bool             // 判断执行器是否已存在
 
 	req, err := jsonrpc2.ParseRequest(str)
@@ -260,15 +261,19 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 	}
 	c.executorMapMu.Unlock()
 
-	ce.mu.Lock() // 确保同一客户端(ID一样)的指令串行执行
+	ce.mu.Lock()
 	ce.lastPing = time.Now()
+	ce.mu.Unlock()
 
 	switch req.Method {
 	// Call-1: 心跳, 链路检测,"ping-pong"测试
 	case "executor.ping":
 		resp = buildResp(req, "pong", nil)
+		goto retp
 	// Call-2:在本地shell中执行远程下发的指令
 	case "executor.exec":
+		ce.mu.Lock()
+
 		params, err := extractShellExecuteParams(req.Params)
 		if err != nil {
 			ce.mu.Unlock()
@@ -276,48 +281,61 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 			goto retp
 		}
 
+		if ce.state == execClosing {
+			ce.mu.Unlock()
+			resp = jsonrpc2.BuildError(req, -32001, "executor closed")
+			goto retp
+		}
+
+		if ce.state == execRunning {
+			ce.mu.Unlock()
+			resp = jsonrpc2.BuildError(req, -32002, "executor busy")
+			goto retp
+		}
+
 		ce.state = execRunning
 		ce.mu.Unlock()
 
-		result, err := ce.executor.Exec(params)
+		result, err := ce.executor.Exec(params) // 本地执行用户指令
 
 		ce.mu.Lock()
-		ce.state = execIdle
-		ce.lastPing = time.Now()
+		if ce.state != execClosing {
+			ce.state = execIdle
+			ce.lastPing = time.Now()
+		}
 		ce.mu.Unlock()
 
 		resp = buildResp(req, result, err)
 		goto retp
 	// Call-3:中断本地shell的执行,等价Ctrl+C
 	case "executor.interrupt":
-		if ce.state != execRunning {
-			resp = jsonrpc2.BuildError(req, -32001, "no running command")
-			break
+		ce.mu.Lock()
+		running := (ce.state == execRunning)
+		ce.mu.Unlock()
+
+		if !running {
+			resp = jsonrpc2.BuildError(req, -32003, "no running command")
+			goto retp
 		}
+
 		err := ce.executor.Interrupt()
 		resp = buildResp(req, "interrupted", err)
+		goto retp
 	// Call-4:客户端安全退出, 释放本地的执行器
 	case "executor.close":
-		if ce.state == execRunning {
-			ce.mu.Unlock()
-			resp = jsonrpc2.BuildError(req, -32002, "executor busy, interrupt first")
-			goto retp
-		}
-		ce.mu.Unlock()
+		err := ce.handleClose()
 
 		c.executorMapMu.Lock()
 		delete(c.executorMap, clientID)
 		c.executorMapMu.Unlock()
 
-		resp = buildResp(req, "bye", nil)
+		resp = buildResp(req, "closed", err)
 		goto retp
 	// Call-?:无效, 远端调用了还不支持的-方法
 	default:
 		resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
 	}
 
-	ce.mu.Unlock()
-
 retp:
 	text, err := resp.String()
 	if err != nil {
@@ -354,11 +372,36 @@ func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
 				idle := (ce.state == execIdle)
 				ce.mu.Unlock()
 
-				if expired && idle {
+				if expired && idle { // 超时且执行器状态空闲, 回收该执行器
+					ce.handleClose()
 					delete(c.executorMap, id)
-				}
-			} // end for
+				} // end if
+			} // end for2
 			c.executorMapMu.Unlock()
 		} // end select
-	} // end for
+	} ////// end for1
+}
+
+func (ce *clientExecutor) handleClose() error {
+	needInterrupt := false
+
+	ce.mu.Lock()
+	switch ce.state {
+	case execIdle:
+		ce.state = execClosing
+	case execRunning:
+		ce.state = execClosing
+		needInterrupt = true
+	case execClosing:
+		ce.mu.Unlock()
+		return nil
+	}
+	ce.mu.Unlock()
+
+	var err error
+	if needInterrupt {
+		err = ce.executor.Interrupt()
+	}
+
+	return err
 }