client.go 5.0 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "sync/atomic"
  12. "syscall"
  13. "time"
  14. "github.com/google/uuid"
  15. "github.com/peterh/liner"
  16. "hnyfkj.com.cn/rtu/linux/baseapp"
  17. )
  18. const MODULE_NAME = "YFKJ_SSH_CLIENT"
  19. var (
  20. coupler *MQTTCoupler
  21. Version = "1.0.0.1"
  22. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  23. ErrIMEINotAvailable = errors.New("device imei is not available")
  24. )
  25. func main() {
  26. if baseapp.IsArgsParam("-h") {
  27. help()
  28. return
  29. }
  30. if baseapp.IsArgsParam("-v") {
  31. fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
  32. return
  33. }
  34. devIMEI := baseapp.GetArgsParamStr("-c", "")
  35. if devIMEI == "" {
  36. help()
  37. return
  38. }
  39. if err := loadAppConfig(); err != nil {
  40. fmt.Printf("[%s] 错误: %v!!\n", MODULE_NAME, err)
  41. return
  42. }
  43. if CfgServers.MQTTSrv.Address == "" {
  44. fmt.Printf("[%s] 错误: %v!!\n", MODULE_NAME, ErrBrokerAddressEmpty)
  45. return
  46. }
  47. ctx, cancel := context.WithCancel(context.Background())
  48. coupler = &MQTTCoupler{
  49. ctx: ctx,
  50. cancel: cancel,
  51. broker: CfgServers.MQTTSrv.Address,
  52. username: CfgServers.MQTTSrv.Username,
  53. password: CfgServers.MQTTSrv.Password,
  54. clientID: uuid.New().String(),
  55. imei: devIMEI,
  56. cwd: "/",
  57. }
  58. if err := coupler.init2(); err != nil {
  59. fmt.Printf("[%s] 错误: %v\n!!", MODULE_NAME, err)
  60. return
  61. }
  62. var pingState atomic.Bool
  63. heartbeatLoop(&pingState) // -启动-设备在线-心跳检测-
  64. for {
  65. if pingState.Load() { //// 等待成功连接上目标设备卍
  66. break
  67. }
  68. fmt.Printf("[%s] 正在尝试连接设备...\n", MODULE_NAME)
  69. time.Sleep(1 * time.Second)
  70. }
  71. term(&pingState) /////////////////// 启动终端模拟器卍
  72. }
  73. func term(pingState *atomic.Bool) {
  74. var executing atomic.Bool // 是否有正在执行中的命令
  75. var interrupted atomic.Bool // 用户是否按键取消了命令
  76. interruptLoop(&executing, &interrupted) // Ctrl+C卍
  77. printWelcome()
  78. line := liner.NewLiner()
  79. defer line.Close()
  80. line.SetCtrlCAborts(false)
  81. line.SetTabCompletionStyle(liner.TabCircular)
  82. historyFile := "history.txt"
  83. if f, err := os.Open(historyFile); err == nil {
  84. line.ReadHistory(f)
  85. f.Close()
  86. }
  87. defer func() {
  88. if f, err := os.Create(historyFile); err == nil {
  89. line.WriteHistory(f)
  90. f.Close()
  91. }
  92. }()
  93. for {
  94. if !pingState.Load() {
  95. fmt.Printf("[%s] 目标设备连接丢失!!\n", MODULE_NAME)
  96. break
  97. }
  98. if interrupted.Swap(false) {
  99. fmt.Println("^C")
  100. }
  101. prompt := fmt.Sprintf("root@%s:%s# ", coupler.imei, coupler.cwd)
  102. input, err := line.Prompt(prompt) /// 等待用户输入指令
  103. if err == nil {
  104. goto inputOK
  105. }
  106. if err == io.EOF { ////////////////// Ctrl+D 按键处理
  107. _, _ = coupler.quit()
  108. fmt.Println("bye")
  109. break
  110. }
  111. fmt.Println("读取用户输入失败:", err)
  112. continue
  113. inputOK:
  114. input = strings.TrimSpace(input)
  115. if input == "" {
  116. continue
  117. }
  118. line.AppendHistory(input) ///// 保存用户输入的历史命令
  119. if input == "exit" || input == "quit" {
  120. _, _ = coupler.quit()
  121. break
  122. }
  123. executing.Store(true)
  124. result, err := coupler.exec(input)
  125. executing.Store(false)
  126. if err != nil {
  127. fmt.Printf("执行错误: %v\n", err)
  128. continue
  129. }
  130. if result.Stdout != "" {
  131. fmt.Print(result.Stdout)
  132. }
  133. if result.Stderr != "" {
  134. fmt.Fprintln(os.Stderr, result.Stderr)
  135. }
  136. if result.ExitCode == 124 {
  137. fmt.Println("command timeout")
  138. }
  139. if result.Cwd != "" {
  140. coupler.cwd = result.Cwd
  141. }
  142. }
  143. }
  144. func help() {
  145. h := `
  146. -h 显示帮助提示
  147. -v 当前程序版本
  148. -c 连接目标设备(IMEI), 例如: -c 869523059113051
  149. `
  150. fmt.Println(h)
  151. }
  152. func interruptLoop(executing *atomic.Bool, interrupted *atomic.Bool) {
  153. sigCh := make(chan os.Signal, 1)
  154. signal.Notify(sigCh, syscall.SIGINT)
  155. go func() {
  156. for range sigCh {
  157. interrupted.Store(true)
  158. if executing.Load() {
  159. _, _ = coupler.stop()
  160. }
  161. }
  162. }()
  163. }
  164. func heartbeatLoop(pingState *atomic.Bool) {
  165. go func() {
  166. ticker := time.NewTicker(1 * time.Second)
  167. defer ticker.Stop()
  168. pingFailCount := 0
  169. pong := ""
  170. for range ticker.C {
  171. resp, err := coupler.ping()
  172. if err == nil && resp.Error == nil && resp.Result != nil &&
  173. json.Unmarshal(resp.Result, &pong) == nil && pong == "pong" {
  174. pingState.Store(true)
  175. pingFailCount = 0
  176. } else {
  177. pingFailCount++
  178. if pingFailCount >= 3 { ///// 连续3次ping失败, 可以认为设备离线
  179. pingState.Store(false)
  180. }
  181. } // end if
  182. } // end for
  183. }()
  184. }
  185. func printWelcome() {
  186. welcome := `
  187. _ _ _ _ _ _ _ _
  188. | \ | (_) | | | (_) | |
  189. | \| |_| |_ ___| |__ _| | | ___
  190. | . | | __/ __| '_ \| | | |/ _ \
  191. | |\ | | || (__| | | | | | | __/
  192. |_| \_|_|\__\___|_| |_|_|_|_|\___|
  193. ══════════════════════════════════
  194. 云飞科技RTU远程运维终端
  195. ══════════════════════════════════
  196. 提示: 输入'quit'命令, 退出终端
  197. `
  198. fmt.Println(welcome)
  199. }