sshd.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "hnyfkj.com.cn/rtu/linux/baseapp"
  11. "hnyfkj.com.cn/rtu/linux/netmgrd"
  12. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  13. "hnyfkj.com.cn/rtu/linux/utils/shell"
  14. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  15. )
  16. const MODULE_NAME = "YFKJ_SSHD"
  17. var (
  18. Coupler *MQTTCoupler
  19. )
  20. const (
  21. MqttQos1 byte = 1 //// 消息至少送达一次
  22. FastInterval = 1 * time.Second //// 快速检测时间间隔
  23. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  24. ExecutorCheckInterval = 2 * time.Second // 执行器回收检测
  25. ExecutorTimeout = 6 * time.Second // 执行器超时时间
  26. )
  27. var (
  28. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  29. ErrIMEINotAvailable = errors.New("device imei is not available")
  30. )
  31. type MQTTCoupler struct {
  32. broker, username, password string
  33. client mqtt.Client
  34. imei string // 设备唯一标识
  35. subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
  36. pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. ///////// 本地执行器, 允许多客户端, 同一客户端串行的执行指令
  40. executorMap map[string]*clientExecutor
  41. executorMapMu sync.Mutex
  42. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  43. // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
  44. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  45. }
  46. type executorState int
  47. const (
  48. execIdle executorState = iota // 空闲状态时, 可安全回收
  49. execRunning // 正在执行时, 不允许回收
  50. )
  51. type clientExecutor struct {
  52. id string
  53. executor *shell.Executor
  54. mu sync.Mutex ///////////////////// 同ID串行执行
  55. lastPing time.Time ///////////////////// 用于超时回收
  56. state executorState ///////////////////// 执行器的状态
  57. }
  58. func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
  59. if mqttBroker == "" {
  60. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
  61. return false
  62. }
  63. ctx, cancel := context.WithCancel(context.Background())
  64. Coupler = &MQTTCoupler{
  65. broker: mqttBroker,
  66. username: mqttUsername,
  67. password: mqttPassword,
  68. client: nil,
  69. imei: "",
  70. subTopic: "",
  71. pubTopic: "",
  72. ctx: ctx,
  73. cancel: cancel,
  74. executorMap: make(map[string]*clientExecutor),
  75. isConnected: atomic.Bool{},
  76. registerRpcMeths: &singletask.OnceTask{},
  77. }
  78. if err := Coupler.init2(); err != nil {
  79. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
  80. return false
  81. }
  82. go Coupler.startExecutorReaper(ExecutorCheckInterval, ExecutorTimeout)
  83. go Coupler.keepOnline()
  84. return true
  85. }
  86. func ModuleExit() {
  87. if Coupler != nil {
  88. Coupler.cancel()
  89. }
  90. }
  91. func (c *MQTTCoupler) init2() error {
  92. c.imei = netmgrd.GetIMEI()
  93. if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
  94. return ErrIMEINotAvailable
  95. }
  96. template := "/yfkj/device/rpc/imei"
  97. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  98. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  99. opts := mqtt.NewClientOptions().
  100. AddBroker(c.broker).
  101. SetUsername(c.username).SetPassword(c.password).
  102. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  103. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  104. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  105. opts.OnConnect = func(client mqtt.Client) {
  106. if !c.isConnected.Swap(true) {
  107. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  108. }
  109. }
  110. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  111. if c.isConnected.Swap(false) {
  112. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  113. }
  114. }
  115. c.client = mqtt.NewClient(opts)
  116. return nil
  117. }
  118. func (c *MQTTCoupler) keepOnline() {
  119. t := time.NewTimer(FastInterval)
  120. defer t.Stop()
  121. for {
  122. select {
  123. case <-c.ctx.Done():
  124. return
  125. case <-t.C:
  126. t.Reset(c.tick())
  127. } // end select
  128. } // end for
  129. }
  130. func (c *MQTTCoupler) tick() time.Duration {
  131. if c.isConnected.Load() {
  132. return FastInterval
  133. }
  134. if err := c.connect(); err != nil {
  135. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  136. } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
  137. c.registerRpcMeths.Run(c.instRPCMethods, true)
  138. }
  139. return SlowInterval
  140. }
  141. func (c *MQTTCoupler) connect() error {
  142. if c.client.IsConnected() {
  143. return nil
  144. }
  145. token := c.client.Connect()
  146. select {
  147. case <-c.ctx.Done():
  148. return nil
  149. case <-token.Done():
  150. }
  151. return token.Error()
  152. }
  153. func (c *MQTTCoupler) instRPCMethods() {
  154. t := time.NewTicker(time.Second)
  155. defer t.Stop()
  156. for {
  157. if !c.isConnected.Load() || c.ctx.Err() != nil {
  158. return
  159. }
  160. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  161. select {
  162. case <-c.ctx.Done():
  163. return
  164. case <-token.Done():
  165. }
  166. if token.Error() == nil {
  167. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  168. break
  169. }
  170. select {
  171. case <-c.ctx.Done():
  172. return
  173. case <-t.C:
  174. continue
  175. }
  176. }
  177. }
  178. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  179. go c.execOneCmd(msg)
  180. }
  181. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  182. str := string(msg.Payload())
  183. baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  184. var resp *jsonrpc2.Response // 预定义一个空的应答
  185. var clientID string // 该客户端的唯一标识
  186. var ce *clientExecutor // 该客户端的指令执行器
  187. var exists bool // 判断执行器是否已存在
  188. req, err := jsonrpc2.ParseRequest(str)
  189. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  190. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  191. goto retp
  192. }
  193. clientID, err = extractClientID(req.Params)
  194. if err != nil {
  195. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrInvalidParams, err.Error())
  196. goto retp
  197. }
  198. c.executorMapMu.Lock()
  199. ce, exists = c.executorMap[clientID]
  200. if !exists {
  201. if len(c.executorMap) >= 3 {
  202. c.executorMapMu.Unlock()
  203. resp = jsonrpc2.BuildError(req, -32000, "connection refused: server has reached maximum client capacity (3/3)")
  204. goto retp
  205. }
  206. ce = &clientExecutor{
  207. id: clientID,
  208. executor: shell.NewExecutor(),
  209. state: execIdle,
  210. }
  211. c.executorMap[clientID] = ce
  212. }
  213. c.executorMapMu.Unlock()
  214. ce.mu.Lock() // 确保同一客户端(ID一样)的指令串行执行
  215. ce.lastPing = time.Now()
  216. switch req.Method {
  217. // Call-1: 心跳, 链路检测,"ping-pong"测试
  218. case "executor.ping":
  219. resp = buildResp(req, "pong", nil)
  220. // Call-2:在本地shell中执行远程下发的指令
  221. case "executor.exec":
  222. params, err := extractShellExecuteParams(req.Params)
  223. if err != nil {
  224. ce.mu.Unlock()
  225. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrParse, err.Error())
  226. goto retp
  227. }
  228. ce.state = execRunning
  229. ce.mu.Unlock()
  230. result, err := ce.executor.Exec(params)
  231. ce.mu.Lock()
  232. ce.state = execIdle
  233. ce.lastPing = time.Now()
  234. ce.mu.Unlock()
  235. resp = buildResp(req, result, err)
  236. goto retp
  237. // Call-3:中断本地shell的执行,等价Ctrl+C
  238. case "executor.interrupt":
  239. if ce.state != execRunning {
  240. resp = jsonrpc2.BuildError(req, -32001, "no running command")
  241. break
  242. }
  243. err := ce.executor.Interrupt()
  244. resp = buildResp(req, "interrupted", err)
  245. // Call-4:客户端安全退出, 释放本地的执行器
  246. case "executor.close":
  247. if ce.state == execRunning {
  248. ce.mu.Unlock()
  249. resp = jsonrpc2.BuildError(req, -32002, "executor busy, interrupt first")
  250. goto retp
  251. }
  252. ce.mu.Unlock()
  253. c.executorMapMu.Lock()
  254. delete(c.executorMap, clientID)
  255. c.executorMapMu.Unlock()
  256. resp = buildResp(req, "bye", nil)
  257. goto retp
  258. // Call-?:无效, 远端调用了还不支持的-方法
  259. default:
  260. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  261. }
  262. ce.mu.Unlock()
  263. retp:
  264. text, err := resp.String()
  265. if err != nil {
  266. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  267. return
  268. }
  269. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  270. select {
  271. case <-c.ctx.Done():
  272. return
  273. case <-token.Done():
  274. }
  275. if err := token.Error(); err != nil {
  276. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  277. }
  278. baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  279. }
  280. func (c *MQTTCoupler) startExecutorReaper(interval, timeout time.Duration) {
  281. ticker := time.NewTicker(interval)
  282. defer ticker.Stop()
  283. for {
  284. select {
  285. case <-c.ctx.Done():
  286. return
  287. case <-ticker.C:
  288. c.executorMapMu.Lock()
  289. for id, ce := range c.executorMap {
  290. ce.mu.Lock()
  291. expired := time.Since(ce.lastPing) > timeout
  292. idle := (ce.state == execIdle)
  293. ce.mu.Unlock()
  294. if expired && idle {
  295. delete(c.executorMap, id)
  296. }
  297. } // end for
  298. c.executorMapMu.Unlock()
  299. } // end select
  300. } // end for
  301. }