5 コミット a7a8c1dbed ... 1dd1d8644a

作者 SHA1 メッセージ 日付
  niujiuru 1dd1d8644a 编写sshd远程运维模块的client代码 2 週間 前
  niujiuru 10d4e0a514 优化去掉多余无意义的初始化代码 2 週間 前
  niujiuru df487368fb 编写sshd远程运维模块的client代码 2 週間 前
  niujiuru 2065555c19 优化注释 2 週間 前
  niujiuru d359af310c 优化注释 2 週間 前
7 ファイル変更184 行追加142 行削除
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 36 122
      sshd/client/client.go
  4. 1 1
      sshd/client/config.go
  5. 106 0
      sshd/client/coupler.go
  6. 24 0
      sshd/client/invoker.go
  7. 14 19
      sshd/sshd.go

+ 1 - 0
go.mod

@@ -6,6 +6,7 @@ require (
 	github.com/alexflint/go-filemutex v1.3.0
 	github.com/beevik/ntp v1.5.0
 	github.com/eclipse/paho.mqtt.golang v1.5.1
+	github.com/google/uuid v1.6.0
 	github.com/jlaffaye/ftp v0.2.0
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/sirupsen/logrus v1.9.3

+ 2 - 0
go.sum

@@ -7,6 +7,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
 github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=

+ 36 - 122
sshd/client/client.go

@@ -4,67 +4,20 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync/atomic"
-	"time"
 
-	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/google/uuid"
 	"hnyfkj.com.cn/rtu/linux/baseapp"
 )
 
 const MODULE_NAME = "YFKJ_SSH_CLIENT"
 
 var (
-	Coupler *MQTTCoupler
-)
-
-const (
-	MqttQos1     byte = 1               //// 消息至少送达一次
-	FastInterval      = 1 * time.Second //// 快速检测时间间隔
-	SlowInterval      = 5 * time.Second //// 慢速检测时间间隔
-)
-
-var (
-	Version               = "0.0.0.1"
 	coupler               *MQTTCoupler
+	Version               = "0.0.0.1"
 	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
 	ErrIMEINotAvailable   = errors.New("device imei is not available")
 )
 
-type MQTTCoupler struct {
-	broker, username, password string
-	client                     mqtt.Client
-
-	imei     string // 设备唯一标识
-	subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
-	pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
-	cwd      string // 当前工作目录
-
-	ctx    context.Context
-	cancel context.CancelFunc
-
-	isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
-}
-
-func init() {
-	if err := loadAppConfig(); err != nil {
-		msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
-		panic(msg)
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-
-	coupler = &MQTTCoupler{
-		broker:      CfgServers.MQTTSrv.Address,
-		username:    CfgServers.MQTTSrv.Username,
-		password:    CfgServers.MQTTSrv.Password,
-		cwd:         "/",
-		ctx:         ctx,
-		cancel:      cancel,
-		isConnected: atomic.Bool{},
-	}
-}
-
 func main() {
 	if baseapp.IsArgsParam("-h") {
 		help()
@@ -82,93 +35,54 @@ func main() {
 		return
 	}
 
-	coupler.imei = devIMEI
-	if err := Coupler.init2(); err != nil {
+	if err := loadAppConfig(); err != nil {
 		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
 		return
 	}
-
-	for {
+	if CfgServers.MQTTSrv.Address == "" {
+		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
+		return
 	}
-}
 
-func help() {
-	h := `
--h 显示帮助提示
--v 当前程序版本
--c 连接目标设备(IMEI), 例如: -c 869523059113051
-`
-
-	fmt.Println(h)
-}
-
-func (c *MQTTCoupler) init2() error {
-	template := "/yfkj/device/rpc/imei"
-	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
-	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
-
-	opts := mqtt.NewClientOptions().
-		AddBroker(c.broker).
-		SetUsername(c.username).SetPassword(c.password).
-		SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
-		SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
-		SetOrderMatters(false)
-
-	opts.OnConnect = func(client mqtt.Client) {
-		if !c.isConnected.Swap(true) {
-			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
-		}
+	ctx, cancel := context.WithCancel(context.Background())
+	coupler = &MQTTCoupler{
+		ctx:      ctx,
+		cancel:   cancel,
+		broker:   CfgServers.MQTTSrv.Address,
+		username: CfgServers.MQTTSrv.Username,
+		password: CfgServers.MQTTSrv.Password,
+		clientID: uuid.New().String(),
+		imei:     devIMEI,
+		cwd:      "/",
 	}
 
-	opts.OnConnectionLost = func(client mqtt.Client, err error) {
-		if c.isConnected.Swap(false) {
-			baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
-		}
+	if err := coupler.init(); err != nil {
+		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
+		return
 	}
 
-	c.client = mqtt.NewClient(opts)
-	go Coupler.keepOnline()
-
-	return nil
+	term() // 启动终端模拟器
 }
 
-func (c *MQTTCoupler) keepOnline() {
-	t := time.NewTimer(FastInterval)
-	defer t.Stop()
-
+// SHELL终端模拟器
+// 1, 连接远程设备, 是否成功
+// 2, 等待用户输入, 封装请求
+// 3, 没有用户输入, 封装心跳
+// 4, 发送请求数据, 远程执行
+// 5, 耗时用户请求, 允许中断, Ctrl+C
+// 6, 等待返回结果, 打印输出
+// 7, 循环等待下次, 直到退出
+func term() {
 	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-t.C:
-			t.Reset(c.tick())
-		} // end select
-	} // end for
-}
-
-func (c *MQTTCoupler) tick() time.Duration {
-	if c.isConnected.Load() {
-		return FastInterval
-	}
-
-	if err := c.connect(); err != nil {
-		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
 	}
-
-	return SlowInterval
 }
 
-func (c *MQTTCoupler) connect() error {
-	if c.client.IsConnected() {
-		return nil
-	}
-
-	token := c.client.Connect()
-	select {
-	case <-c.ctx.Done():
-		return nil
-	case <-token.Done():
-	}
+func help() {
+	h := `
+-h 显示帮助提示
+-v 当前程序版本
+-c 连接目标设备(IMEI), 例如: -c 869523059113051
+`
 
-	return token.Error()
+	fmt.Println(h)
 }

+ 1 - 1
sshd/client/config.go

@@ -63,7 +63,7 @@ func loadAppConfig() error {
 	return nil
 }
 
-func getCmdTimeoutByPrefix(cmd string) int {
+func GetCmdTimeoutByPrefix(cmd string) int {
 	if cmd == "" || len(CfgServers.Cmds) == 0 {
 		return -1
 	}

+ 106 - 0
sshd/client/coupler.go

@@ -0,0 +1,106 @@
+package main
+
+import (
+	"context"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"hnyfkj.com.cn/rtu/linux/baseapp"
+)
+
+const (
+	MqttQos1     byte = 1               //// 消息至少送达一次
+	FastInterval      = 1 * time.Second //// 快速检测时间间隔
+	SlowInterval      = 5 * time.Second //// 慢速检测时间间隔
+)
+
+type MQTTCoupler struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	broker, username, password string
+
+	client      mqtt.Client
+	clientID    string
+	isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
+
+	imei     string     // 设备唯一标识
+	subTopic string     // 订阅应答主题:/yfkj/device/rpc/imei/ack
+	pubTopic string     // 发布指令主题:/yfkj/device/rpc/imei/cmd
+	cwd      string     // 当前工作目录
+	mu       sync.Mutex // 串行执行的锁
+}
+
+func (c *MQTTCoupler) init() error {
+	template := "/yfkj/device/rpc/imei"
+	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
+	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
+
+	opts := mqtt.NewClientOptions().
+		AddBroker(c.broker).
+		SetUsername(c.username).SetPassword(c.password).
+		SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
+		SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
+		SetOrderMatters(false)
+
+	opts.OnConnect = func(client mqtt.Client) {
+		if !c.isConnected.Swap(true) {
+			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
+		}
+	}
+
+	opts.OnConnectionLost = func(client mqtt.Client, err error) {
+		if c.isConnected.Swap(false) {
+			baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
+		}
+	}
+
+	c.client = mqtt.NewClient(opts)
+	go coupler.keepOnline()
+
+	return nil
+}
+
+func (c *MQTTCoupler) keepOnline() {
+	t := time.NewTimer(FastInterval)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-t.C:
+			t.Reset(c.tick())
+		} // end select
+	} // end for
+}
+
+func (c *MQTTCoupler) tick() time.Duration {
+	if c.isConnected.Load() {
+		return FastInterval
+	}
+
+	if err := c.connect(); err != nil {
+		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
+	}
+
+	return SlowInterval
+}
+
+func (c *MQTTCoupler) connect() error {
+	if c.client.IsConnected() {
+		return nil
+	}
+
+	token := c.client.Connect()
+	select {
+	case <-c.ctx.Done():
+		return nil
+	case <-token.Done():
+	}
+
+	return token.Error()
+}

+ 24 - 0
sshd/client/invoker.go

@@ -0,0 +1,24 @@
+package main
+
+import "hnyfkj.com.cn/rtu/linux/utils/shell"
+
+// 心跳检测
+func (c *MQTTCoupler) Ping() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 执行命令
+func (c *MQTTCoupler) Exec(
+	cmd string) (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 中断执行
+func (c *MQTTCoupler) Stop() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 关闭退出
+func (c *MQTTCoupler) Quit() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}

+ 14 - 19
sshd/sshd.go

@@ -37,22 +37,22 @@ var (
 )
 
 type MQTTCoupler struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
 	broker, username, password string
-	client                     mqtt.Client
+
+	client      mqtt.Client /// MQTT客户端
+	isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
 
 	imei     string // 设备唯一标识
 	subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
 	pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
 
-	ctx    context.Context
-	cancel context.CancelFunc
-
 	///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令
 	executorMap   map[string]*clientExecutor
 	executorMapMu sync.Mutex
 
-	isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
-
 	// 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
 	registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
 }
@@ -66,11 +66,11 @@ const (
 )
 
 type clientExecutor struct {
-	id       string
-	executor *shell.Executor
-	mu       sync.Mutex    ///////////////////// 同ID串行执行
-	lastPing time.Time     ///////////////////// 用于超时回收
-	state    executorState ///////////////////// 执行器的状态
+	id       string          /////////////////// 客户端唯一ID
+	executor *shell.Executor /////////////////// 本地的执行器
+	mu       sync.Mutex      /////////////////// 同ID串行执行
+	lastPing time.Time       /////////////////// 用于超时回收
+	state    executorState   /////////////////// 执行器的状态
 }
 
 func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
@@ -82,17 +82,12 @@ func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	coupler = &MQTTCoupler{
+		ctx:              ctx,
+		cancel:           cancel,
 		broker:           mqttBroker,
 		username:         mqttUsername,
 		password:         mqttPassword,
-		client:           nil,
-		imei:             "",
-		subTopic:         "",
-		pubTopic:         "",
-		ctx:              ctx,
-		cancel:           cancel,
 		executorMap:      make(map[string]*clientExecutor),
-		isConnected:      atomic.Bool{},
 		registerRpcMeths: &singletask.OnceTask{},
 	}
 
@@ -372,7 +367,7 @@ func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
 				idle := (ce.state == execIdle)
 				ce.mu.Unlock()
 
-				if expired && idle { // 超时且执行器状态空闲, 回收该执行器
+				if expired && idle { // 超时且状态空闲时则回收
 					ce.handleClose()
 					delete(c.executorMap, id)
 				} // end if