| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package main
- import (
- "context"
- "errors"
- "fmt"
- "strings"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- )
// MODULE_NAME is the tag printed in square brackets at the start of this
// program's log and error messages.
const (
	MODULE_NAME = "YFKJ_SSH_CLIENT"
)
- var (
- Coupler *MQTTCoupler
- )
// MQTT delivery level and the polling intervals of the connection watchdog.
const (
	// MqttQos1 is MQTT QoS level 1: a message is delivered at least once.
	MqttQos1 byte = 1

	// FastInterval is the short (fast) check interval.
	FastInterval = time.Second

	// SlowInterval is the long (slow) check interval.
	SlowInterval = 5 * time.Second
)
- var (
- Version = "0.0.0.1"
- coupler *MQTTCoupler
- ErrBrokerAddressEmpty = errors.New("mqtt server address is empty")
- ErrIMEINotAvailable = errors.New("device imei is not available")
- )
- type MQTTCoupler struct {
- broker, username, password string
- client mqtt.Client
- imei string // 设备唯一标识
- subTopic string // 订阅应答主题:/yfkj/device/rpc/imei/ack
- pubTopic string // 发布指令主题:/yfkj/device/rpc/imei/cmd
- cwd string // 当前工作目录
- ctx context.Context
- cancel context.CancelFunc
- isConnected atomic.Bool /// 标记是否已连接MQTT的Broker服务
- }
- func init() {
- if err := loadAppConfig(); err != nil {
- msg := fmt.Sprintf("[%s] 加载配置文件失败: %v!!", MODULE_NAME, err)
- panic(msg)
- }
- ctx, cancel := context.WithCancel(context.Background())
- coupler = &MQTTCoupler{
- broker: CfgServers.MQTTSrv.Address,
- username: CfgServers.MQTTSrv.Username,
- password: CfgServers.MQTTSrv.Password,
- cwd: "/",
- ctx: ctx,
- cancel: cancel,
- isConnected: atomic.Bool{},
- }
- }
- func main() {
- if baseapp.IsArgsParam("-h") {
- help()
- return
- }
- if baseapp.IsArgsParam("-v") {
- fmt.Println("程序版本:", Version, "\n构建时间:", baseapp.BuildTime)
- return
- }
- devIMEI := baseapp.GetArgsParamStr("-c", "")
- if devIMEI == "" {
- help()
- return
- }
- coupler.imei = devIMEI
- if err := Coupler.init2(); err != nil {
- fmt.Printf("[%s] 错误: %v!!", MODULE_NAME, err)
- return
- }
- for {
- }
- }
- func help() {
- h := `
- -h 显示帮助提示
- -v 当前程序版本
- -c 连接目标设备(IMEI), 例如: -c 869523059113051
- `
- fmt.Println(h)
- }
- func (c *MQTTCoupler) init2() error {
- template := "/yfkj/device/rpc/imei"
- c.subTopic = strings.ReplaceAll(template+"/ack", "imei", c.imei)
- c.pubTopic = strings.ReplaceAll(template+"/cmd", "imei", c.imei)
- opts := mqtt.NewClientOptions().
- AddBroker(c.broker).
- SetUsername(c.username).SetPassword(c.password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10 * time.Second).SetPingTimeout(5 * time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false)
- opts.OnConnect = func(client mqtt.Client) {
- if !c.isConnected.Swap(true) {
- baseapp.Logger.Infof("[%s] MQTT Broker连接成功", MODULE_NAME)
- }
- }
- opts.OnConnectionLost = func(client mqtt.Client, err error) {
- if c.isConnected.Swap(false) {
- baseapp.Logger.Warnf("[%s] MQTT Broker连接丢失: %v!", MODULE_NAME, err)
- }
- }
- c.client = mqtt.NewClient(opts)
- go Coupler.keepOnline()
- return nil
- }
- func (c *MQTTCoupler) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-c.ctx.Done():
- return
- case <-t.C:
- t.Reset(c.tick())
- } // end select
- } // end for
- }
- func (c *MQTTCoupler) tick() time.Duration {
- if c.isConnected.Load() {
- return FastInterval
- }
- if err := c.connect(); err != nil {
- baseapp.Logger.Errorf("[%s] MQTT Broker连接失败: %v!!", MODULE_NAME, err)
- }
- return SlowInterval
- }
- func (c *MQTTCoupler) connect() error {
- if c.client.IsConnected() {
- return nil
- }
- token := c.client.Connect()
- select {
- case <-c.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
|