// sshd.go (11 KB)
  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "hnyfkj.com.cn/rtu/linux/baseapp"
  13. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  14. "hnyfkj.com.cn/rtu/linux/utils/shell"
  15. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  16. )
  17. const MODULE_NAME = "YFKJ_SSHD"
  18. var (
  19. coupler *MQTTCoupler
  20. )
  21. const (
  22. MqttQos1 byte = 1 //// 消息至少送达一次
  23. FastInterval = 1 * time.Second //// 快速检测时间间隔
  24. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  25. ExecutorCheckInterval = 2 * time.Second // 执行器回收检测
  26. ExecutorTimeout = 6 * time.Second // 执行器超时时间
  27. )
  28. var (
  29. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  30. ErrIMEINotAvailable = errors.New("device imei is not available")
  31. )
  32. type MQTTCoupler struct {
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. broker, username, password string
  36. client mqtt.Client /// MQTT客户端
  37. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  38. imei string // 设备唯一标识
  39. subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
  40. pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
  41. ///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令
  42. executorMap map[string]*clientExecutor
  43. executorMapMu sync.Mutex
  44. // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
  45. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  46. }
  47. type executorState int
  48. const (
  49. execIdle executorState = iota // 空闲状态时, 可安全回收
  50. execRunning // 正在执行时, 不允许回收
  51. execClosing // 表明执行器正在关闭中..
  52. )
  53. type clientExecutor struct {
  54. id string /////////////////// 客户端唯一ID
  55. executor *shell.Executor /////////////////// 本地的执行器
  56. mu sync.Mutex /////////////////// 同ID串行执行
  57. lastPing time.Time /////////////////// 用于超时回收
  58. state executorState /////////////////// 执行器的状态
  59. }
  60. func ModuleInit(mqttBroker, mqttUsername, mqttPassword, devIMEI string) bool {
  61. if mqttBroker == "" {
  62. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
  63. return false
  64. }
  65. ctx, cancel := context.WithCancel(context.Background())
  66. coupler = &MQTTCoupler{
  67. ctx: ctx,
  68. cancel: cancel,
  69. broker: mqttBroker,
  70. username: mqttUsername,
  71. password: mqttPassword,
  72. imei: devIMEI,
  73. executorMap: make(map[string]*clientExecutor),
  74. registerRpcMeths: &singletask.OnceTask{},
  75. }
  76. if err := coupler.init2(); err != nil {
  77. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
  78. return false
  79. }
  80. go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
  81. go coupler.keepOnline()
  82. return true
  83. }
  84. func ModuleExit() {
  85. if coupler != nil {
  86. coupler.cancel()
  87. }
  88. }
  89. func (c *MQTTCoupler) init2() error {
  90. if c.imei == "" {
  91. imeiBytes, _ := os.ReadFile("/var/device_imei.txt")
  92. c.imei = strings.TrimSpace(string(imeiBytes))
  93. if c.imei == "" {
  94. return ErrIMEINotAvailable
  95. }
  96. }
  97. baseapp.Logger.Infof("[%s] ☺✔设备IMEI: %s", MODULE_NAME, c.imei)
  98. template := "/yfkj/device/rpc/imei"
  99. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  100. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  101. opts := mqtt.NewClientOptions().
  102. AddBroker(c.broker).
  103. SetUsername(c.username).SetPassword(c.password).
  104. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  105. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  106. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  107. opts.OnConnect = func(client mqtt.Client) {
  108. if !c.isConnected.Swap(true) {
  109. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  110. c.registerRpcMeths.Run(c.instRPCMethods, true) // 注册本地的RPC方法, 供远端调用, 单实例运行
  111. }
  112. }
  113. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  114. if c.isConnected.Swap(false) {
  115. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  116. }
  117. }
  118. c.client = mqtt.NewClient(opts)
  119. return nil
  120. }
  121. func (c *MQTTCoupler) keepOnline() {
  122. t := time.NewTimer(FastInterval)
  123. defer t.Stop()
  124. for {
  125. select {
  126. case <-c.ctx.Done():
  127. return
  128. case <-t.C:
  129. t.Reset(c.tick())
  130. } // end select
  131. } // end for
  132. }
  133. func (c *MQTTCoupler) tick() time.Duration {
  134. if c.isConnected.Load() {
  135. return FastInterval
  136. }
  137. if err := c.connect(); err != nil {
  138. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  139. }
  140. return SlowInterval
  141. }
  142. func (c *MQTTCoupler) connect() error {
  143. if c.client.IsConnected() {
  144. return nil
  145. }
  146. token := c.client.Connect()
  147. select {
  148. case <-c.ctx.Done():
  149. return nil
  150. case <-token.Done():
  151. }
  152. return token.Error()
  153. }
  154. func (c *MQTTCoupler) instRPCMethods() {
  155. t := time.NewTicker(time.Second)
  156. defer t.Stop()
  157. for {
  158. if !c.isConnected.Load() || c.ctx.Err() != nil {
  159. return
  160. }
  161. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  162. select {
  163. case <-c.ctx.Done():
  164. return
  165. case <-token.Done():
  166. }
  167. if token.Error() == nil {
  168. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  169. break
  170. }
  171. select {
  172. case <-c.ctx.Done():
  173. return
  174. case <-t.C:
  175. continue
  176. }
  177. }
  178. }
  179. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  180. go c.execOneCmd(msg)
  181. }
  182. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  183. str := string(msg.Payload())
  184. baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  185. var resp *jsonrpc2.Response // 预先定义一个空的应答
  186. var clientID string // 该客户端的|唯一标识|
  187. var ce *clientExecutor // 该客户端的本地执行器
  188. var exists bool // 判断执行器是否已存在
  189. req, err := jsonrpc2.ParseRequest(str)
  190. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  191. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  192. goto retp
  193. }
  194. clientID, err = extractClientID(req.Params)
  195. if err != nil {
  196. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
  197. goto retp
  198. }
  199. c.executorMapMu.Lock()
  200. ce, exists = c.executorMap[clientID]
  201. if !exists {
  202. if len(c.executorMap) >= 3 {
  203. c.executorMapMu.Unlock()
  204. resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)")
  205. goto retp
  206. }
  207. ce = &clientExecutor{
  208. id: clientID,
  209. executor: shell.NewExecutor(),
  210. state: execIdle,
  211. }
  212. c.executorMap[clientID] = ce
  213. baseapp.Logger.Infof("[%s] 客户端 %s 登录成功", MODULE_NAME, clientID)
  214. }
  215. c.executorMapMu.Unlock()
  216. ce.mu.Lock()
  217. ce.lastPing = time.Now()
  218. ce.mu.Unlock()
  219. switch req.Method {
  220. // Call-1: 心跳, 链路检测,"ping-pong"测试
  221. case "executor.ping":
  222. resp = jsonrpc2.BuildResponse(req, "pong", nil)
  223. goto retp
  224. // Call-2:在本地shell中执行远程下发的指令
  225. case "executor.exec":
  226. ce.mu.Lock()
  227. params, err := extractShellExecuteParams(req.Params)
  228. if err != nil {
  229. ce.mu.Unlock()
  230. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
  231. goto retp
  232. }
  233. if ce.state == execClosing {
  234. ce.mu.Unlock()
  235. resp = jsonrpc2.BuildError(req, -32001, "executor closed")
  236. goto retp
  237. }
  238. if ce.state == execRunning {
  239. ce.mu.Unlock()
  240. resp = jsonrpc2.BuildError(req, -32002, "executor busy")
  241. goto retp
  242. }
  243. cmd := params.Cmd
  244. if strings.ContainsAny(cmd, "&") {
  245. ce.mu.Unlock()
  246. resp = jsonrpc2.BuildError(req, -32003, "prohibit the startup of background tasks")
  247. goto retp
  248. }
  249. if strings.ContainsAny(cmd, "|>;") || strings.Contains(cmd, "\n") {
  250. safeCmd := strings.ReplaceAll(cmd, "'", "'\\''")
  251. cmd = fmt.Sprintf("sh -c '%s'", safeCmd) // 包装成 shell 命令, 支持管道等高级功能
  252. params.Cmd = cmd
  253. }
  254. hostID := params.HostFingerprint
  255. if hostID == "" {
  256. hostID = "unknown"
  257. }
  258. ce.state = execRunning
  259. ce.mu.Unlock()
  260. start := time.Now()
  261. if true { //////// 记录执行日志-执行前
  262. baseapp.Logger.Infof("[%s][▷ EXEC] host=%s, cmd=%q, timeout=%ds",
  263. MODULE_NAME, hostID, params.Cmd, params.Timeout)
  264. }
  265. result, err := ce.executor.Exec(params) // 本地执行用户指令
  266. cost := time.Since(start)
  267. if err != nil { // 记录失败日志-执行后
  268. baseapp.Logger.Warnf("[%s][✖ EXEC] host=%s, cmd=%q, err=%v, cost=%v",
  269. MODULE_NAME, hostID, params.Cmd, err, cost)
  270. } else { ///////// 记录成功日志-执行后
  271. baseapp.Logger.Infof("[%s][✔ EXEC] host=%s, cmd=%q, cost=%v",
  272. MODULE_NAME, hostID, params.Cmd, cost)
  273. }
  274. ce.mu.Lock()
  275. if ce.state != execClosing {
  276. ce.state = execIdle
  277. ce.lastPing = time.Now()
  278. }
  279. ce.mu.Unlock()
  280. resp = jsonrpc2.BuildResponse(req, result, err)
  281. goto retp
  282. // Call-3:中断本地shell的执行,等价Ctrl+C
  283. case "executor.interrupt":
  284. ce.mu.Lock()
  285. running := (ce.state == execRunning)
  286. ce.mu.Unlock()
  287. if !running {
  288. resp = jsonrpc2.BuildError(req, -32004, "no running command")
  289. goto retp
  290. }
  291. err := ce.executor.Interrupt()
  292. resp = jsonrpc2.BuildResponse(req, "interrupted", err)
  293. goto retp
  294. // Call-4:客户端安全退出, 释放本地的执行器
  295. case "executor.close":
  296. err := ce.handleClose()
  297. c.executorMapMu.Lock()
  298. delete(c.executorMap, clientID)
  299. baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID)
  300. c.executorMapMu.Unlock()
  301. resp = jsonrpc2.BuildResponse(req, "closed", err)
  302. goto retp
  303. // Call-?:无效, 远端调用了还不支持的-方法
  304. default:
  305. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  306. goto retp
  307. }
  308. retp:
  309. text, err := resp.String()
  310. if err != nil {
  311. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  312. return
  313. }
  314. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  315. select {
  316. case <-c.ctx.Done():
  317. return
  318. case <-token.Done():
  319. }
  320. if err := token.Error(); err != nil {
  321. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  322. }
  323. baseapp.Logger.Debugf("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  324. }
  325. func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
  326. ticker := time.NewTicker(interval)
  327. defer ticker.Stop()
  328. for {
  329. select {
  330. case <-c.ctx.Done():
  331. return
  332. case <-ticker.C:
  333. c.executorMapMu.Lock()
  334. for id, ce := range c.executorMap {
  335. ce.mu.Lock()
  336. expired := time.Since(ce.lastPing) > timeout
  337. idle := (ce.state == execIdle)
  338. ce.mu.Unlock()
  339. if expired && idle { // 超时且状态空闲时则回收
  340. ce.handleClose() //// 该函数不能阻塞, 否则锁
  341. delete(c.executorMap, id)
  342. baseapp.Logger.Infof("[%s] 客户端 %s 超时移除", MODULE_NAME, id)
  343. } // end if
  344. } // end for2
  345. c.executorMapMu.Unlock()
  346. } // end select
  347. } ////// end for1
  348. }
  349. func (ce *clientExecutor) handleClose() error {
  350. needInterrupt := false
  351. ce.mu.Lock()
  352. switch ce.state {
  353. case execIdle:
  354. ce.state = execClosing
  355. case execRunning:
  356. ce.state = execClosing
  357. needInterrupt = true
  358. case execClosing:
  359. ce.mu.Unlock()
  360. return nil
  361. }
  362. ce.mu.Unlock()
  363. var err error
  364. if needInterrupt {
  365. err = ce.executor.Interrupt() // 发送"Ctrl+C"
  366. }
  367. return err
  368. }