sshd.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "hnyfkj.com.cn/rtu/linux/baseapp"
  10. "hnyfkj.com.cn/rtu/linux/netmgrd"
  11. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  12. "hnyfkj.com.cn/rtu/linux/utils/shell"
  13. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  14. )
  15. const MODULE_NAME = "YFKJ_SSHD"
  16. var (
  17. Coupler *MQTTCoupler
  18. )
  19. const (
  20. MqttQos1 byte = 1 //// 消息至少送达一次
  21. FastInterval = 1 * time.Second //// 快速检测时间间隔
  22. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  23. )
  24. var (
  25. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  26. ErrIMEINotAvailable = errors.New("device imei is not available")
  27. )
  28. type MQTTCoupler struct {
  29. broker, username, password string
  30. client mqtt.Client
  31. imei string // 设备唯一标识
  32. subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
  33. pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
  34. ctx context.Context
  35. cancel context.CancelFunc
  36. executor *shell.Executor // 本地执行器, 单实例-串行执行指令
  37. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  38. // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
  39. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  40. }
  41. func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
  42. if mqttBroker == "" {
  43. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
  44. return false
  45. }
  46. ctx, cancel := context.WithCancel(context.Background())
  47. Coupler = &MQTTCoupler{
  48. broker: mqttBroker,
  49. username: mqttUsername,
  50. password: mqttPassword,
  51. client: nil,
  52. imei: "",
  53. subTopic: "",
  54. pubTopic: "",
  55. ctx: ctx,
  56. cancel: cancel,
  57. executor: shell.NewExecutor(),
  58. isConnected: atomic.Bool{},
  59. registerRpcMeths: &singletask.OnceTask{},
  60. }
  61. if err := Coupler.init(); err != nil {
  62. baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
  63. return false
  64. }
  65. go Coupler.keepOnline()
  66. return true
  67. }
  68. func ModuleExit() {
  69. if Coupler != nil {
  70. Coupler.cancel()
  71. }
  72. }
  73. func (c *MQTTCoupler) init() error {
  74. c.imei = netmgrd.GetIMEI()
  75. if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
  76. return ErrIMEINotAvailable
  77. }
  78. template := "/yfkj/device/rpc/imei"
  79. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  80. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  81. opts := mqtt.NewClientOptions().
  82. AddBroker(c.broker).
  83. SetUsername(c.username).SetPassword(c.password).
  84. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  85. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  86. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  87. opts.OnConnect = func(client mqtt.Client) {
  88. if !c.isConnected.Swap(true) {
  89. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  90. }
  91. }
  92. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  93. if c.isConnected.Swap(false) {
  94. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  95. }
  96. }
  97. c.client = mqtt.NewClient(opts)
  98. return nil
  99. }
  100. func (c *MQTTCoupler) keepOnline() {
  101. t := time.NewTimer(FastInterval)
  102. defer t.Stop()
  103. for {
  104. select {
  105. case <-c.ctx.Done():
  106. return
  107. case <-t.C:
  108. t.Reset(c.tick())
  109. } // end select
  110. } // end for
  111. }
  112. func (c *MQTTCoupler) tick() time.Duration {
  113. if c.isConnected.Load() {
  114. return FastInterval
  115. }
  116. if err := c.connect(); err != nil {
  117. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  118. } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
  119. c.registerRpcMeths.Run(c.instRPCMethods, true)
  120. }
  121. return SlowInterval
  122. }
  123. func (c *MQTTCoupler) connect() error {
  124. if c.client.IsConnected() {
  125. return nil
  126. }
  127. token := c.client.Connect()
  128. select {
  129. case <-c.ctx.Done():
  130. return nil
  131. case <-token.Done():
  132. }
  133. return token.Error()
  134. }
  135. func (c *MQTTCoupler) instRPCMethods() {
  136. t := time.NewTicker(time.Second)
  137. defer t.Stop()
  138. for {
  139. if !c.isConnected.Load() || c.ctx.Err() != nil {
  140. return
  141. }
  142. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  143. select {
  144. case <-c.ctx.Done():
  145. return
  146. case <-token.Done():
  147. }
  148. if token.Error() == nil {
  149. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  150. break
  151. }
  152. select {
  153. case <-c.ctx.Done():
  154. return
  155. case <-t.C:
  156. continue
  157. }
  158. }
  159. }
  160. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  161. c.execOneCmd(msg)
  162. }
  163. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  164. str := string(msg.Payload())
  165. baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  166. var resp *jsonrpc2.Response // 预定义一个空的应答
  167. req, err := jsonrpc2.ParseRequest(str)
  168. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  169. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  170. goto retp
  171. }
  172. switch req.Method {
  173. // Call-1: 心跳, 链路检测,"ping-pong"测试
  174. case "ping":
  175. resp = buildResp(req, "pong", nil)
  176. // Call-2:在本地shell中执行远程下发的指令
  177. case "shell.execute":
  178. params, err := parseShellExecuteParams(req.Params)
  179. if err != nil {
  180. resp = jsonrpc2.BuildError(req, -32700, err.Error())
  181. goto retp
  182. }
  183. result, err := c.executor.Exec(params)
  184. resp = buildResp(req, result, err)
  185. // Call-3:中断本地shell的执行,等价Ctrl+C
  186. case "shell.interrupt":
  187. err := c.executor.Interrupt()
  188. resp = buildResp(req, "interrupted", err)
  189. // Call-?:无效, 远端调用了还不支持的-方法
  190. default:
  191. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  192. }
  193. retp:
  194. text, err := resp.String()
  195. if err != nil {
  196. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  197. return
  198. }
  199. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  200. select {
  201. case <-c.ctx.Done():
  202. return
  203. case <-token.Done():
  204. }
  205. if err := token.Error(); err != nil {
  206. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  207. }
  208. baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  209. }