| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- package reporter
- import (
- "context"
- "encoding/json"
- "fmt"
- "io/fs"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/sirupsen/logrus"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- gps "hnyfkj.com.cn/rtu/linux/air530z"
- mcu "hnyfkj.com.cn/rtu/xy_v/mcu_ctrl_board"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- "hnyfkj.com.cn/rtu/linux/netmgrd"
- "hnyfkj.com.cn/rtu/linux/utils/ftpclient"
- "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
- "hnyfkj.com.cn/rtu/linux/utils/singletask"
- )
// MODULE_NAME is the module tag written in square brackets at the start of
// every log line emitted by this package.
const MODULE_NAME = "Reporter"
- var (
- Reporter *MQTTReporter
- )
// MQTT delivery level and the polling periods of the keep-online loop.
const (
	// MqttQos1 is MQTT QoS level 1: the message is delivered at least once.
	MqttQos1 byte = 1

	// FastInterval is the short (fast) check period.
	FastInterval = time.Second

	// SlowInterval is the long (slow) check period.
	SlowInterval = 5 * time.Second
)
- type MQTTReporter struct {
- client mqtt.Client
- ctx context.Context
- cancel context.CancelFunc
- isLogin atomic.Bool // 标记是否已成功登录MQTT后端服务器
- dui string // 登录成功后服务端返回的设备唯一ID
- inheritDUI string // 继承的历史ID, 可选可为空(换板时)
- sensorFileLock sync.Mutex // 用于保护"传感器"数据文件的读和写
- // 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息
- reuploadHistTask *singletask.OnceTask // 补录数据, 单实例
- reportMcuCfgTask *singletask.OnceTask // 上报配置, 单实例
- reportRtuPosTask *singletask.OnceTask // 上报位置, 单实例
- // 注册本地的远程方法, 登录成功后用于让服务端能够主动下发指令
- registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
- }
- func ModuleInit() bool {
- err := loadCfgServers()
- if err != nil {
- baseapp.Logger.Errorf("[%s] 加载服务器配置项失败: %v!!", MODULE_NAME, err)
- return false
- }
- inheritDUI := "" // 更换数据板时, 要继承的历史ID, (可选)可为空
- if data, err := os.ReadFile(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")); err == nil {
- inheritDUI = strings.TrimSpace(string(data))
- if len(inheritDUI) != 14 || !IsDecimal(inheritDUI) {
- baseapp.Logger.Errorf("[%s] 文件中的DUI: %q 无效, 无法继承历史台账数据!!", MODULE_NAME, inheritDUI)
- os.Exit(1)
- }
- }
- ctx, cancel := context.WithCancel(context.Background())
- Reporter = &MQTTReporter{
- client: nil,
- isLogin: atomic.Bool{},
- ctx: ctx,
- cancel: cancel,
- dui: "",
- inheritDUI: inheritDUI,
- reuploadHistTask: &singletask.OnceTask{},
- reportMcuCfgTask: &singletask.OnceTask{},
- reportRtuPosTask: &singletask.OnceTask{},
- registerRpcMeths: &singletask.OnceTask{},
- }
- Reporter.init() // 1, Reporter执行初始化,开始就要调用
- go Reporter.keepOnline() // 2, Reporter主处理循环,维持登录状态
- go Reporter.loopTakePhoto() // 3, 循环处理控制板发来的相机拍照请求
- go Reporter.loopRecvOneEnvData() // 4, 循环处理控制板发来的单条环境数据
- return true
- }
- func ModuleExit() {
- if Reporter != nil {
- Reporter.cancel()
- }
- }
- func (r *MQTTReporter) init() {
- logoutNotif, _ := makeLogoutNotif(netmgrd.GetIMEI())
- opts := mqtt.NewClientOptions().
- AddBroker(CfgServers.MQTTSrv.Address).
- SetUsername(CfgServers.MQTTSrv.Username).SetPassword(CfgServers.MQTTSrv.Password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
- SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
- SetOrderMatters(false). /*离线遗愿消息*/ SetWill("/yfkj/xy-v/server/notify", string(logoutNotif), MqttQos1, false)
- opts.OnConnect = func(c mqtt.Client) { baseapp.Logger.Infof("[%s] 服务器连接成功", MODULE_NAME) }
- opts.OnConnectionLost = func(c mqtt.Client, err error) {
- if r.isLogin.Swap(false) {
- baseapp.Logger.Warnf("[%s] 服务器连接丢失: %v!", MODULE_NAME, err)
- }
- }
- r.client = mqtt.NewClient(opts)
- }
- func (r *MQTTReporter) keepOnline() {
- t := time.NewTimer(FastInterval)
- defer t.Stop()
- for {
- select {
- case <-r.ctx.Done():
- return
- case <-t.C:
- t.Reset(r.tick())
- } // end select
- } // end for
- }
- func (r *MQTTReporter) tick() time.Duration {
- if r.IsLogin() {
- return FastInterval
- }
- baseapp.Logger.Infof("[%s] 尝试连接、登录服务器(%s)...", MODULE_NAME, CfgServers.MQTTSrv.Address)
- if err := r.connect(); err != nil {
- baseapp.Logger.Errorf("[%s] 无法连接上服务器: %v!!", MODULE_NAME, err)
- } else if err := r.subLoginResp(); err != nil {
- baseapp.Logger.Errorf("[%s] 订阅登录应答失败: %v!!", MODULE_NAME, err)
- } else if err := r.sendLoginReq(); err != nil {
- baseapp.Logger.Errorf("[%s] 发送登录请求失败: %v!!", MODULE_NAME, err)
- }
- return SlowInterval
- }
- func (r *MQTTReporter) connect() error {
- if r.client.IsConnected() {
- return nil
- }
- token := r.client.Connect()
- select {
- case <-r.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (r *MQTTReporter) subLoginResp() error {
- token := r.client.Subscribe("/yfkj/xy-v/server/rpc/response", MqttQos1, r.onLogin)
- select {
- case <-r.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (r *MQTTReporter) onLogin(client mqtt.Client, msg mqtt.Message) {
- imei, dui, err := parseLoginResp(msg.Payload())
- if err != nil {
- baseapp.Logger.Errorf("[%s] 设备登录失败: %v!!", MODULE_NAME, err)
- return
- }
- if imei != netmgrd.GetIMEI() { // 判断是否我的应答
- return
- }
- if len(r.inheritDUI) > 0 && r.inheritDUI == dui {
- _ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt"))
- } else if len(r.inheritDUI) > 0 && r.inheritDUI != dui {
- baseapp.Logger.Errorf("[%s] 设备登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui)
- return
- }
- r.dui = dui
- r.isLogin.Store(true)
- r.client.Unsubscribe("/yfkj/xy-v/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否已取消)
- baseapp.Logger.Infof("[%s] 设备登录成功, DUI: %s", MODULE_NAME, r.dui)
- ftpclient.FileUpFolder = dui // 设置默认的照片上传目录
- // 启动执行多个异步任务
- r.registerRpcMeths.Run(r.instRPCMethods, true) // 注册方法, 供远程调用, 单实例运行
- r.reuploadHistTask.Run(r.reuploadLegacy, true) // 补录上传历史遗存数据, 单实例运行
- r.reportMcuCfgTask.Run(func() { r.reportMcuCfg(mcu.CfgParams) }, true) // 上报控制板的运行参数, 单实例运行
- r.reportRtuPosTask.Run(r.reportLocation, true) // 定位成功上报当前位置, 单实例运行
- }
- func (r *MQTTReporter) reuploadLegacy() {
- baseapp.Logger.Infof("[%s] ++数据补录开始++", MODULE_NAME)
- nums1 := 0
- filename := "sensor_data.json"
- if FileExists(filename) {
- if nums1 = r.reportSensorHist(filename); nums1 >= 0 {
- os.Remove(filename) // 上传成功后删除历史数据文件
- } else {
- nums1 = 0
- }
- }
- baseapp.Logger.Infof("[%s] --数据补录结束--, 上传条数: %d", MODULE_NAME, nums1)
- baseapp.Logger.Infof("[%s] ++照片补录开始++", MODULE_NAME)
- nums2 := 0
- filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error {
- if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" {
- return nil
- } else if !r.IsLogin() || r.ctx.Err() != nil {
- return context.Canceled
- }
- ftpclient.FileUploader.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行)
- defer ftpclient.FileUploader.Unlock()
- mcu.GlobalWorkState.Add(mcu.PhotoUploading)
- defer mcu.GlobalWorkState.Remove(mcu.PhotoUploading)
- _, err := ftpclient.UploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, ftpclient.DefaultUploadTimeout)
- if err == nil {
- baseapp.Logger.Infof("[%s] 照片上传成功, 本地文件: %q已删除", MODULE_NAME, path)
- os.Remove(path)
- nums2++
- }
- return nil
- })
- baseapp.Logger.Infof("[%s] --照片补录结束--, 上传张数: %d", MODULE_NAME, nums2)
- }
- func (r *MQTTReporter) reportMcuCfg(cfg *mcu.Config) {
- baseapp.Logger.Infof("[%s] ++上报配置参数++", MODULE_NAME)
- for {
- if !r.IsLogin() || r.ctx.Err() != nil {
- return
- }
- req, err := jsonrpc2.BuildNotification("report_config", cfg)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
- return
- }
- msg, err := json.Marshal(req)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
- return
- }
- text := string(msg)
- token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- continue
- }
- baseapp.Logger.Infof("[%s] --上报配置成功--, 报文内容: %s", MODULE_NAME, text)
- return
- }
- }
- func (r *MQTTReporter) reportLocation() {
- baseapp.Logger.Infof("[%s] ++上报当前位置++", MODULE_NAME)
- t := time.NewTicker(time.Second)
- defer t.Stop()
- for {
- if !r.IsLogin() || r.ctx.Err() != nil {
- return
- }
- lat, lon, err := gps.Get2DPosition()
- if err != nil {
- select {
- case <-r.ctx.Done():
- return
- case <-t.C:
- continue
- }
- }
- text := fmt.Sprintf(`{ "method": "report_location", "params": {"lat": "%s", "lon": "%s"} }`, lat, lon)
- token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- continue
- }
- baseapp.Logger.Infof("[%s] --上报位置成功--, 报文内容: %s", MODULE_NAME, text)
- return
- }
- }
- func (r *MQTTReporter) instRPCMethods() {
- t := time.NewTicker(time.Second)
- defer t.Stop()
- for {
- if !r.IsLogin() || r.ctx.Err() != nil {
- return
- }
- token := r.client.Subscribe(fmt.Sprintf("/yfkj/xy-v/device/rpc/%s/request", r.dui), MqttQos1, r.handleRequests)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() == nil {
- baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远程调用...", MODULE_NAME)
- break
- }
- select {
- case <-r.ctx.Done():
- return
- case <-t.C:
- continue
- }
- }
- }
- func (r *MQTTReporter) sendLoginReq() error {
- loginReq, err := makeLoginReq(netmgrd.GetIMEI(), netmgrd.GetSimICCID(), netmgrd.GetRSSI(), baseapp.Version, r.inheritDUI)
- if err != nil {
- return err
- }
- token := r.client.Publish("/yfkj/xy-v/server/rpc/request", MqttQos1, false, loginReq)
- select {
- case <-r.ctx.Done():
- return nil
- case <-token.Done():
- }
- return token.Error()
- }
- func (r *MQTTReporter) IsLogin() bool {
- return r.isLogin.Load()
- }
- func (r *MQTTReporter) handleRequests(client mqtt.Client, msg mqtt.Message) {
- go r.execOneCmd(msg)
- }
- func (r *MQTTReporter) execOneCmd(msg mqtt.Message) {
- str := string(msg.Payload())
- baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
- var resp *jsonrpc2.Response // 预定义一个空的应答
- req, err := jsonrpc2.ParseRequest(str)
- if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
- resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
- goto retp
- }
- switch req.Method {
- // 下发配置参数
- case "set_config":
- var cfg mcu.Config
- err := json.Unmarshal(req.Params, &cfg)
- if err == nil {
- err = mcu.SaveCfgParams(&cfg)
- }
- if err == nil { // 上报当前配置, 单实例运行(平台侧不处理错误应答、错误时不回滚之前的配置, 就需要重新上报)
- r.reportMcuCfgTask.Run(func() { r.reportMcuCfg(mcu.CfgParams) }, true)
- }
- resp = buildResp(req, "Success", err)
- // 下发拍照指令
- case "take_photo":
- rf, err := r.takePhotoAndUpToFtp()
- resp = buildResp(req, map[string]string{"filename": rf}, err)
- // 下发升级指令
- case "upgrade_app":
- server, user, pass, rf, md5val1, err := parseUpgradeAppReq(req.Params)
- if err != nil {
- resp = jsonrpc2.BuildError(req, -32700, err.Error())
- break
- }
- mcu.GlobalWorkState.Add(mcu.AppUpgrading)
- defer mcu.GlobalWorkState.Remove(mcu.AppUpgrading)
- lf, err := ftpclient.DownloadFileFromFtp(r.ctx, server, user, pass, rf, ftpclient.DefaultDownloadTimeout)
- if err != nil {
- resp = jsonrpc2.BuildError(req, -32700, err.Error())
- break
- }
- md5val2, err := FileMD5(lf)
- if err != nil {
- resp = jsonrpc2.BuildError(req, -32700, err.Error())
- break
- }
- if !strings.EqualFold(md5val1, md5val2) {
- resp = jsonrpc2.BuildError(req, -32700, fmt.Sprintf("MD5校验失败(%s != %s)", md5val1, md5val2))
- break
- }
- cmdStr := "tar xzf " + lf + " -C " + filepath.Dir(baseapp.EXEC_DIR)
- cmd := exec.Command("sh", "-c", cmdStr)
- cmd.Stdout = baseapp.Logger.WriterLevel(logrus.DebugLevel)
- cmd.Stderr = baseapp.Logger.WriterLevel(logrus.ErrorLevel)
- err = cmd.Run()
- os.Remove(lf)
- resp = buildResp(req, "Success", err)
- // 下发重启指令
- case "reboot":
- err := exec.Command("reboot").Run()
- resp = buildResp(req, "Success", err)
- // 调用无效方法
- default:
- resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
- }
- retp:
- text, err := resp.String()
- if err != nil {
- baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
- return
- }
- token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/device/rpc/%s/response", r.dui), MqttQos1, false, text)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if err := token.Error(); err != nil {
- baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
- }
- baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
- }
|