Explorar el Código

编写sshd远程运维模块的client代码

niujiuru hace 2 semanas
padre
commit
1dd1d8644a
Se han modificado 4 ficheros con 166 adiciones y 134 borrados
  1. 35 133
      sshd/client/client.go
  2. 1 1
      sshd/client/config.go
  3. 106 0
      sshd/client/coupler.go
  4. 24 0
      sshd/client/invoker.go

+ 35 - 133
sshd/client/client.go

@@ -4,72 +4,20 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
 
-	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
 	"hnyfkj.com.cn/rtu/linux/baseapp"
-	"hnyfkj.com.cn/rtu/linux/utils/shell"
 )
 
 const MODULE_NAME = "YFKJ_SSH_CLIENT"
 
 var (
-	Coupler *MQTTCoupler
-)
-
-const (
-	MqttQos1     byte = 1               //// 消息至少送达一次
-	FastInterval      = 1 * time.Second //// 快速检测时间间隔
-	SlowInterval      = 5 * time.Second //// 慢速检测时间间隔
-)
-
-var (
-	Version               = "0.0.0.1"
 	coupler               *MQTTCoupler
+	Version               = "0.0.0.1"
 	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
 	ErrIMEINotAvailable   = errors.New("device imei is not available")
 )
 
-type MQTTCoupler struct {
-	ctx    context.Context
-	cancel context.CancelFunc
-
-	broker, username, password string
-
-	client      mqtt.Client
-	clientID    string
-	isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
-
-	imei     string     // 设备唯一标识
-	subTopic string     // 订阅应答主题:/yfkj/device/rpc/imei/ack
-	pubTopic string     // 发布指令主题:/yfkj/device/rpc/imei/cmd
-	cwd      string     // 当前工作目录
-	mu       sync.Mutex // 串行执行的锁
-}
-
-func init() {
-	if err := loadAppConfig(); err != nil {
-		msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
-		panic(msg)
-	}
-
-	ctx, cancel := context.WithCancel(context.Background())
-
-	coupler = &MQTTCoupler{
-		ctx:      ctx,
-		cancel:   cancel,
-		broker:   CfgServers.MQTTSrv.Address,
-		username: CfgServers.MQTTSrv.Username,
-		password: CfgServers.MQTTSrv.Password,
-		clientID: uuid.New().String(),
-		cwd:      "/",
-	}
-}
-
 func main() {
 	if baseapp.IsArgsParam("-h") {
 		help()
@@ -87,100 +35,54 @@ func main() {
 		return
 	}
 
-	coupler.imei = devIMEI
-	if err := Coupler.init2(); err != nil {
+	if err := loadAppConfig(); err != nil {
 		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
 		return
 	}
-
-	for {
+	if CfgServers.MQTTSrv.Address == "" {
+		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
+		return
 	}
-}
-
-func help() {
-	h := `
--h 显示帮助提示
--v 当前程序版本
--c 连接目标设备(IMEI), 例如: -c 869523059113051
-`
-
-	fmt.Println(h)
-}
 
-func (c *MQTTCoupler) init2() error {
-	template := "/yfkj/device/rpc/imei"
-	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
-	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
-
-	opts := mqtt.NewClientOptions().
-		AddBroker(c.broker).
-		SetUsername(c.username).SetPassword(c.password).
-		SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
-		SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
-		SetOrderMatters(false)
-
-	opts.OnConnect = func(client mqtt.Client) {
-		if !c.isConnected.Swap(true) {
-			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
-		}
+	ctx, cancel := context.WithCancel(context.Background())
+	coupler = &MQTTCoupler{
+		ctx:      ctx,
+		cancel:   cancel,
+		broker:   CfgServers.MQTTSrv.Address,
+		username: CfgServers.MQTTSrv.Username,
+		password: CfgServers.MQTTSrv.Password,
+		clientID: uuid.New().String(),
+		imei:     devIMEI,
+		cwd:      "/",
 	}
 
-	opts.OnConnectionLost = func(client mqtt.Client, err error) {
-		if c.isConnected.Swap(false) {
-			baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
-		}
+	if err := coupler.init(); err != nil {
+		fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
+		return
 	}
 
-	c.client = mqtt.NewClient(opts)
-	go Coupler.keepOnline()
-
-	return nil
+	term() // 启动终端模拟器
 }
 
-func (c *MQTTCoupler) keepOnline() {
-	t := time.NewTimer(FastInterval)
-	defer t.Stop()
-
+// SHELL终端模拟器
+// 1, 连接远程设备, 是否成功
+// 2, 等待用户输入, 封装请求
+// 3, 没有用户输入, 封装心跳
+// 4, 发送请求数据, 远程执行
+// 5, 耗时用户请求, 允许中断, Ctrl+C
+// 6, 等待返回结果, 打印输出
+// 7, 循环等待下次, 直到退出
+func term() {
 	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-t.C:
-			t.Reset(c.tick())
-		} // end select
-	} // end for
-}
-
-func (c *MQTTCoupler) tick() time.Duration {
-	if c.isConnected.Load() {
-		return FastInterval
-	}
-
-	if err := c.connect(); err != nil {
-		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
 	}
-
-	return SlowInterval
 }
 
-func (c *MQTTCoupler) connect() error {
-	if c.client.IsConnected() {
-		return nil
-	}
-
-	token := c.client.Connect()
-	select {
-	case <-c.ctx.Done():
-		return nil
-	case <-token.Done():
-	}
-
-	return token.Error()
-}
-
-func (c *MQTTCoupler) ExecuteShell(cmd string) shell.ExecuteResult {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+func help() {
+	h := `
+-h 显示帮助提示
+-v 当前程序版本
+-c 连接目标设备(IMEI), 例如: -c 869523059113051
+`
 
-	return shell.ExecuteResult{}
+	fmt.Println(h)
 }

+ 1 - 1
sshd/client/config.go

@@ -63,7 +63,7 @@ func loadAppConfig() error {
 	return nil
 }
 
-func getCmdTimeoutByPrefix(cmd string) int {
+func GetCmdTimeoutByPrefix(cmd string) int {
 	if cmd == "" || len(CfgServers.Cmds) == 0 {
 		return -1
 	}

+ 106 - 0
sshd/client/coupler.go

@@ -0,0 +1,106 @@
+package main
+
+import (
+	"context"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"hnyfkj.com.cn/rtu/linux/baseapp"
+)
+
+const (
+	MqttQos1     byte = 1               //// 消息至少送达一次
+	FastInterval      = 1 * time.Second //// 快速检测时间间隔
+	SlowInterval      = 5 * time.Second //// 慢速检测时间间隔
+)
+
+type MQTTCoupler struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	broker, username, password string
+
+	client      mqtt.Client
+	clientID    string
+	isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
+
+	imei     string     // 设备唯一标识
+	subTopic string     // 订阅应答主题:/yfkj/device/rpc/imei/ack
+	pubTopic string     // 发布指令主题:/yfkj/device/rpc/imei/cmd
+	cwd      string     // 当前工作目录
+	mu       sync.Mutex // 串行执行的锁
+}
+
+func (c *MQTTCoupler) init() error {
+	template := "/yfkj/device/rpc/imei"
+	c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
+	c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
+
+	opts := mqtt.NewClientOptions().
+		AddBroker(c.broker).
+		SetUsername(c.username).SetPassword(c.password).
+		SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
+		SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
+		SetOrderMatters(false)
+
+	opts.OnConnect = func(client mqtt.Client) {
+		if !c.isConnected.Swap(true) {
+			baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
+		}
+	}
+
+	opts.OnConnectionLost = func(client mqtt.Client, err error) {
+		if c.isConnected.Swap(false) {
+			baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
+		}
+	}
+
+	c.client = mqtt.NewClient(opts)
+	go coupler.keepOnline()
+
+	return nil
+}
+
+func (c *MQTTCoupler) keepOnline() {
+	t := time.NewTimer(FastInterval)
+	defer t.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-t.C:
+			t.Reset(c.tick())
+		} // end select
+	} // end for
+}
+
+func (c *MQTTCoupler) tick() time.Duration {
+	if c.isConnected.Load() {
+		return FastInterval
+	}
+
+	if err := c.connect(); err != nil {
+		baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
+	}
+
+	return SlowInterval
+}
+
+func (c *MQTTCoupler) connect() error {
+	if c.client.IsConnected() {
+		return nil
+	}
+
+	token := c.client.Connect()
+	select {
+	case <-c.ctx.Done():
+		return nil
+	case <-token.Done():
+	}
+
+	return token.Error()
+}

+ 24 - 0
sshd/client/invoker.go

@@ -0,0 +1,24 @@
+package main
+
+import "hnyfkj.com.cn/rtu/linux/utils/shell"
+
+// 心跳检测
+func (c *MQTTCoupler) Ping() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 执行命令
+func (c *MQTTCoupler) Exec(
+	cmd string) (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 中断执行
+func (c *MQTTCoupler) Stop() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}
+
+// 关闭退出
+func (c *MQTTCoupler) Quit() (shell.ExecuteResult, error) {
+	return shell.ExecuteResult{}, nil
+}