|
@@ -1 +1,247 @@
|
|
|
package sshd
|
|
package sshd
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "context"
|
|
|
|
|
+ "errors"
|
|
|
|
|
+ "strings"
|
|
|
|
|
+ "sync/atomic"
|
|
|
|
|
+ "time"
|
|
|
|
|
+
|
|
|
|
|
+ mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/baseapp"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/netmgrd"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/utils/shell"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/utils/singletask"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+const MODULE_NAME = "sshd_over_mqtt"
|
|
|
|
|
+
|
|
|
|
|
+var (
|
|
|
|
|
+ Coupler *MQTTCoupler
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+const (
|
|
|
|
|
+ MqttQos1 byte = 1 //// 消息至少送达一次
|
|
|
|
|
+ FastInterval = 1 * time.Second //// 快速检测时间间隔
|
|
|
|
|
+ SlowInterval = 5 * time.Second //// 慢速检测时间间隔
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+var (
|
|
|
|
|
+ ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
|
|
|
|
|
+ ErrIMEINotAvailable = errors.New("device imei is not available")
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+type MQTTCoupler struct {
|
|
|
|
|
+ broker, username, password string
|
|
|
|
|
+ client mqtt.Client
|
|
|
|
|
+
|
|
|
|
|
+ imei string // 设备唯一标识
|
|
|
|
|
+ subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
|
|
|
|
|
+ pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
|
|
|
|
|
+
|
|
|
|
|
+ ctx context.Context
|
|
|
|
|
+ cancel context.CancelFunc
|
|
|
|
|
+
|
|
|
|
|
+ isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
|
|
|
|
|
+
|
|
|
|
|
+ // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
|
|
|
|
|
+ registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) error {
|
|
|
|
|
+ if mqttBroker == "" {
|
|
|
|
|
+ return ErrBrokerAddressEmpty
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
+
|
|
|
|
|
+ Coupler = &MQTTCoupler{
|
|
|
|
|
+ broker: mqttBroker,
|
|
|
|
|
+ username: mqttUsername,
|
|
|
|
|
+ password: mqttPassword,
|
|
|
|
|
+ client: nil,
|
|
|
|
|
+ imei: "",
|
|
|
|
|
+ subTopic: "",
|
|
|
|
|
+ pubTopic: "",
|
|
|
|
|
+ ctx: ctx,
|
|
|
|
|
+ cancel: cancel,
|
|
|
|
|
+ isConnected: atomic.Bool{},
|
|
|
|
|
+ registerRpcMeths: &singletask.OnceTask{},
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err := Coupler.init(); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ go Coupler.keepOnline()
|
|
|
|
|
+
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func ModuleExit() {
|
|
|
|
|
+ if Coupler != nil {
|
|
|
|
|
+ Coupler.cancel()
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) init() error {
|
|
|
|
|
+ c.imei = netmgrd.GetIMEI()
|
|
|
|
|
+ if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
|
|
|
|
|
+ return ErrIMEINotAvailable
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ template := "/yfkj/device/rpc/imei"
|
|
|
|
|
+ c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
|
|
|
|
|
+ c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
|
|
|
|
|
+
|
|
|
|
|
+ opts := mqtt.NewClientOptions().
|
|
|
|
|
+ AddBroker(c.broker).
|
|
|
|
|
+ SetUsername(c.username).SetPassword(c.password).
|
|
|
|
|
+ SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
|
|
|
|
|
+ SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
|
|
|
|
|
+ SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
|
|
|
|
|
+
|
|
|
|
|
+ opts.OnConnect = func(client mqtt.Client) {
|
|
|
|
|
+ if !c.isConnected.Swap(true) {
|
|
|
|
|
+ baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ opts.OnConnectionLost = func(client mqtt.Client, err error) {
|
|
|
|
|
+ if c.isConnected.Swap(false) {
|
|
|
|
|
+ baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ c.client = mqtt.NewClient(opts)
|
|
|
|
|
+
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) keepOnline() {
|
|
|
|
|
+ t := time.NewTimer(FastInterval)
|
|
|
|
|
+ defer t.Stop()
|
|
|
|
|
+
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ case <-t.C:
|
|
|
|
|
+ t.Reset(c.tick())
|
|
|
|
|
+ } // end select
|
|
|
|
|
+ } // end for
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) tick() time.Duration {
|
|
|
|
|
+ if c.isConnected.Load() {
|
|
|
|
|
+ return FastInterval
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err := c.connect(); err != nil {
|
|
|
|
|
+ baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
|
|
|
|
|
+ } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
|
|
|
|
|
+ c.registerRpcMeths.Run(c.instRPCMethods, true)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return SlowInterval
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) connect() error {
|
|
|
|
|
+ if c.client.IsConnected() {
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ token := c.client.Connect()
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.ctx.Done():
|
|
|
|
|
+ return nil
|
|
|
|
|
+ case <-token.Done():
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return token.Error()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) instRPCMethods() {
|
|
|
|
|
+ t := time.NewTicker(time.Second)
|
|
|
|
|
+ defer t.Stop()
|
|
|
|
|
+
|
|
|
|
|
+ for {
|
|
|
|
|
+ if !c.isConnected.Load() || c.ctx.Err() != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ case <-token.Done():
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if token.Error() == nil {
|
|
|
|
|
+ baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ case <-t.C:
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
|
+ go c.execOneCmd(msg)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
|
|
|
|
|
+ str := string(msg.Payload())
|
|
|
|
|
+ baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
|
|
|
|
|
+
|
|
|
|
|
+ var resp *jsonrpc2.Response // 预定义一个空的应答
|
|
|
|
|
+
|
|
|
|
|
+ req, err := jsonrpc2.ParseRequest(str)
|
|
|
|
|
+ if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
|
|
|
|
|
+ resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
|
|
|
|
|
+ goto retp
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch req.Method {
|
|
|
|
|
+ // Call-1: 心跳, 链路检测,"ping-pong"测试
|
|
|
|
|
+ case "ping":
|
|
|
|
|
+ resp = buildResp(req, "pong", nil)
|
|
|
|
|
+ // Call-2:在本地shell中执行远程下发的指令
|
|
|
|
|
+ case "shell.execute":
|
|
|
|
|
+ params, err := parseShellExecuteParams(req.Params)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ resp = jsonrpc2.BuildError(req, -32700, err.Error())
|
|
|
|
|
+ goto retp
|
|
|
|
|
+ }
|
|
|
|
|
+ result, err := shell.Execute(params)
|
|
|
|
|
+ resp = buildResp(req, result, err)
|
|
|
|
|
+ // Call-?:无效, 远端调用了还不支持的-方法
|
|
|
|
|
+ default:
|
|
|
|
|
+ resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+retp:
|
|
|
|
|
+ text, err := resp.String()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.ctx.Done():
|
|
|
|
|
+ return
|
|
|
|
|
+ case <-token.Done():
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err := token.Error(); err != nil {
|
|
|
|
|
+ baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
|
|
|
|
|
+}
|