package sshd import ( "context" "errors" "strings" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "hnyfkj.com.cn/rtu/linux/baseapp" "hnyfkj.com.cn/rtu/linux/netmgrd" "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2" "hnyfkj.com.cn/rtu/linux/utils/shell" "hnyfkj.com.cn/rtu/linux/utils/singletask" ) const MODULE_NAME = "sshd_over_mqtt" var ( Coupler *MQTTCoupler ) const ( MqttQos1 byte = 1 //// 消息至少送达一次 FastInterval = 1 * time.Second //// 快速检测时间间隔 SlowInterval = 5 * time.Second //// 慢速检测时间间隔 ) var ( ErrBrokerAddressEmpty = errors.New("mqtt server address is empty") ErrIMEINotAvailable = errors.New("device imei is not available") ) type MQTTCoupler struct { broker, username, password string client mqtt.Client imei string // 设备唯一标识 subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack ctx context.Context cancel context.CancelFunc isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务 // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令 registerRpcMeths *singletask.OnceTask // 注册方法, 单实例 } func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) error { if mqttBroker == "" { return ErrBrokerAddressEmpty } ctx, cancel := context.WithCancel(context.Background()) Coupler = &MQTTCoupler{ broker: mqttBroker, username: mqttUsername, password: mqttPassword, client: nil, imei: "", subTopic: "", pubTopic: "", ctx: ctx, cancel: cancel, isConnected: atomic.Bool{}, registerRpcMeths: &singletask.OnceTask{}, } if err := Coupler.init(); err != nil { return err } go Coupler.keepOnline() return nil } func ModuleExit() { if Coupler != nil { Coupler.cancel() } } func (c *MQTTCoupler) init() error { c.imei = netmgrd.GetIMEI() if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" { return ErrIMEINotAvailable } template := "/yfkj/device/rpc/imei" c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei) c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei) opts := mqtt.NewClientOptions(). AddBroker(c.broker). SetUsername(c.username).SetPassword(c.password). SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true). SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间 SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false) opts.OnConnect = func(client mqtt.Client) { if !c.isConnected.Swap(true) { baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME) } } opts.OnConnectionLost = func(client mqtt.Client, err error) { if c.isConnected.Swap(false) { baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err) } } c.client = mqtt.NewClient(opts) return nil } func (c *MQTTCoupler) keepOnline() { t := time.NewTimer(FastInterval) defer t.Stop() for { select { case <-c.ctx.Done(): return case <-t.C: t.Reset(c.tick()) } // end select } // end for } func (c *MQTTCoupler) tick() time.Duration { if c.isConnected.Load() { return FastInterval } if err := c.connect(); err != nil { baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err) } else { // 注册本地的RPC方法, 供远端调用, 单实例运行 c.registerRpcMeths.Run(c.instRPCMethods, true) } return SlowInterval } func (c *MQTTCoupler) connect() error { if c.client.IsConnected() { return nil } token := c.client.Connect() select { case <-c.ctx.Done(): return nil case <-token.Done(): } return token.Error() } func (c *MQTTCoupler) instRPCMethods() { t := time.NewTicker(time.Second) defer t.Stop() for { if !c.isConnected.Load() || c.ctx.Err() != nil { return } token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests) select { case <-c.ctx.Done(): return case <-token.Done(): } if token.Error() == nil { baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME) break } select { case <-c.ctx.Done(): return case <-t.C: continue } } } func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) { go c.execOneCmd(msg) } func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) { str := string(msg.Payload()) baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str) var resp *jsonrpc2.Response // 预定义一个空的应答 req, err := jsonrpc2.ParseRequest(str) if err != nil || req.ID == nil /* 不接受通知类型的消息 */ { resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "") goto retp } switch req.Method { // Call-1: 心跳, 链路检测,"ping-pong"测试 case "ping": resp = buildResp(req, "pong", nil) // Call-2:在本地shell中执行远程下发的指令 case "shell.execute": params, err := parseShellExecuteParams(req.Params) if err != nil { resp = jsonrpc2.BuildError(req, -32700, err.Error()) goto retp } result, err := shell.Execute(params) resp = buildResp(req, result, err) // Call-?:无效, 远端调用了还不支持的-方法 default: resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "") } retp: text, err := resp.String() if err != nil { baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err) return } token := c.client.Publish(c.pubTopic, MqttQos1, false, text) select { case <-c.ctx.Done(): return case <-token.Done(): } if err := token.Error(); err != nil { baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err) } baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text) }