|
|
@@ -28,16 +28,17 @@ type MQTTCoupler struct {
|
|
|
|
|
|
client mqtt.Client
|
|
|
clientID string
|
|
|
- isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
|
|
|
+ isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
|
|
|
|
|
|
imei string // 设备唯一标识
|
|
|
subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
|
|
|
pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
|
|
|
cwd string // 当前工作目录
|
|
|
|
|
|
- cmdMu sync.Mutex ///// 串行执行的锁
|
|
|
- pending map[int]chan jsonrpc2.Response ///// 等待命令结果
|
|
|
- pendingMu sync.Mutex ///// 等待结果的锁
|
|
|
+ // 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
|
|
|
+ cmdMu sync.Mutex // 串行执行的锁
|
|
|
+ pending map[int]chan *jsonrpc2.Response // 等待命令结果
|
|
|
+ pendingMu sync.Mutex // 等待结果的锁
|
|
|
}
|
|
|
|
|
|
func (c *MQTTCoupler) init2() error {
|
|
|
@@ -75,7 +76,7 @@ func (c *MQTTCoupler) init2() error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- c.pending = make(map[int]chan jsonrpc2.Response)
|
|
|
+ c.pending = make(map[int]chan *jsonrpc2.Response)
|
|
|
|
|
|
c.client = mqtt.NewClient(opts)
|
|
|
go c.keepOnline()
|
|
|
@@ -124,8 +125,8 @@ func (c *MQTTCoupler) connect() error {
|
|
|
return token.Error()
|
|
|
}
|
|
|
|
|
|
-func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (jsonrpc2.Response, error) {
|
|
|
- zero := jsonrpc2.Response{}
|
|
|
+func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (*jsonrpc2.Response, error) {
|
|
|
+ zero := &jsonrpc2.Response{}
|
|
|
|
|
|
if c.needSerialize(method) {
|
|
|
c.cmdMu.Lock()
|
|
|
@@ -143,7 +144,7 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (jsonrpc2.Resp
|
|
|
return zero, err
|
|
|
}
|
|
|
|
|
|
- ch := make(chan jsonrpc2.Response, 1)
|
|
|
+ ch := make(chan *jsonrpc2.Response, 1)
|
|
|
|
|
|
c.pendingMu.Lock()
|
|
|
c.pending[reqID] = ch
|
|
|
@@ -187,8 +188,8 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int) (jsonrpc2.Resp
|
|
|
func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
|
|
|
p := msg.Payload()
|
|
|
|
|
|
- var resp jsonrpc2.Response
|
|
|
- if err := json.Unmarshal(p, &resp); err != nil {
|
|
|
+ resp, err := jsonrpc2.ParseResponse(p)
|
|
|
+ if err != nil { //////////// 解析应答失败,忽略不管
|
|
|
return
|
|
|
}
|
|
|
|