sshd.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package sshd
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "hnyfkj.com.cn/rtu/linux/baseapp"
  10. "hnyfkj.com.cn/rtu/linux/netmgrd"
  11. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  12. "hnyfkj.com.cn/rtu/linux/utils/shell"
  13. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  14. )
  15. const MODULE_NAME = "sshd_over_mqtt"
  16. var (
  17. Coupler *MQTTCoupler
  18. )
  19. const (
  20. MqttQos1 byte = 1 //// 消息至少送达一次
  21. FastInterval = 1 * time.Second //// 快速检测时间间隔
  22. SlowInterval = 5 * time.Second //// 慢速检测时间间隔
  23. )
  24. var (
  25. ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
  26. ErrIMEINotAvailable = errors.New("device imei is not available")
  27. )
  28. type MQTTCoupler struct {
  29. broker, username, password string
  30. client mqtt.Client
  31. imei string // 设备唯一标识
  32. subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
  33. pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
  34. ctx context.Context
  35. cancel context.CancelFunc
  36. isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
  37. // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
  38. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  39. }
  40. func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) error {
  41. if mqttBroker == "" {
  42. return ErrBrokerAddressEmpty
  43. }
  44. ctx, cancel := context.WithCancel(context.Background())
  45. Coupler = &MQTTCoupler{
  46. broker: mqttBroker,
  47. username: mqttUsername,
  48. password: mqttPassword,
  49. client: nil,
  50. imei: "",
  51. subTopic: "",
  52. pubTopic: "",
  53. ctx: ctx,
  54. cancel: cancel,
  55. isConnected: atomic.Bool{},
  56. registerRpcMeths: &singletask.OnceTask{},
  57. }
  58. if err := Coupler.init(); err != nil {
  59. return err
  60. }
  61. go Coupler.keepOnline()
  62. return nil
  63. }
  64. func ModuleExit() {
  65. if Coupler != nil {
  66. Coupler.cancel()
  67. }
  68. }
  69. func (c *MQTTCoupler) init() error {
  70. c.imei = netmgrd.GetIMEI()
  71. if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
  72. return ErrIMEINotAvailable
  73. }
  74. template := "/yfkj/device/rpc/imei"
  75. c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
  76. c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
  77. opts := mqtt.NewClientOptions().
  78. AddBroker(c.broker).
  79. SetUsername(c.username).SetPassword(c.password).
  80. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  81. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  82. SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
  83. opts.OnConnect = func(client mqtt.Client) {
  84. if !c.isConnected.Swap(true) {
  85. baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
  86. }
  87. }
  88. opts.OnConnectionLost = func(client mqtt.Client, err error) {
  89. if c.isConnected.Swap(false) {
  90. baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
  91. }
  92. }
  93. c.client = mqtt.NewClient(opts)
  94. return nil
  95. }
  96. func (c *MQTTCoupler) keepOnline() {
  97. t := time.NewTimer(FastInterval)
  98. defer t.Stop()
  99. for {
  100. select {
  101. case <-c.ctx.Done():
  102. return
  103. case <-t.C:
  104. t.Reset(c.tick())
  105. } // end select
  106. } // end for
  107. }
  108. func (c *MQTTCoupler) tick() time.Duration {
  109. if c.isConnected.Load() {
  110. return FastInterval
  111. }
  112. if err := c.connect(); err != nil {
  113. baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
  114. } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
  115. c.registerRpcMeths.Run(c.instRPCMethods, true)
  116. }
  117. return SlowInterval
  118. }
  119. func (c *MQTTCoupler) connect() error {
  120. if c.client.IsConnected() {
  121. return nil
  122. }
  123. token := c.client.Connect()
  124. select {
  125. case <-c.ctx.Done():
  126. return nil
  127. case <-token.Done():
  128. }
  129. return token.Error()
  130. }
  131. func (c *MQTTCoupler) instRPCMethods() {
  132. t := time.NewTicker(time.Second)
  133. defer t.Stop()
  134. for {
  135. if !c.isConnected.Load() || c.ctx.Err() != nil {
  136. return
  137. }
  138. token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
  139. select {
  140. case <-c.ctx.Done():
  141. return
  142. case <-token.Done():
  143. }
  144. if token.Error() == nil {
  145. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
  146. break
  147. }
  148. select {
  149. case <-c.ctx.Done():
  150. return
  151. case <-t.C:
  152. continue
  153. }
  154. }
  155. }
  156. func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
  157. go c.execOneCmd(msg)
  158. }
  159. func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
  160. str := string(msg.Payload())
  161. baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  162. var resp *jsonrpc2.Response // 预定义一个空的应答
  163. req, err := jsonrpc2.ParseRequest(str)
  164. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  165. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  166. goto retp
  167. }
  168. switch req.Method {
  169. // Call-1: 心跳, 链路检测,"ping-pong"测试
  170. case "ping":
  171. resp = buildResp(req, "pong", nil)
  172. // Call-2:在本地shell中执行远程下发的指令
  173. case "shell.execute":
  174. params, err := parseShellExecuteParams(req.Params)
  175. if err != nil {
  176. resp = jsonrpc2.BuildError(req, -32700, err.Error())
  177. goto retp
  178. }
  179. result, err := shell.Execute(params)
  180. resp = buildResp(req, result, err)
  181. // Call-?:无效, 远端调用了还不支持的-方法
  182. default:
  183. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  184. }
  185. retp:
  186. text, err := resp.String()
  187. if err != nil {
  188. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  189. return
  190. }
  191. token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
  192. select {
  193. case <-c.ctx.Done():
  194. return
  195. case <-token.Done():
  196. }
  197. if err := token.Error(); err != nil {
  198. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  199. }
  200. baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  201. }