| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- package sshd
- import (
- "context"
- "errors"
- "strings"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- "hnyfkj.com.cn/rtu/linux/netmgrd"
- "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
- "hnyfkj.com.cn/rtu/linux/utils/shell"
- "hnyfkj.com.cn/rtu/linux/utils/singletask"
- )
- const MODULE_NAME = "SshdOverMqtt"
- var (
- Coupler *MQTTCoupler
- )
const (
	// MqttQos1 is MQTT QoS level 1: the message is delivered at least once.
	MqttQos1 byte = 1

	// FastInterval is the short (fast) check interval.
	FastInterval = time.Second

	// SlowInterval is the long (slow) check interval.
	SlowInterval = 5 * time.Second
)
// ErrBrokerAddressEmpty is reported when no MQTT broker address was supplied.
var ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")

// ErrIMEINotAvailable is reported when the device IMEI cannot be obtained.
var ErrIMEINotAvailable = errors.New("device imei is not available")
- type MQTTCoupler struct {
- broker, username, password string
- client mqtt.Client
- imei string // 设备唯一标识
- subTopic string // 订阅指令主题:/yfkj/device/rpc/imei/cmd
- pubTopic string // 发布应答主题:/yfkj/device/rpc/imei/ack
- ctx context.Context
- cancel context.CancelFunc
- isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
- // 注册本地的远程方法, 连接成功后用于让客户端能够主动下发指令
- registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
- }
- func ModuleInit(mqttBroker, mqttUsername, mqttPassword string) bool {
- if mqttBroker == "" {
- baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, ErrBrokerAddressEmpty)
- return false
- }
- ctx, cancel := context.WithCancel(context.Background())
- Coupler = &MQTTCoupler{
- broker: mqttBroker,
- username: mqttUsername,
- password: mqttPassword,
- client: nil,
- imei: "",
- subTopic: "",
- pubTopic: "",
- ctx: ctx,
- cancel: cancel,
- isConnected: atomic.Bool{},
- registerRpcMeths: &singletask.OnceTask{},
- }
- if err := Coupler.init(); err != nil {
- baseapp.Logger.Errorf("[%s] 初始化远程运维模块失败: %v!!", MODULE_NAME, err)
- return false
- }
- go Coupler.keepOnline()
- return true
- }
- func ModuleExit() {
- if Coupler != nil {
- Coupler.cancel()
- }
- }
- func (c *MQTTCoupler) init() error {
- c.imei = netmgrd.GetIMEI()
- if c.imei == netmgrd.ErrUnknownModemTypeMsg || c.imei == "" {
- return ErrIMEINotAvailable
- }
- template := "/yfkj/device/rpc/imei"
- c.subTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
- c.pubTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
- opts := mqtt.NewClientOptions().
- AddBroker(c.broker).
- SetUsername(c.username).SetPassword(c.password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false). /*离线遗愿消息*/ SetWill(c.pubTopic, string(`{"jsonrpc": "2.0", "method": "logout"}`), MqttQos1, false)
- opts.OnConnect = func(client mqtt.Client) {
- if !c.isConnected.Swap(true) {
- baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
- }
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- if c.isConnected.Swap(false) {
- baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
- }
- }
- c.client = mqtt.NewClient(opts)
- return nil
- }
- func (c *MQTTCoupler) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- t.Reset(c.tick())
- } // end select
- } // end for
- }
- func (c *MQTTCoupler) tick() time.Duration {
- if c.isConnected.Load() {
- return FastInterval
- }
- if err := c.connect(); err != nil {
- baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
- } else { // 注册本地的RPC方法, 供远端调用, 单实例运行
- c.registerRpcMeths.Run(c.instRPCMethods, true)
- }
- return SlowInterval
- }
- func (c *MQTTCoupler) connect() error {
- if c.client.IsConnected() {
- return nil
- }
- token := c.client.Connect()
- select {
- case <-c.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (c *MQTTCoupler) instRPCMethods() {
- t := time.NewTicker(time.Second)
- defer t.Stop()
- for {
- if !c.isConnected.Load() || c.ctx.Err() != nil {
- return
- }
- token := c.client.Subscribe(c.subTopic, MqttQos1, c.handleRequests)
- select {
- case <-c.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() == nil {
- baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远端调用...", MODULE_NAME)
- break
- }
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- continue
- }
- }
- }
- func (c *MQTTCoupler) handleRequests(client mqtt.Client, msg mqtt.Message) {
- go c.execOneCmd(msg)
- }
- func (c *MQTTCoupler) execOneCmd(msg mqtt.Message) {
- str := string(msg.Payload())
- baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
- var resp *jsonrpc2.Response // 预定义一个空的应答
- req, err := jsonrpc2.ParseRequest(str)
- if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
- resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
- goto retp
- }
- switch req.Method {
- // Call-1: 心跳, 链路检测,"ping-pong"测试
- case "ping":
- resp = buildResp(req, "pong", nil)
- // Call-2:在本地shell中执行远程下发的指令
- case "shell.execute":
- params, err := parseShellExecuteParams(req.Params)
- if err != nil {
- resp = jsonrpc2.BuildError(req, -32700, err.Error())
- goto retp
- }
- result, err := shell.Execute(params)
- resp = buildResp(req, result, err)
- // Call-?:无效, 远端调用了还不支持的-方法
- default:
- resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
- }
- retp:
- text, err := resp.String()
- if err != nil {
- baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
- return
- }
- token := c.client.Publish(c.pubTopic, MqttQos1, false, text)
- select {
- case <-c.ctx.Done():
- return
- case <-token.Done():
- }
- if err := token.Error(); err != nil {
- baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
- }
- baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
- }
|