package reporter import "C" import ( "context" "encoding/json" "fmt" "io/fs" "os" "os/exec" "path/filepath" "strconv" "strings" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" gps "hnyfkj.com.cn/rtu/bxs-sy/air530z" modem "hnyfkj.com.cn/rtu/bxs-sy/air720u" "hnyfkj.com.cn/rtu/bxs-sy/baseapp" mcu "hnyfkj.com.cn/rtu/bxs-sy/mcu_ctrl_board" ) const ( MODULE_NAME = "Reporter" sendPingReqInterval = 10 * time.Second waitPingRespTimeout = 5 * time.Second ) var ( Reporter *MQTTReporter ) type MQTTReporter struct { client mqtt.Client ctx context.Context cancel context.CancelFunc msgQos byte // MQTT数据消息传输的质量等级(统一) isLogin atomic.Bool // 标记是否已成功登录MQTT后端服务器 dui string // 登录成功后服务端返回的设备唯一ID inheritDUI string // 继承的历史ID, 可选可为空(换板时) // 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息 uploadPhotosTask *SingleTask reportMCUTask *SingleTask reportLocTask *SingleTask } func (r *MQTTReporter) IsLogin() bool { return r.isLogin.Load() } func (r *MQTTReporter) onLogin(client mqtt.Client, msg mqtt.Message) { imei, dui, err := parseLoginResp(msg.Payload()) if err != nil { baseapp.Logger.Errorf("[%s] 登录失败: %v!!", MODULE_NAME, err) return } if imei != modem.GetIMEI() { // 判断是否我的应答 return } if len(r.inheritDUI) > 0 && r.inheritDUI == dui { _ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")) } else if len(r.inheritDUI) > 0 && r.inheritDUI != dui { baseapp.Logger.Errorf("[%s] 登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui) return } r.dui = dui r.isLogin.Store(true) r.client.Unsubscribe("/yfkj/bxs-sy/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否成功) baseapp.Logger.Infof("[%s] 登录成功, 设备DUI: %s", MODULE_NAME, r.dui) r.uploadPhotosTask.Run(r.uploadPendingPhotos, true) // 补录上传历史遗存照片, 单实例运行 r.reportLocTask.Run(r.reportLocation, true) // 定位成功通知当前位置, 单实例运行 r.reportMCUTask.Run(func() { r.reportMCUParams(mcu.CfgParams) }, true) // 上报控制板的运行参数, 单实例运行 } func (r *MQTTReporter) uploadPendingPhotos() { baseapp.Logger.Infof("[%s] 拍照补录上传任务开始", MODULE_NAME) nums := 0 filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error { if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" { return nil } else if !r.IsLogin() || r.ctx.Err() != nil { return context.Canceled } fileUploader.uploadLock.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行) 2025-10-17 defer fileUploader.uploadLock.Unlock() mcu.MCBSetCamStateBit(mcu.PhotoUploading) defer mcu.MCBSetCamStateBit(mcu.Idle) _, err := uploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, defaultUploadTimeout) if err == nil { baseapp.Logger.Infof("[%s] 拍照补录上传成功, 本地文件: %q已删除", MODULE_NAME, path) os.Remove(path) nums++ } return nil }) baseapp.Logger.Infof("[%s] 拍照补录上传任务结束, 本次共上传%d张照片", MODULE_NAME, nums) } func (r *MQTTReporter) reportMCUParams(setParams *mcu.Config) { baseapp.Logger.Infof("[%s] 主动上报MCU参数任务开始", MODULE_NAME) for { if !r.IsLogin() || r.ctx.Err() != nil { return } msg := fmt.Sprintf(`{"imei": "%s", "version": "%s", "ctrlMode": "%d", "lightDuration": "%d", "startHour": "%d", "endHour": "%d", "takePhotoIntervalMinutes": "%d"}`, modem.GetIMEI(), setParams.Version, setParams.CtrlMode, setParams.LightDuration, setParams.StartHour, setParams.EndHour, setParams.TakePhotoInterval) token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/mcuParams", r.dui), r.msgQos, false, msg) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { continue } baseapp.Logger.Infof("[%s] 主动上报MCU参数任务结束(%q)", MODULE_NAME, msg) return } } func (r *MQTTReporter) reportLocation() { baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务开始", MODULE_NAME) for { if !r.IsLogin() || r.ctx.Err() != nil { return } lat, lon, err := gps.Get2DPosition() if err != nil { time.Sleep(defaultRtyInterval) continue } msg := fmt.Sprintf(`{"imei": "%s", "lat": "%s", "lon": "%s"}`, modem.GetIMEI(), lat, lon) token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/location", r.dui), r.msgQos, false, msg) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { continue } baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务结束(%q)", MODULE_NAME, msg) return } } func (r *MQTTReporter) handleRequest(client mqtt.Client, msg mqtt.Message) { go r.execRequest(msg) } func (r *MQTTReporter) execRequest(msg mqtt.Message) { var req rpcRequest reqStr := string(msg.Payload()) baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, reqStr) var resp rpcResponse = rpcResponse{ JSONRPC: "2.0", } if err := json.Unmarshal(msg.Payload(), &req); err != nil { resp.Error = &rpcError{Code: -32700, Message: "Parse error"} goto send_p } else { resp.ID = req.ID } switch req.Method { // Call-1: 设置MCU控制板运行参数, 保存到文件 case "setMCUParams": params, err := parseMCUParams(msg.Payload()) if err != nil { resp.Error = makeCustomError(err) goto send_p } var cfgParams mcu.Config cfgParams.Version = params.Version cfgParams.CtrlMode, _ = strconv.Atoi(params.CtrlMode) cfgParams.LightDuration, _ = strconv.Atoi(params.LightDuration) cfgParams.StartHour, _ = strconv.Atoi(params.StartHour) cfgParams.EndHour, _ = strconv.Atoi(params.EndHour) cfgParams.TakePhotoInterval, _ = strconv.Atoi(params.PhotoIntervalMinutes) err = mcu.SaveCfgParams(&cfgParams) if err != nil { resp.Error = makeCustomError(err) } else { resp.Result, _ = json.Marshal("Success") go r.reportMCUParams(&cfgParams) } // Call-2: 获取MCU控制板运行参数, 从文件读取 case "getMCUParams": var params mcuParams params.Version = mcu.CfgParams.Version params.CtrlMode = fmt.Sprintf("%d", mcu.CfgParams.CtrlMode) params.LightDuration = fmt.Sprintf("%d", mcu.CfgParams.LightDuration) params.StartHour = fmt.Sprintf("%d", mcu.CfgParams.StartHour) params.EndHour = fmt.Sprintf("%d", mcu.CfgParams.EndHour) params.PhotoIntervalMinutes = fmt.Sprintf("%d", mcu.CfgParams.TakePhotoInterval) resp.Result, _ = json.Marshal(params) // Call-3, 异步请求MCU获取当前实时的环境数据 case "getCurrentEnvData": err := mcu.MCBReqEnvCurData() if err != nil { resp.Error = makeCustomError(err) } else { resp.Result, _ = json.Marshal("Success") } // Call-4: 同步执行相机拍照, 比较耗时, 需等待 case "takephoto": remoteFile, err := TakePhotoAndUpToFtp(r.ctx) if err != nil { resp.Error = makeCustomError(err) } else { resp.Result, _ = json.Marshal(map[string]string{"filename": remoteFile}) } // Call-5:重启, 服务端不一定能收到成功的应答 case "reboot": cmd := exec.Command("reboot") if err := cmd.Run(); err != nil { resp.Error = &rpcError{Code: -32700, Message: "Failed"} } else { resp.Result, _ = json.Marshal("Success") } // Call-6: 获取RTU当前的位置信息(2D经纬度坐标) case "getLocation": lat, lon, err := gps.Get2DPosition() if err != nil { resp.Error = makeCustomError(err) } else { resp.Result, _ = json.Marshal(map[string]string{"lat": lat, "lon": lon}) } // Call-7: 升级RTU固件, 升级后程序会自动退出, 靠守护进程重新拉起, 替换新程序 case "upgradeApp": serverAddr, loginUser, loginPass, remoteFile, md5val1, err := parseUpgradeAppResp(msg.Payload()) if err != nil { resp.Error = makeCustomError(err) goto send_p } mcu.MCBSetMNTStateBit(mcu.AppUpgrading) localFile, err := downloadFileFromFtp(r.ctx, serverAddr, loginUser, loginPass, remoteFile, defaultDownloadTimeout) if err != nil { resp.Error = makeCustomError(err) mcu.MCBSetMNTStateBit(mcu.Idle) goto send_p } md5val2, err := fileMD5(localFile) if err != nil { resp.Error = makeCustomError(err) mcu.MCBSetMNTStateBit(mcu.Idle) goto send_p } if !strings.EqualFold(md5val1, md5val2) { resp.Error = &rpcError{Code: -32001, Message: fmt.Sprintf("MD5校验失败(%s)", md5val2)} mcu.MCBSetMNTStateBit(mcu.Idle) goto send_p } // 打包: tar czf rtu_bxs_seyou_1.0.0.1.tar.gz ./rtu_bxs_seyou.out // 解包: tar xzf rtu_bxs_seyou_1.0.0.1.tar.gz -C baseapp.EXEC_DIR cmdStr := "tar xzf " + localFile + " -C " + baseapp.EXEC_DIR cmd := exec.Command("sh", "-c", cmdStr) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err = cmd.Run() os.Remove(localFile) if err != nil { resp.Error = &rpcError{Code: -32001, Message: "解压升级包失败"} mcu.MCBSetMNTStateBit(mcu.Idle) goto send_p } resp.Result, _ = json.Marshal("Success") mcu.MCBSetMNTStateBit(mcu.Idle) default: // 返回不支持的方法调用(不支持的请求) resp.Error = &rpcError{Code: -32601, Message: "Method not found"} } send_p: payload, _ := json.Marshal(resp) token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/response", r.dui), r.msgQos, false, payload) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() == nil { baseapp.Logger.Debugf("[%s] 发送RPC应答成功: %s", MODULE_NAME, string(payload)) } else { baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, token.Error()) } } func (r *MQTTReporter) runLoop() { logoutMsg, _ := makeLogoutMsg(modem.GetIMEI()) opts := mqtt.NewClientOptions(). AddBroker(CfgServers.MQTTSrv.Address).SetUsername(CfgServers.MQTTSrv.Username).SetPassword(CfgServers.MQTTSrv.Password). SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).SetKeepAlive(sendPingReqInterval).SetPingTimeout(waitPingRespTimeout). SetOrderMatters(false).SetWill("/yfkj/bxs-sy/notify/logout", string(logoutMsg), r.msgQos, false) opts.OnConnect = func(c mqtt.Client) { baseapp.Logger.Infof("[%s] 服务器连接成功", MODULE_NAME) } opts.OnConnectionLost = func(c mqtt.Client, err error) { if r.isLogin.Swap(false) { baseapp.Logger.Warnf("[%s] 服务器连接丢失: %v!", MODULE_NAME, err) } } r.client = mqtt.NewClient(opts) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { if r.IsLogin() { select { case <-r.ctx.Done(): return case <-ticker.C: // 唤醒定期检测 continue } } wStep1: // 1, 连接到服务器 if !r.client.IsConnected() && !r.client.IsConnectionOpen() { token := r.client.Connect() select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { baseapp.Logger.Errorf("[%s] 服务器连接失败: %v!!", MODULE_NAME, token.Error()) select { case <-r.ctx.Done(): return case <-time.After(3 * time.Second): } goto wStep1 } } wStep2: // 2, 订阅登录应答 token := r.client.Subscribe("/yfkj/bxs-sy/server/rpc/response", r.msgQos, r.onLogin) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { baseapp.Logger.Errorf("[%s] 登录应答订阅失败: %v!!", MODULE_NAME, token.Error()) select { case <-r.ctx.Done(): return case <-time.After(3 * time.Second): } if !r.client.IsConnected() && !r.client.IsConnectionOpen() { goto wStep1 } else { goto wStep2 } } wStep3: // 3, 发送登录请求 loginReq, _ := makeLoginReq(modem.GetIMEI(), modem.GetSimICCID(), modem.GetRSSI(), baseapp.Version, r.inheritDUI) token = r.client.Publish("/yfkj/bxs-sy/server/rpc/request", r.msgQos, false, loginReq) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { baseapp.Logger.Errorf("[%s] 登录请求发送失败: %v!!", MODULE_NAME, token.Error()) select { case <-r.ctx.Done(): return case <-time.After(3 * time.Second): } if !r.client.IsConnected() && !r.client.IsConnectionOpen() { goto wStep1 } else { goto wStep3 } } tc := time.NewTicker(500 * time.Millisecond) tt := time.NewTimer(5 * time.Second) // 超时 for !r.IsLogin() { select { case <-r.ctx.Done(): tc.Stop() tt.Stop() return case <-tc.C: // 唤醒定期检测 case <-tt.C: baseapp.Logger.Warnf("[%s] 登录超时,重新发送登录请求...", MODULE_NAME) if !r.client.IsConnected() && !r.client.IsConnectionOpen() { goto wStep1 } else { goto wStep3 } } } tc.Stop() tt.Stop() wStep4: // 4, 注册本地方法 token = r.client.Subscribe(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/request", r.dui), r.msgQos, r.handleRequest) select { case <-r.ctx.Done(): return case <-token.Done(): } if token.Error() != nil { baseapp.Logger.Errorf("[%s] 注册本地RPC方法失败: %v!!", MODULE_NAME, token.Error()) select { case <-r.ctx.Done(): return case <-time.After(3 * time.Second): } if !r.client.IsConnected() && !r.client.IsConnectionOpen() { goto wStep1 } else { goto wStep4 } } } // for end } func ModuleInit() bool { err := loadCfgServers() if err != nil { baseapp.Logger.Errorf("[%s] 加载服务器配置项失败: %v!!", MODULE_NAME, err) return false } inheritDUI := "" // 换RTU板卡时, 继承历史ID, 可选可为空 if data, err := os.ReadFile(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")); err == nil { inheritDUI = strings.TrimSpace(string(data)) if len(inheritDUI) != 14 || !isDecimal(inheritDUI) { baseapp.Logger.Errorf("[%s] 文件中的DUI: %q 无效, 无法继承历史台账数据!!", MODULE_NAME, inheritDUI) os.Exit(1) } } ctx, cancel := context.WithCancel(context.Background()) Reporter = &MQTTReporter{ctx: ctx, cancel: cancel, msgQos: 1, inheritDUI: inheritDUI, uploadPhotosTask: &SingleTask{}, reportMCUTask: &SingleTask{}, reportLocTask: &SingleTask{}} go Reporter.runLoop() go LoopTakePhoto(Reporter.ctx) go LoopRecvEnvDataGrp(Reporter.ctx) go LoopRecvEnvDataOne(Reporter.ctx) return true } func ModuleExit() { if Reporter != nil { Reporter.cancel() } } //export RTU_IsLoginMqServer func RTU_IsLoginMqServer() C.int { if Reporter != nil && Reporter.IsLogin() { return 1 } return 0 }