sshd.go 11 KB


  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. "hnyfkj.com.cn/rtu/linux/baseapp"
  13. "hnyfkj.com.cn/rtu/linux/netmgrd"
  14. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  15. "hnyfkj.com.cn/rtu/linux/utils/shell"
  16. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  17. )
// MODULE_NAME is the tag placed in square brackets at the start of every log
// line this package emits.
const MODULE_NAME = "YFKJ_SSHD"
var (
	// coupler is the package-level coupler instance. ModuleInit assigns it and
	// ModuleExit cancels its context when it is non-nil.
	coupler *MQTTCoupler
)
  22. const (
  23. MqttQos1 byte = 1 //// 消息至少送达一次
  24. FastInterval = 1 * time.Second //// 快速检测时间间隔
  25. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  26. ExecutorCheckInterval = 2 * time.Second // 执行器回收检测
  27. ExecutorTimeout = 6 * time.Second // 执行器超时时间
  28. )
// Sentinel errors reported during module initialisation.
var (
	// ErrBrokerAddressEmpty is logged by ModuleInit when the broker address is "".
	ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
	// ErrIMEINotAvailable is returned by init2 when no usable IMEI can be read
	// from the device IMEI file.
	ErrIMEINotAvailable = errors.New("device imei is not available")
)
// MQTTCoupler couples an MQTT broker connection with a set of per-client local
// shell executors: requests arrive on subTopic, replies are published on
// pubTopic.
type MQTTCoupler struct {
	ctx    context.Context    // cancelled by ModuleExit; stops all background loops
	cancel context.CancelFunc // cancels ctx

	broker, username, password string // broker address and credentials passed to ModuleInit

	client      mqtt.Client // MQTT client
	isConnected atomic.Bool // whether the client is currently connected to the MQTT broker

	imei     string // unique device identifier
	subTopic string // command topic subscribed to: /yfkj/device/rpc/imei/cmd
	pubTopic string // reply topic published to: /yfkj/device/rpc/imei/ack

	// Local executors: several clients are allowed, while commands from the
	// same client are executed one at a time.
	executorMap   map[string]*clientExecutor
	executorMapMu sync.Mutex // guards executorMap

	// Registers the local RPC methods once the connection succeeds, so that
	// clients can start issuing commands.
	registerRpcMeths *singletask.OnceTask // registration task, single instance
}
// executorState is the lifecycle state of a clientExecutor.
type executorState int

const (
	execIdle    executorState = iota // idle: safe to reclaim
	execRunning                      // a command is executing: must not be reclaimed
	execClosing                      // the executor is being shut down
)
// clientExecutor is the per-client bookkeeping around one local shell executor.
type clientExecutor struct {
	id       string          // unique client ID
	executor *shell.Executor // local executor
	mu       sync.Mutex      // guards lastPing and state; serialises commands of the same ID
	lastPing time.Time       // time of the last request; used for timeout reclamation
	state    executorState   // current executor state
}
  61. func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
  62. if mqttBroker == "" {
  63. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
  64. return false
  65. }
  66. ctx, cancel := context.WithCancel(context.Background())
  67. coupler = &MQTTCoupler{
  68. ctx: ctx,
  69. cancel: cancel,
  70. broker: mqttBroker,
  71. username: mqttUsername,
  72. password: mqttPassword,
  73. executorMap: make(map[string]*clientExecutor),
  74. registerRpcMeths: &singletask.OnceTask{},
  75. }
  76. if err := coupler.init2(); err != nil {
  77. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
  78. return false
  79. }
  80. go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
  81. go coupler.keepOnline()
  82. return true
  83. }
  84. func ModuleExit() {
  85. if coupler != nil {
  86. coupler.cancel()
  87. }
  88. }
  89. func (c *MQTTCoupler) init2() error {
  90. imeiBytes, _ := os.ReadFile("/var/device_imei.txt")
  91. c.imei = strings.TrimSpace(string(imeiBytes))
  92. if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
  93. return ErrIMEINotAvailable
  94. }
  95. baseapp.Logger.Infof("[%s] ☺✔设备IMEI: %s", MODULE_NAME, c.imei)
  96. template := "/yfkj/device/rpc/imei"
  97. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  98. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  99. opts := mqtt.NewClientOptions().
  100. AddBroker(c.broker).
  101. SetUsername(c.username).SetPassword(c.password).
  102. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  103. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  104. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  105. opts.OnConnect = func(client mqtt.Client) {
  106. if !c.isConnected.Swap(true) {
  107. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  108. c.registerRpcMeths.Run(c.instRPCMethods, true) // 注册本地的RPC方法, 供远端调用, 单实例运行
  109. }
  110. }
  111. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  112. if c.isConnected.Swap(false) {
  113. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  114. }
  115. }
  116. c.client = mqtt.NewClient(opts)
  117. return nil
  118. }
  119. func (c *MQTTCoupler) keepOnline() {
  120. t := time.NewTimer(FastInterval)
  121. defer t.Stop()
  122. for {
  123. select {
  124. case <-c.ctx.Done():
  125. return
  126. case <-t.C:
  127. t.Reset(c.tick())
  128. } // end select
  129. } // end for
  130. }
  131. func (c *MQTTCoupler) tick() time.Duration {
  132. if c.isConnected.Load() {
  133. return FastInterval
  134. }
  135. if err := c.connect(); err != nil {
  136. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  137. }
  138. return SlowInterval
  139. }
  140. func (c *MQTTCoupler) connect() error {
  141. if c.client.IsConnected() {
  142. return nil
  143. }
  144. token := c.client.Connect()
  145. select {
  146. case <-c.ctx.Done():
  147. return nil
  148. case <-token.Done():
  149. }
  150. return token.Error()
  151. }
  152. func (c *MQTTCoupler) instRPCMethods() {
  153. t := time.NewTicker(time.Second)
  154. defer t.Stop()
  155. for {
  156. if !c.isConnected.Load() || c.ctx.Err() != nil {
  157. return
  158. }
  159. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  160. select {
  161. case <-c.ctx.Done():
  162. return
  163. case <-token.Done():
  164. }
  165. if token.Error() == nil {
  166. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  167. break
  168. }
  169. select {
  170. case <-c.ctx.Done():
  171. return
  172. case <-t.C:
  173. continue
  174. }
  175. }
  176. }
  177. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  178. go c.execOneCmd(msg)
  179. }
  180. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  181. str := string(msg.Payload())
  182. baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  183. var resp *jsonrpc2.Response // 预先定义一个空的应答
  184. var clientID string // 该客户端的|唯一标识|
  185. var ce *clientExecutor // 该客户端的本地执行器
  186. var exists bool // 判断执行器是否已存在
  187. req, err := jsonrpc2.ParseRequest(str)
  188. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  189. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  190. goto retp
  191. }
  192. clientID, err = extractClientID(req.Params)
  193. if err != nil {
  194. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
  195. goto retp
  196. }
  197. c.executorMapMu.Lock()
  198. ce, exists = c.executorMap[clientID]
  199. if !exists {
  200. if len(c.executorMap) >= 3 {
  201. c.executorMapMu.Unlock()
  202. resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)")
  203. goto retp
  204. }
  205. ce = &clientExecutor{
  206. id: clientID,
  207. executor: shell.NewExecutor(),
  208. state: execIdle,
  209. }
  210. c.executorMap[clientID] = ce
  211. baseapp.Logger.Infof("[%s] 客户端 %s 登录成功", MODULE_NAME, clientID)
  212. }
  213. c.executorMapMu.Unlock()
  214. ce.mu.Lock()
  215. ce.lastPing = time.Now()
  216. ce.mu.Unlock()
  217. switch req.Method {
  218. // Call-1: 心跳, 链路检测,"ping-pong"测试
  219. case "executor.ping":
  220. resp = buildResp(req, "pong", nil)
  221. goto retp
  222. // Call-2:在本地shell中执行远程下发的指令
  223. case "executor.exec":
  224. ce.mu.Lock()
  225. params, err := extractShellExecuteParams(req.Params)
  226. if err != nil {
  227. ce.mu.Unlock()
  228. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
  229. goto retp
  230. }
  231. if ce.state == execClosing {
  232. ce.mu.Unlock()
  233. resp = jsonrpc2.BuildError(req, -32001, "executor closed")
  234. goto retp
  235. }
  236. if ce.state == execRunning {
  237. ce.mu.Unlock()
  238. resp = jsonrpc2.BuildError(req, -32002, "executor busy")
  239. goto retp
  240. }
  241. cmd := params.Cmd
  242. if strings.ContainsAny(cmd, "&") {
  243. ce.mu.Unlock()
  244. resp = jsonrpc2.BuildError(req, -32003, "prohibit the startup of background tasks")
  245. goto retp
  246. }
  247. if strings.ContainsAny(cmd, "|>;") || strings.Contains(cmd, "\n") {
  248. safeCmd := strings.ReplaceAll(cmd, "'", "'\\''")
  249. cmd = fmt.Sprintf("sh -c '%s'", safeCmd) // 包装成 shell 命令, 支持管道等高级功能
  250. params.Cmd = cmd
  251. }
  252. hostID := params.HostFingerprint
  253. if hostID == "" {
  254. hostID = "unknown"
  255. }
  256. ce.state = execRunning
  257. ce.mu.Unlock()
  258. start := time.Now()
  259. if true { //////// 记录执行日志-执行前
  260. baseapp.Logger.Infof("[%s][▷ EXEC] host=%s, cmd=%q, timeout=%ds",
  261. MODULE_NAME, hostID, params.Cmd, params.Timeout)
  262. }
  263. result, err := ce.executor.Exec(params) // 本地执行用户指令
  264. cost := time.Since(start)
  265. if err != nil { // 记录失败日志-执行后
  266. baseapp.Logger.Warnf("[%s][✖ EXEC] host=%s, cmd=%q, err=%v, cost=%v",
  267. MODULE_NAME, hostID, params.Cmd, err, cost)
  268. } else { ///////// 记录成功日志-执行后
  269. baseapp.Logger.Infof("[%s][✔ EXEC] host=%s, cmd=%q, cost=%v",
  270. MODULE_NAME, hostID, params.Cmd, cost)
  271. }
  272. ce.mu.Lock()
  273. if ce.state != execClosing {
  274. ce.state = execIdle
  275. ce.lastPing = time.Now()
  276. }
  277. ce.mu.Unlock()
  278. resp = buildResp(req, result, err)
  279. goto retp
  280. // Call-3:中断本地shell的执行,等价Ctrl+C
  281. case "executor.interrupt":
  282. ce.mu.Lock()
  283. running := (ce.state == execRunning)
  284. ce.mu.Unlock()
  285. if !running {
  286. resp = jsonrpc2.BuildError(req, -32004, "no running command")
  287. goto retp
  288. }
  289. err := ce.executor.Interrupt()
  290. resp = buildResp(req, "interrupted", err)
  291. goto retp
  292. // Call-4:客户端安全退出, 释放本地的执行器
  293. case "executor.close":
  294. err := ce.handleClose()
  295. c.executorMapMu.Lock()
  296. delete(c.executorMap, clientID)
  297. baseapp.Logger.Infof("[%s] 客户端 %s 退出成功", MODULE_NAME, clientID)
  298. c.executorMapMu.Unlock()
  299. resp = buildResp(req, "closed", err)
  300. goto retp
  301. // Call-?:无效, 远端调用了还不支持的-方法
  302. default:
  303. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  304. goto retp
  305. }
  306. retp:
  307. text, err := resp.String()
  308. if err != nil {
  309. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  310. return
  311. }
  312. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  313. select {
  314. case <-c.ctx.Done():
  315. return
  316. case <-token.Done():
  317. }
  318. if err := token.Error(); err != nil {
  319. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  320. }
  321. baseapp.Logger.Debugf("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  322. }
  323. func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
  324. ticker := time.NewTicker(interval)
  325. defer ticker.Stop()
  326. for {
  327. select {
  328. case <-c.ctx.Done():
  329. return
  330. case <-ticker.C:
  331. c.executorMapMu.Lock()
  332. for id, ce := range c.executorMap {
  333. ce.mu.Lock()
  334. expired := time.Since(ce.lastPing) > timeout
  335. idle := (ce.state == execIdle)
  336. ce.mu.Unlock()
  337. if expired && idle { // 超时且状态空闲时则回收
  338. ce.handleClose() //// 该函数不能阻塞, 否则锁
  339. delete(c.executorMap, id)
  340. baseapp.Logger.Infof("[%s] 客户端 %s 超时移除", MODULE_NAME, id)
  341. } // end if
  342. } // end for2
  343. c.executorMapMu.Unlock()
  344. } // end select
  345. } ////// end for1
  346. }
  347. func (ce *clientExecutor) handleClose() error {
  348. needInterrupt := false
  349. ce.mu.Lock()
  350. switch ce.state {
  351. case execIdle:
  352. ce.state = execClosing
  353. case execRunning:
  354. ce.state = execClosing
  355. needInterrupt = true
  356. case execClosing:
  357. ce.mu.Unlock()
  358. return nil
  359. }
  360. ce.mu.Unlock()
  361. var err error
  362. if needInterrupt {
  363. err = ce.executor.Interrupt() // 发送"Ctrl+C"
  364. }
  365. return err
  366. }