sshd.go 9.8 KB


  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "hnyfkj.com.cn/rtu/linux/baseapp"
  11. "hnyfkj.com.cn/rtu/linux/netmgrd"
  12. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  13. "hnyfkj.com.cn/rtu/linux/utils/shell"
  14. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  15. )
  // MODULE_NAME is the tag placed in the "[%s]" prefix of every log line
  // written by this package.
  const (
  	MODULE_NAME = "YFKJ_SSHD"
  )
  17. var (
  18. coupler *MQTTCoupler
  19. )
  // MQTT delivery level and the timing parameters used by the coupler.
  const (
  	// MqttQos1 is MQTT QoS level 1: a message is delivered at least once.
  	MqttQos1 byte = 1

  	// FastInterval is the short polling interval of the online check.
  	FastInterval = time.Second
  	// SlowInterval is the long polling interval of the online check.
  	SlowInterval = 5 * time.Second

  	// ExecutorCheckInterval is how often executors are examined for reclaim.
  	ExecutorCheckInterval = 2 * time.Second
  	// ExecutorTimeout is how long an executor may go unused before reclaim.
  	ExecutorTimeout = 6 * time.Second
  )
  // ErrBrokerAddressEmpty is reported when ModuleInit is given an empty broker address.
  var ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")

  // ErrIMEINotAvailable is returned by init2 when no usable IMEI can be read.
  var ErrIMEINotAvailable = errors.New("device imei is not available")
  31. type MQTTCoupler struct {
  32. broker, username, password string
  33. client mqtt.Client
  34. imei string // 设备唯一标识
  35. subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
  36. pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. ///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令
  40. executorMap map[string]*clientExecutor
  41. executorMapMu sync.Mutex
  42. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  43. // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
  44. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  45. }
  // executorState describes where a clientExecutor is in its life cycle.
  type executorState int

  const (
  	// execIdle: nothing is running; the executor may be reclaimed.
  	execIdle executorState = 0
  	// execRunning: a command is in progress; the executor must not be reclaimed.
  	execRunning executorState = 1
  	// execClosing: the executor is being shut down.
  	execClosing executorState = 2
  )
  52. type clientExecutor struct {
  53. id string
  54. executor *shell.Executor
  55. mu sync.Mutex ///////////////////// 同ID串行执行
  56. lastPing time.Time ///////////////////// 用于超时回收
  57. state executorState ///////////////////// 执行器的状态
  58. }
  59. func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
  60. if mqttBroker == "" {
  61. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
  62. return false
  63. }
  64. ctx, cancel := context.WithCancel(context.Background())
  65. coupler = &MQTTCoupler{
  66. broker: mqttBroker,
  67. username: mqttUsername,
  68. password: mqttPassword,
  69. client: nil,
  70. imei: "",
  71. subTopic: "",
  72. pubTopic: "",
  73. ctx: ctx,
  74. cancel: cancel,
  75. executorMap: make(map[string]*clientExecutor),
  76. isConnected: atomic.Bool{},
  77. registerRpcMeths: &singletask.OnceTask{},
  78. }
  79. if err := coupler.init2(); err != nil {
  80. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
  81. return false
  82. }
  83. go coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
  84. go coupler.keepOnline()
  85. return true
  86. }
  87. func ModuleExit() {
  88. if coupler != nil {
  89. coupler.cancel()
  90. }
  91. }
  92. func (c *MQTTCoupler) init2() error {
  93. c.imei = netmgrd.GetIMEI()
  94. if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
  95. return ErrIMEINotAvailable
  96. }
  97. template := "/yfkj/device/rpc/imei"
  98. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  99. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  100. opts := mqtt.NewClientOptions().
  101. AddBroker(c.broker).
  102. SetUsername(c.username).SetPassword(c.password).
  103. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  104. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  105. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  106. opts.OnConnect = func(client mqtt.Client) {
  107. if !c.isConnected.Swap(true) {
  108. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  109. }
  110. }
  111. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  112. if c.isConnected.Swap(false) {
  113. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  114. }
  115. }
  116. c.client = mqtt.NewClient(opts)
  117. return nil
  118. }
  119. func (c *MQTTCoupler) keepOnline() {
  120. t := time.NewTimer(FastInterval)
  121. defer t.Stop()
  122. for {
  123. select {
  124. case <-c.ctx.Done():
  125. return
  126. case <-t.C:
  127. t.Reset(c.tick())
  128. } // end select
  129. } // end for
  130. }
  131. func (c *MQTTCoupler) tick() time.Duration {
  132. if c.isConnected.Load() {
  133. return FastInterval
  134. }
  135. if err := c.connect(); err != nil {
  136. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  137. } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
  138. c.registerRpcMeths.Run(c.instRPCMethods, true)
  139. }
  140. return SlowInterval
  141. }
  142. func (c *MQTTCoupler) connect() error {
  143. if c.client.IsConnected() {
  144. return nil
  145. }
  146. token := c.client.Connect()
  147. select {
  148. case <-c.ctx.Done():
  149. return nil
  150. case <-token.Done():
  151. }
  152. return token.Error()
  153. }
  154. func (c *MQTTCoupler) instRPCMethods() {
  155. t := time.NewTicker(time.Second)
  156. defer t.Stop()
  157. for {
  158. if !c.isConnected.Load() || c.ctx.Err() != nil {
  159. return
  160. }
  161. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  162. select {
  163. case <-c.ctx.Done():
  164. return
  165. case <-token.Done():
  166. }
  167. if token.Error() == nil {
  168. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  169. break
  170. }
  171. select {
  172. case <-c.ctx.Done():
  173. return
  174. case <-t.C:
  175. continue
  176. }
  177. }
  178. }
  179. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  180. go c.execOneCmd(msg)
  181. }
  182. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  183. str := string(msg.Payload())
  184. baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  185. var resp *jsonrpc2.Response // 预先定义一个空的应答
  186. var clientID string // 该客户端的|唯一标识|
  187. var ce *clientExecutor // 该客户端的本地执行器
  188. var exists bool // 判断执行器是否已存在
  189. req, err := jsonrpc2.ParseRequest(str)
  190. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  191. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  192. goto retp
  193. }
  194. clientID, err = extractClientID(req.Params)
  195. if err != nil {
  196. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
  197. goto retp
  198. }
  199. c.executorMapMu.Lock()
  200. ce, exists = c.executorMap[clientID]
  201. if !exists {
  202. if len(c.executorMap) >= 3 {
  203. c.executorMapMu.Unlock()
  204. resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)")
  205. goto retp
  206. }
  207. ce = &clientExecutor{
  208. id: clientID,
  209. executor: shell.NewExecutor(),
  210. state: execIdle,
  211. }
  212. c.executorMap[clientID] = ce
  213. }
  214. c.executorMapMu.Unlock()
  215. ce.mu.Lock()
  216. ce.lastPing = time.Now()
  217. ce.mu.Unlock()
  218. switch req.Method {
  219. // Call-1: 心跳, 链路检测,"ping-pong"测试
  220. case "executor.ping":
  221. resp = buildResp(req, "pong", nil)
  222. goto retp
  223. // Call-2:在本地shell中执行远程下发的指令
  224. case "executor.exec":
  225. ce.mu.Lock()
  226. params, err := extractShellExecuteParams(req.Params)
  227. if err != nil {
  228. ce.mu.Unlock()
  229. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
  230. goto retp
  231. }
  232. if ce.state == execClosing {
  233. ce.mu.Unlock()
  234. resp = jsonrpc2.BuildError(req, -32001, "executor closed")
  235. goto retp
  236. }
  237. if ce.state == execRunning {
  238. ce.mu.Unlock()
  239. resp = jsonrpc2.BuildError(req, -32002, "executor busy")
  240. goto retp
  241. }
  242. ce.state = execRunning
  243. ce.mu.Unlock()
  244. result, err := ce.executor.Exec(params) // 本地执行用户指令
  245. ce.mu.Lock()
  246. if ce.state != execClosing {
  247. ce.state = execIdle
  248. ce.lastPing = time.Now()
  249. }
  250. ce.mu.Unlock()
  251. resp = buildResp(req, result, err)
  252. goto retp
  253. // Call-3:中断本地shell的执行,等价Ctrl+C
  254. case "executor.interrupt":
  255. ce.mu.Lock()
  256. running := (ce.state == execRunning)
  257. ce.mu.Unlock()
  258. if !running {
  259. resp = jsonrpc2.BuildError(req, -32003, "no running command")
  260. goto retp
  261. }
  262. err := ce.executor.Interrupt()
  263. resp = buildResp(req, "interrupted", err)
  264. goto retp
  265. // Call-4:客户端安全退出, 释放本地的执行器
  266. case "executor.close":
  267. err := ce.handleClose()
  268. c.executorMapMu.Lock()
  269. delete(c.executorMap, clientID)
  270. c.executorMapMu.Unlock()
  271. resp = buildResp(req, "closed", err)
  272. goto retp
  273. // Call-?:无效, 远端调用了还不支持的-方法
  274. default:
  275. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  276. }
  277. retp:
  278. text, err := resp.String()
  279. if err != nil {
  280. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  281. return
  282. }
  283. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  284. select {
  285. case <-c.ctx.Done():
  286. return
  287. case <-token.Done():
  288. }
  289. if err := token.Error(); err != nil {
  290. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  291. }
  292. baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  293. }
  294. func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
  295. ticker := time.NewTicker(interval)
  296. defer ticker.Stop()
  297. for {
  298. select {
  299. case <-c.ctx.Done():
  300. return
  301. case <-ticker.C:
  302. c.executorMapMu.Lock()
  303. for id, ce := range c.executorMap {
  304. ce.mu.Lock()
  305. expired := time.Since(ce.lastPing) > timeout
  306. idle := (ce.state == execIdle)
  307. ce.mu.Unlock()
  308. if expired && idle { // 超时且状态空闲时则回收
  309. ce.handleClose()
  310. delete(c.executorMap, id)
  311. } // end if
  312. } // end for2
  313. c.executorMapMu.Unlock()
  314. } // end select
  315. } ////// end for1
  316. }
  317. func (ce *clientExecutor) handleClose() error {
  318. needInterrupt := false
  319. ce.mu.Lock()
  320. switch ce.state {
  321. case execIdle:
  322. ce.state = execClosing
  323. case execRunning:
  324. ce.state = execClosing
  325. needInterrupt = true
  326. case execClosing:
  327. ce.mu.Unlock()
  328. return nil
  329. }
  330. ce.mu.Unlock()
  331. var err error
  332. if needInterrupt {
  333. err = ce.executor.Interrupt()
  334. }
  335. return err
  336. }