||
- package reporter
- import "C"
- import (
- "context"
- "encoding/json"
- "fmt"
- "io/fs"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- gps "hnyfkj.com.cn/rtu/bxs-sy/air530z"
- modem "hnyfkj.com.cn/rtu/bxs-sy/air720u"
- "hnyfkj.com.cn/rtu/bxs-sy/baseapp"
- mcu "hnyfkj.com.cn/rtu/bxs-sy/mcu_ctrl_board"
- )
- const (
- MODULE_NAME = "Reporter"
- sendPingReqInterval = 10 * time.Second
- waitPingRespTimeout = 5 * time.Second
- )
- var (
- Reporter *MQTTReporter
- )
- type MQTTReporter struct {
- client mqtt.Client
- ctx context.Context
- cancel context.CancelFunc
- msgQos byte // MQTT数据消息传输的质量等级(统一)
- isLogin atomic.Bool // 标记是否已成功登录MQTT后端服务器
- dui string // 登录成功后服务端返回的设备唯一ID
- inheritDUI string // 继承的历史ID, 可选可为空(换板时)
- // 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息
- uploadPhotosTask *SingleTask
- reportMCUTask *SingleTask
- reportLocTask *SingleTask
- }
- func (r *MQTTReporter) IsLogin() bool {
- return r.isLogin.Load()
- }
- func (r *MQTTReporter) onLogin(client mqtt.Client, msg mqtt.Message) {
- imei, dui, err := parseLoginResp(msg.Payload())
- if err != nil {
- baseapp.Logger.Errorf("[%s] 登录失败: %v!!", MODULE_NAME, err)
- return
- }
- if imei != modem.GetIMEI() { // 判断是否我的应答
- return
- }
- if len(r.inheritDUI) > 0 && r.inheritDUI == dui {
- _ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt"))
- } else if len(r.inheritDUI) > 0 && r.inheritDUI != dui {
- baseapp.Logger.Errorf("[%s] 登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui)
- return
- }
- r.dui = dui
- r.isLogin.Store(true)
- r.client.Unsubscribe("/yfkj/bxs-sy/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否成功)
- baseapp.Logger.Infof("[%s] 登录成功, 设备DUI: %s", MODULE_NAME, r.dui)
- r.uploadPhotosTask.Run(r.uploadPendingPhotos, true) // 补录上传历史遗存照片, 单实例运行
- r.reportLocTask.Run(r.reportLocation, true) // 定位成功通知当前位置, 单实例运行
- r.reportMCUTask.Run(func() { r.reportMCUParams(mcu.CfgParams) }, true) // 上报控制板的运行参数, 单实例运行
- }
- func (r *MQTTReporter) uploadPendingPhotos() {
- baseapp.Logger.Infof("[%s] 拍照补录上传任务开始", MODULE_NAME)
- nums := 0
- filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error {
- if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" {
- return nil
- } else if !r.IsLogin() || r.ctx.Err() != nil {
- return context.Canceled
- }
- fileUploader.uploadLock.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行) 2025-10-17
- defer fileUploader.uploadLock.Unlock()
- mcu.MCBSetCamStateBit(mcu.PhotoUploading)
- defer mcu.MCBSetCamStateBit(mcu.Idle)
- _, err := uploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, defaultUploadTimeout)
- if err == nil {
- baseapp.Logger.Infof("[%s] 拍照补录上传成功, 本地文件: %q已删除", MODULE_NAME, path)
- os.Remove(path)
- nums++
- }
- return nil
- })
- baseapp.Logger.Infof("[%s] 拍照补录上传任务结束, 本次共上传%d张照片", MODULE_NAME, nums)
- }
- func (r *MQTTReporter) reportMCUParams(setParams *mcu.Config) {
- baseapp.Logger.Infof("[%s] 主动上报MCU参数任务开始", MODULE_NAME)
- for {
- if !r.IsLogin() || r.ctx.Err() != nil {
- return
- }
- msg := fmt.Sprintf(`{"imei": "%s", "version": "%s", "ctrlMode": "%d", "lightDuration": "%d", "startHour": "%d", "endHour": "%d", "takePhotoIntervalMinutes": "%d"}`,
- modem.GetIMEI(), setParams.Version, setParams.CtrlMode, setParams.LightDuration, setParams.StartHour, setParams.EndHour, setParams.TakePhotoInterval)
- token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/mcuParams", r.dui), r.msgQos, false, msg)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- continue
- }
- baseapp.Logger.Infof("[%s] 主动上报MCU参数任务结束(%q)", MODULE_NAME, msg)
- return
- }
- }
- func (r *MQTTReporter) reportLocation() {
- baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务开始", MODULE_NAME)
- for {
- if !r.IsLogin() || r.ctx.Err() != nil {
- return
- }
- lat, lon, err := gps.Get2DPosition()
- if err != nil {
- time.Sleep(defaultRtyInterval)
- continue
- }
- msg := fmt.Sprintf(`{"imei": "%s", "lat": "%s", "lon": "%s"}`, modem.GetIMEI(), lat, lon)
- token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/location", r.dui), r.msgQos, false, msg)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- continue
- }
- baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务结束(%q)", MODULE_NAME, msg)
- return
- }
- }
- func (r *MQTTReporter) handleRequest(client mqtt.Client, msg mqtt.Message) {
- go r.execRequest(msg)
- }
- func (r *MQTTReporter) execRequest(msg mqtt.Message) {
- var req rpcRequest
- reqStr := string(msg.Payload())
- baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, reqStr)
- var resp rpcResponse = rpcResponse{
- JSONRPC: "2.0",
- }
- if err := json.Unmarshal(msg.Payload(), &req); err != nil {
- resp.Error = &rpcError{Code: -32700, Message: "Parse error"}
- goto send_p
- } else {
- resp.ID = req.ID
- }
- switch req.Method {
- // Call-1: 设置MCU控制板运行参数, 保存到文件
- case "setMCUParams":
- params, err := parseMCUParams(msg.Payload())
- if err != nil {
- resp.Error = makeCustomError(err)
- goto send_p
- }
- var cfgParams mcu.Config
- cfgParams.Version = params.Version
- cfgParams.CtrlMode, _ = strconv.Atoi(params.CtrlMode)
- cfgParams.LightDuration, _ = strconv.Atoi(params.LightDuration)
- cfgParams.StartHour, _ = strconv.Atoi(params.StartHour)
- cfgParams.EndHour, _ = strconv.Atoi(params.EndHour)
- cfgParams.TakePhotoInterval, _ = strconv.Atoi(params.PhotoIntervalMinutes)
- err = mcu.SaveCfgParams(&cfgParams)
- if err != nil {
- resp.Error = makeCustomError(err)
- } else {
- resp.Result, _ = json.Marshal("Success")
- go r.reportMCUParams(&cfgParams)
- }
- // Call-2: 获取MCU控制板运行参数, 从文件读取
- case "getMCUParams":
- var params mcuParams
- params.Version = mcu.CfgParams.Version
- params.CtrlMode = fmt.Sprintf("%d", mcu.CfgParams.CtrlMode)
- params.LightDuration = fmt.Sprintf("%d", mcu.CfgParams.LightDuration)
- params.StartHour = fmt.Sprintf("%d", mcu.CfgParams.StartHour)
- params.EndHour = fmt.Sprintf("%d", mcu.CfgParams.EndHour)
- params.PhotoIntervalMinutes = fmt.Sprintf("%d", mcu.CfgParams.TakePhotoInterval)
- resp.Result, _ = json.Marshal(params)
- // Call-3, 异步请求MCU获取当前实时的环境数据
- case "getCurrentEnvData":
- err := mcu.MCBReqEnvCurData()
- if err != nil {
- resp.Error = makeCustomError(err)
- } else {
- resp.Result, _ = json.Marshal("Success")
- }
- // Call-4: 同步执行相机拍照, 比较耗时, 需等待
- case "takephoto":
- remoteFile, err := TakePhotoAndUpToFtp(r.ctx)
- if err != nil {
- resp.Error = makeCustomError(err)
- } else {
- resp.Result, _ = json.Marshal(map[string]string{"filename": remoteFile})
- }
- // Call-5:重启, 服务端不一定能收到成功的应答
- case "reboot":
- cmd := exec.Command("reboot")
- if err := cmd.Run(); err != nil {
- resp.Error = &rpcError{Code: -32700, Message: "Failed"}
- } else {
- resp.Result, _ = json.Marshal("Success")
- }
- // Call-6: 获取RTU当前的位置信息(2D经纬度坐标)
- case "getLocation":
- lat, lon, err := gps.Get2DPosition()
- if err != nil {
- resp.Error = makeCustomError(err)
- } else {
- resp.Result, _ = json.Marshal(map[string]string{"lat": lat, "lon": lon})
- }
- // Call-7: 升级RTU固件, 升级后程序会自动退出, 靠守护进程重新拉起, 替换新程序
- case "upgradeApp":
- serverAddr, loginUser, loginPass, remoteFile, md5val1, err := parseUpgradeAppResp(msg.Payload())
- if err != nil {
- resp.Error = makeCustomError(err)
- goto send_p
- }
- mcu.MCBSetMNTStateBit(mcu.AppUpgrading)
- localFile, err := downloadFileFromFtp(r.ctx, serverAddr, loginUser, loginPass, remoteFile, defaultDownloadTimeout)
- if err != nil {
- resp.Error = makeCustomError(err)
- mcu.MCBSetMNTStateBit(mcu.Idle)
- goto send_p
- }
- md5val2, err := fileMD5(localFile)
- if err != nil {
- resp.Error = makeCustomError(err)
- mcu.MCBSetMNTStateBit(mcu.Idle)
- goto send_p
- }
- if !strings.EqualFold(md5val1, md5val2) {
- resp.Error = &rpcError{Code: -32001, Message: fmt.Sprintf("MD5校验失败(%s)", md5val2)}
- mcu.MCBSetMNTStateBit(mcu.Idle)
- goto send_p
- }
- // 打包: tar czf rtu_bxs_seyou_1.0.0.1.tar.gz ./rtu_bxs_seyou.out
- // 解包: tar xzf rtu_bxs_seyou_1.0.0.1.tar.gz -C baseapp.EXEC_DIR
- cmdStr := "tar xzf " + localFile + " -C " + baseapp.EXEC_DIR
- cmd := exec.Command("sh", "-c", cmdStr)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- err = cmd.Run()
- os.Remove(localFile)
- if err != nil {
- resp.Error = &rpcError{Code: -32001, Message: "解压升级包失败"}
- mcu.MCBSetMNTStateBit(mcu.Idle)
- goto send_p
- }
- resp.Result, _ = json.Marshal("Success")
- mcu.MCBSetMNTStateBit(mcu.Idle)
- default: // 返回不支持的方法调用(不支持的请求)
- resp.Error = &rpcError{Code: -32601, Message: "Method not found"}
- }
- send_p:
- payload, _ := json.Marshal(resp)
- token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/response", r.dui), r.msgQos, false, payload)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() == nil {
- baseapp.Logger.Debugf("[%s] 发送RPC应答成功: %s", MODULE_NAME, string(payload))
- } else {
- baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, token.Error())
- }
- }
- func (r *MQTTReporter) runLoop() {
- logoutMsg, _ := makeLogoutMsg(modem.GetIMEI())
- opts := mqtt.NewClientOptions().
- AddBroker(CfgServers.MQTTSrv.Address).SetUsername(CfgServers.MQTTSrv.Username).SetPassword(CfgServers.MQTTSrv.Password).
- SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).SetKeepAlive(sendPingReqInterval).SetPingTimeout(waitPingRespTimeout).
- SetOrderMatters(false).SetWill("/yfkj/bxs-sy/notify/logout", string(logoutMsg), r.msgQos, false)
- opts.OnConnect = func(c mqtt.Client) {
- baseapp.Logger.Infof("[%s] 服务器连接成功", MODULE_NAME)
- }
- opts.OnConnectionLost = func(c mqtt.Client, err error) {
- if r.isLogin.Swap(false) {
- baseapp.Logger.Warnf("[%s] 服务器连接丢失: %v!", MODULE_NAME, err)
- }
- }
- r.client = mqtt.NewClient(opts)
- ticker := time.NewTicker(1 * time.Second)
- defer ticker.Stop()
- for {
- if r.IsLogin() {
- select {
- case <-r.ctx.Done():
- return
- case <-ticker.C: // 唤醒定期检测
- continue
- }
- }
- wStep1: // 1, 连接到服务器
- if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
- token := r.client.Connect()
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 服务器连接失败: %v!!", MODULE_NAME, token.Error())
- select {
- case <-r.ctx.Done():
- return
- case <-time.After(3 * time.Second):
- }
- goto wStep1
- }
- }
- wStep2: // 2, 订阅登录应答
- token := r.client.Subscribe("/yfkj/bxs-sy/server/rpc/response", r.msgQos, r.onLogin)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 登录应答订阅失败: %v!!", MODULE_NAME, token.Error())
- select {
- case <-r.ctx.Done():
- return
- case <-time.After(3 * time.Second):
- }
- if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
- goto wStep1
- } else {
- goto wStep2
- }
- }
- wStep3: // 3, 发送登录请求
- loginReq, _ := makeLoginReq(modem.GetIMEI(), modem.GetSimICCID(), modem.GetRSSI(), baseapp.Version, r.inheritDUI)
- token = r.client.Publish("/yfkj/bxs-sy/server/rpc/request", r.msgQos, false, loginReq)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 登录请求发送失败: %v!!", MODULE_NAME, token.Error())
- select {
- case <-r.ctx.Done():
- return
- case <-time.After(3 * time.Second):
- }
- if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
- goto wStep1
- } else {
- goto wStep3
- }
- }
- tc := time.NewTicker(500 * time.Millisecond)
- tt := time.NewTimer(5 * time.Second) // 超时
- for !r.IsLogin() {
- select {
- case <-r.ctx.Done():
- tc.Stop()
- tt.Stop()
- return
- case <-tc.C: // 唤醒定期检测
- case <-tt.C:
- baseapp.Logger.Warnf("[%s] 登录超时,重新发送登录请求...", MODULE_NAME)
- if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
- goto wStep1
- } else {
- goto wStep3
- }
- }
- }
- tc.Stop()
- tt.Stop()
- wStep4: // 4, 注册本地方法
- token = r.client.Subscribe(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/request", r.dui), r.msgQos, r.handleRequest)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 注册本地RPC方法失败: %v!!", MODULE_NAME, token.Error())
- select {
- case <-r.ctx.Done():
- return
- case <-time.After(3 * time.Second):
- }
- if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
- goto wStep1
- } else {
- goto wStep4
- }
- }
- } // for end
- }
- func ModuleInit() bool {
- err := loadCfgServers()
- if err != nil {
- baseapp.Logger.Errorf("[%s] 加载服务器配置项失败: %v!!", MODULE_NAME, err)
- return false
- }
- inheritDUI := "" // 换RTU板卡时, 继承历史ID, 可选可为空
- if data, err := os.ReadFile(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")); err == nil {
- inheritDUI = strings.TrimSpace(string(data))
- if len(inheritDUI) != 14 || !isDecimal(inheritDUI) {
- baseapp.Logger.Errorf("[%s] 文件中的DUI: %q 无效, 无法继承历史台账数据!!", MODULE_NAME, inheritDUI)
- os.Exit(1)
- }
- }
- ctx, cancel := context.WithCancel(context.Background())
- Reporter = &MQTTReporter{ctx: ctx, cancel: cancel, msgQos: 1, inheritDUI: inheritDUI,
- uploadPhotosTask: &SingleTask{}, reportMCUTask: &SingleTask{}, reportLocTask: &SingleTask{}}
- go Reporter.runLoop()
- go LoopTakePhoto(Reporter.ctx)
- go LoopRecvEnvDataGrp(Reporter.ctx)
- go LoopRecvEnvDataOne(Reporter.ctx)
- return true
- }
- func ModuleExit() {
- if Reporter != nil {
- Reporter.cancel()
- }
- }
- //export RTU_IsLoginMqServer
- func RTU_IsLoginMqServer() C.int {
- if Reporter != nil && Reporter.IsLogin() {
- return 1
- }
- return 0
- }
|