ソースを参照

优化修改sshd模块代码, 修复昨天自测试中出现的问题

niujiuru 2 週間 前
コミット
3844813ff4
3 ファイル変更61 行追加28 行削除
  1. 54 23
      sshd/client/client.go
  2. 3 3
      sshd/client/config.ini
  3. 4 2
      sshd/sshd.go

+ 54 - 23
sshd/client/client.go

@@ -3,6 +3,7 @@ package main
 import (
 	"bufio"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -69,35 +70,40 @@ func main() {
 		return
 	}
 
-	term() // 启动终端模拟器卍
-}
+	fmt.Printf("[%s] 正在连接目标设备...\n", MODULE_NAME)
 
-// SHELL终端模拟器
-// 1, 连接远程设备, 是否成功
-// 2, 等待用户输入, 封装请求
-// 3, 没有用户输入, 封装心跳
-// 4, 发送请求数据, 远程执行
-// 5, 耗时用户请求, 允许中断, Ctrl+C
-// 6, 等待返回结果, 打印输出
-// 7, 循环等待下次, 直到退出
-func term() {
 	var pingState atomic.Bool
-	var executing atomic.Bool
+	heartbeatLoop(&pingState) // -启动-设备在线-心跳检测-
+	for {
+		if pingState.Load() { //// 等待成功连接上目标设备卍
+			break
+		}
+		fmt.Printf("[%s] 无法连接目标设备!!\n", MODULE_NAME)
+		time.Sleep(1 * time.Second)
+	}
+
+	term(&pingState) /////////////////// 启动终端模拟器卍
+}
+
+func term(pingState *atomic.Bool) {
+	var executing atomic.Bool   // 是否有正在执行中的命令
+	var interrupted atomic.Bool // 用户是否按键取消了命令
 
-	// -在线-心跳检测
-	heartbeatLoop(&pingState)
+	interruptLoop(&executing, &interrupted) // Ctrl+C卍
 
-	// Ctrl+C中断处理
-	interruptLoop(&executing)
+	printWelcome()
 
 	reader := bufio.NewReader(os.Stdin)
 	for {
 		if !pingState.Load() {
-			fmt.Printf("[%s] 无法连接目标设备!!\n", MODULE_NAME)
+			fmt.Printf("[%s] 目标设备连接丢失!!\n", MODULE_NAME)
 			break
 		}
 
 		fmt.Print("\033[?25h") // 显示光标
+		if interrupted.Swap(false) {
+			fmt.Println("^C")
+		}
 		fmt.Printf("root@%s:%s# ", coupler.imei, coupler.cwd)
 
 		input, err := reader.ReadString('\n')
@@ -153,16 +159,16 @@ func help() {
 	fmt.Println(h)
 }
 
-func interruptLoop(executing *atomic.Bool) {
+func interruptLoop(executing *atomic.Bool, interrupted *atomic.Bool) {
 	sigCh := make(chan os.Signal, 1)
 	signal.Notify(sigCh, syscall.SIGINT)
 	go func() {
 		for range sigCh {
+			interrupted.Store(true)
 			if executing.Load() {
 				_, _ = coupler.stop()
 			}
-			fmt.Print("\n^C\n")
-		} // end for
+		}
 	}()
 }
 
@@ -170,13 +176,38 @@ func heartbeatLoop(pingState *atomic.Bool) {
 	go func() {
 		ticker := time.NewTicker(1 * time.Second)
 		defer ticker.Stop()
+
+		pingFailCount := 0
+		pong := ""
 		for range ticker.C {
 			resp, err := coupler.ping()
-			if err == nil && resp.Error == nil && resp.Result != nil && string(resp.Result) == "pong" {
+			if err == nil && resp.Error == nil && resp.Result != nil &&
+				json.Unmarshal(resp.Result, &pong) == nil && pong == "pong" {
 				pingState.Store(true)
+				pingFailCount = 0
 			} else {
-				pingState.Store(false)
-			} // end if
+				pingFailCount++
+				if pingFailCount >= 3 { ///// 连续3次ping失败, 可以认为设备离线
+					pingState.Store(false)
+				}
+			}
 		} // end for
 	}()
 }
+
+func printWelcome() {
+	welcome := `
+ _   _ _ _       _     _ _ _     
+| \ | (_) |     | |   (_) | |    
+|  \| |_| |_ ___| |__  _| | | ___
+| .   | | __/ __| '_ \| | | |/ _ \
+| |\  | | || (__| | | | | | |  __/
+|_| \_|_|\__\___|_| |_|_|_|_|\___|
+
+═══════════════════════════════════
+      云飞科技 RTU远程运维终端
+═══════════════════════════════════
+提示: 输入'quit'命令, 可退出终端模拟器
+`
+	fmt.Println(welcome)
+}

+ 3 - 3
sshd/client/config.ini

@@ -1,10 +1,10 @@
 [MQTTSrv]
 ; 地址端口
-BrokerAddress = tcp://8.136.98.49:61883
+BrokerAddress = ws://8.136.98.49:8083/mqtt
 ; 用户名称
-Username = user
+Username = websk_yfkj
 ; 接入密码
-Password = f335bf402c655ee5fd2b5300905124e
+Password = yfkj2026
 
 ; 耗时命令, 命令超时的时间单位为秒
 ; 有其它耗时命令时,可仿照扩展追加

+ 4 - 2
sshd/sshd.go

@@ -130,6 +130,7 @@ func (c *MQTTCoupler) init2() error {
 	opts.OnConnect = func(client mqtt.Client) {
 		if !c.isConnected.Swap(true) {
 			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
+			c.registerRpcMeths.Run(c.instRPCMethods, true) // 注册本地的RPC方法, 供远端调用, 单实例运行
 		}
 	}
 
@@ -165,8 +166,6 @@ func (c *MQTTCoupler) tick() time.Duration {
 
 	if err := c.connect(); err != nil {
 		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
-	} else { // 注册本地的RPC方法, 供远端调用, 单实例运行
-		c.registerRpcMeths.Run(c.instRPCMethods, true)
 	}
 
 	return SlowInterval
@@ -256,6 +255,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 			state:    execIdle,
 		}
 		c.executorMap[clientID] = ce
+		baseapp.Logger.Infof("[%s] 客户端 %s 登录成功", MODULE_NAME, clientID)
 	}
 	c.executorMapMu.Unlock()
 
@@ -325,6 +325,7 @@ func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
 
 		c.executorMapMu.Lock()
 		delete(c.executorMap, clientID)
+		baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID)
 		c.executorMapMu.Unlock()
 
 		resp = buildResp(req, "closed", err)
@@ -374,6 +375,7 @@ func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
 				if expired && idle { // 超时且状态空闲时则回收
 					ce.handleClose() //// 该函数不能阻塞, 否则锁
 					delete(c.executorMap, id)
+					baseapp.Logger.Infof("[%s] 客户端 %s 超时移除", MODULE_NAME, id)
 				} // end if
 			} // end for2
 			c.executorMapMu.Unlock()