|
@@ -11,6 +11,7 @@ import (
|
|
|
|
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
"hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
|
|
"hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
|
|
|
|
|
+ "hnyfkj.com.cn/rtu/linux/utils/shell"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
@@ -19,6 +20,13 @@ const (
|
|
|
SlowInterval = 5 * time.Second //// 慢速检测时间间隔
|
|
SlowInterval = 5 * time.Second //// 慢速检测时间间隔
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+type pendingRequest struct {
|
|
|
|
|
+ ch chan *jsonrpc2.Response
|
|
|
|
|
+ done chan struct{} // 命令请求完成信号: 已应答/超时/被中断
|
|
|
|
|
+ // 命令请求超时/中断后, 使得等待应答的携程能收到结束信号退出
|
|
|
|
|
+ // !避免请求端已经放弃等待, 但应答端仍然在等待结果并写入通道
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
type MQTTCoupler struct {
|
|
type MQTTCoupler struct {
|
|
|
ctx context.Context
|
|
ctx context.Context
|
|
|
cancel context.CancelFunc
|
|
cancel context.CancelFunc
|
|
@@ -36,9 +44,9 @@ type MQTTCoupler struct {
|
|
|
cwd string // 当前工作目录
|
|
cwd string // 当前工作目录
|
|
|
|
|
|
|
|
// 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
|
|
// 交互式命令高频执行, 为了性能上的考虑-这里不使用"sync.Map"
|
|
|
- cmdMu sync.Mutex // 串行执行锁
|
|
|
|
|
- pending map[int64]chan *jsonrpc2.Response // 等命令结果
|
|
|
|
|
- pendingMu sync.Mutex // 等待结果锁
|
|
|
|
|
|
|
+ cmdMu sync.Mutex // 串行执行的锁LLLLLL
|
|
|
|
|
+ pending map[int64]*pendingRequest // 等命令的结果RRRRRR
|
|
|
|
|
+ pendingMu sync.Mutex // 等待结果的锁LLLLLL
|
|
|
|
|
|
|
|
interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
|
|
interrupted chan struct{} // Ctrl+C 通知当前命令取消的信号
|
|
|
}
|
|
}
|
|
@@ -78,7 +86,7 @@ func (c *MQTTCoupler) init2() error {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- c.pending = make(map[int64]chan *jsonrpc2.Response)
|
|
|
|
|
|
|
+ c.pending = make(map[int64]*pendingRequest)
|
|
|
|
|
|
|
|
c.client = mqtt.NewClient(opts)
|
|
c.client = mqtt.NewClient(opts)
|
|
|
go c.keepOnline()
|
|
go c.keepOnline()
|
|
@@ -146,15 +154,19 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int64) (*jsonrpc2.R
|
|
|
return zero, err
|
|
return zero, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- ch := make(chan *jsonrpc2.Response, 1)
|
|
|
|
|
|
|
+ pr := &pendingRequest{
|
|
|
|
|
+ ch: make(chan *jsonrpc2.Response, 1),
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
c.pendingMu.Lock()
|
|
c.pendingMu.Lock()
|
|
|
- c.pending[reqID] = ch
|
|
|
|
|
|
|
+ c.pending[reqID] = pr
|
|
|
c.pendingMu.Unlock()
|
|
c.pendingMu.Unlock()
|
|
|
defer func() {
|
|
defer func() {
|
|
|
c.pendingMu.Lock()
|
|
c.pendingMu.Lock()
|
|
|
delete(c.pending, reqID)
|
|
delete(c.pending, reqID)
|
|
|
c.pendingMu.Unlock()
|
|
c.pendingMu.Unlock()
|
|
|
|
|
+ close(pr.done)
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
|
|
token := c.client.Publish(c.pubTopic, MqttQos1, false, b)
|
|
@@ -170,26 +182,32 @@ func (c *MQTTCoupler) doCmd(method string, params any, id ...int64) (*jsonrpc2.R
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if c.isCtrlCommand(method) { // 控制指令, 等待结果或超时
|
|
if c.isCtrlCommand(method) { // 控制指令, 等待结果或超时
|
|
|
- var timer *time.Timer
|
|
|
|
|
- var timeout <-chan time.Time
|
|
|
|
|
- timer = time.NewTimer(3 * time.Second)
|
|
|
|
|
- timeout = timer.C
|
|
|
|
|
|
|
+ timer := time.NewTimer(time.Duration(3) * time.Second)
|
|
|
defer timer.Stop()
|
|
defer timer.Stop()
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
|
case <-c.ctx.Done():
|
|
case <-c.ctx.Done():
|
|
|
return zero, c.ctx.Err()
|
|
return zero, c.ctx.Err()
|
|
|
- case resp := <-ch:
|
|
|
|
|
|
|
+ case resp := <-pr.ch:
|
|
|
return resp, nil
|
|
return resp, nil
|
|
|
- case <-timeout:
|
|
|
|
|
|
|
+ case <-timer.C:
|
|
|
return zero, fmt.Errorf("command timeout")
|
|
return zero, fmt.Errorf("command timeout")
|
|
|
}
|
|
}
|
|
|
} else { // 用户指令, 等待结果或用户主动中断取消: Ctrl+C
|
|
} else { // 用户指令, 等待结果或用户主动中断取消: Ctrl+C
|
|
|
|
|
+ v, ok := c.getCmdTimeout(params)
|
|
|
|
|
+ if !ok || v <= 0 { // 如果没有指定超时, 则使用默认超时
|
|
|
|
|
+ v = int(shell.DefaultTimeout/time.Second) + 1
|
|
|
|
|
+ }
|
|
|
|
|
+ timer := time.NewTimer(time.Duration(v+1) * time.Second)
|
|
|
|
|
+ defer timer.Stop()
|
|
|
|
|
+
|
|
|
select {
|
|
select {
|
|
|
case <-c.ctx.Done():
|
|
case <-c.ctx.Done():
|
|
|
return zero, c.ctx.Err()
|
|
return zero, c.ctx.Err()
|
|
|
- case resp := <-ch:
|
|
|
|
|
|
|
+ case resp := <-pr.ch:
|
|
|
return resp, nil
|
|
return resp, nil
|
|
|
|
|
+ case <-timer.C:
|
|
|
|
|
+ return zero, fmt.Errorf("command timeout")
|
|
|
case <-c.interrupted:
|
|
case <-c.interrupted:
|
|
|
return zero, fmt.Errorf("command interrupted by user")
|
|
return zero, fmt.Errorf("command interrupted by user")
|
|
|
}
|
|
}
|
|
@@ -210,7 +228,7 @@ func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
|
|
|
respID := *resp.ID
|
|
respID := *resp.ID
|
|
|
|
|
|
|
|
c.pendingMu.Lock()
|
|
c.pendingMu.Lock()
|
|
|
- ch, ok := c.pending[respID]
|
|
|
|
|
|
|
+ pr, ok := c.pending[respID]
|
|
|
c.pendingMu.Unlock()
|
|
c.pendingMu.Unlock()
|
|
|
|
|
|
|
|
if !ok { /////////////// 未找到对应的请求, 忽略不管
|
|
if !ok { /////////////// 未找到对应的请求, 忽略不管
|
|
@@ -218,7 +236,7 @@ func (c *MQTTCoupler) onCmdAck(client mqtt.Client, msg mqtt.Message) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
select {
|
|
|
- case ch <- resp:
|
|
|
|
|
- default:
|
|
|
|
|
|
|
+ case pr.ch <- resp:
|
|
|
|
|
+ case <-pr.done:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|