| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package main
- import (
- "context"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- )
// Tunables for the MQTT coupler.
const (
	// MqttQos1 is MQTT QoS level 1: a message is delivered at least once.
	MqttQos1 byte = 1

	// FastInterval is the short check interval; tick returns it while the
	// connection flag is set.
	FastInterval = time.Second

	// SlowInterval is the long check interval; tick returns it after it has
	// attempted a (re)connect.
	SlowInterval = 5 * time.Second
)
- type MQTTCoupler struct {
- ctx context.Context
- cancel context.CancelFunc
- broker, username, password string
- client mqtt.Client
- clientID string
- isConnected atomic.Bool /////// 标记是否已连接MQTT的Broker服务
- imei string // 设备唯一标识
- subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
- pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
- cwd string // 当前工作目录
- mu sync.Mutex // 串行执行的锁
- }
- func (c *MQTTCoupler) init() error {
- template := "/yfkj/device/rpc/imei"
- c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
- c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
- opts := mqtt.NewClientOptions().
- AddBroker(c.broker).
- SetUsername(c.username).SetPassword(c.password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false)
- opts.OnConnect = func(client mqtt.Client) {
- if !c.isConnected.Swap(true) {
- baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
- }
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- if c.isConnected.Swap(false) {
- baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
- }
- }
- c.client = mqtt.NewClient(opts)
- go coupler.keepOnline()
- return nil
- }
- func (c *MQTTCoupler) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- t.Reset(c.tick())
- } // end select
- } // end for
- }
- func (c *MQTTCoupler) tick() time.Duration {
- if c.isConnected.Load() {
- return FastInterval
- }
- if err := c.connect(); err != nil {
- baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
- }
- return SlowInterval
- }
- func (c *MQTTCoupler) connect() error {
- if c.client.IsConnected() {
- return nil
- }
- token := c.client.Connect()
- select {
- case <-c.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
|