reporter.go 14 KB


  1. package reporter
  2. import "C"
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io/fs"
  8. "os"
  9. "os/exec"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync/atomic"
  14. "time"
  15. mqtt "github.com/eclipse/paho.mqtt.golang"
  16. gps "hnyfkj.com.cn/rtu/bxs-sy/air530z"
  17. modem "hnyfkj.com.cn/rtu/bxs-sy/air720u"
  18. "hnyfkj.com.cn/rtu/bxs-sy/baseapp"
  19. mcu "hnyfkj.com.cn/rtu/bxs-sy/mcu_ctrl_board"
  20. )
  21. const (
  22. MODULE_NAME = "Reporter"
  23. sendPingReqInterval = 10 * time.Second
  24. waitPingRespTimeout = 5 * time.Second
  25. )
  26. var (
  27. Reporter *MQTTReporter
  28. )
  29. type MQTTReporter struct {
  30. client mqtt.Client
  31. ctx context.Context
  32. cancel context.CancelFunc
  33. msgQos byte // MQTT数据消息传输的质量等级(统一)
  34. isLogin atomic.Bool // 标记是否已成功登录MQTT后端服务器
  35. dui string // 登录成功后服务端返回的设备唯一ID
  36. inheritDUI string // 继承的历史ID, 可选可为空(换板时)
  37. // 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息
  38. uploadPhotosTask *SingleTask
  39. reportMCUTask *SingleTask
  40. reportLocTask *SingleTask
  41. }
  42. func (r *MQTTReporter) IsLogin() bool {
  43. return r.isLogin.Load()
  44. }
  45. func (r *MQTTReporter) onLogin(client mqtt.Client, msg mqtt.Message) {
  46. imei, dui, err := parseLoginResp(msg.Payload())
  47. if err != nil {
  48. baseapp.Logger.Errorf("[%s] 登录失败: %v!!", MODULE_NAME, err)
  49. return
  50. }
  51. if imei != modem.GetIMEI() { // 判断是否我的应答
  52. return
  53. }
  54. if len(r.inheritDUI) > 0 && r.inheritDUI == dui {
  55. _ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt"))
  56. } else if len(r.inheritDUI) > 0 && r.inheritDUI != dui {
  57. baseapp.Logger.Errorf("[%s] 登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui)
  58. return
  59. }
  60. r.dui = dui
  61. r.isLogin.Store(true)
  62. r.client.Unsubscribe("/yfkj/bxs-sy/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否成功)
  63. baseapp.Logger.Infof("[%s] 登录成功, 设备DUI: %s", MODULE_NAME, r.dui)
  64. r.uploadPhotosTask.Run(r.uploadPendingPhotos, true) // 补录上传历史遗存照片, 单实例运行
  65. r.reportLocTask.Run(r.reportLocation, true) // 定位成功通知当前位置, 单实例运行
  66. r.reportMCUTask.Run(func() { r.reportMCUParams(mcu.CfgParams) }, true) // 上报控制板的运行参数, 单实例运行
  67. }
  68. func (r *MQTTReporter) uploadPendingPhotos() {
  69. baseapp.Logger.Infof("[%s] 拍照补录上传任务开始", MODULE_NAME)
  70. nums := 0
  71. filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error {
  72. if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" {
  73. return nil
  74. } else if !r.IsLogin() || r.ctx.Err() != nil {
  75. return context.Canceled
  76. }
  77. fileUploader.uploadLock.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行) 2025-10-17
  78. defer fileUploader.uploadLock.Unlock()
  79. mcu.MCBSetCamStateBit(mcu.PhotoUploading)
  80. defer mcu.MCBSetCamStateBit(mcu.Idle)
  81. _, err := uploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, defaultUploadTimeout)
  82. if err == nil {
  83. baseapp.Logger.Infof("[%s] 拍照补录上传成功, 本地文件: %q已删除", MODULE_NAME, path)
  84. os.Remove(path)
  85. nums++
  86. }
  87. return nil
  88. })
  89. baseapp.Logger.Infof("[%s] 拍照补录上传任务结束, 本次共上传%d张照片", MODULE_NAME, nums)
  90. }
  91. func (r *MQTTReporter) reportMCUParams(setParams *mcu.Config) {
  92. baseapp.Logger.Infof("[%s] 主动上报MCU参数任务开始", MODULE_NAME)
  93. for {
  94. if !r.IsLogin() || r.ctx.Err() != nil {
  95. return
  96. }
  97. msg := fmt.Sprintf(`{"imei": "%s", "version": "%s", "ctrlMode": "%d", "lightDuration": "%d", "startHour": "%d", "endHour": "%d", "takePhotoIntervalMinutes": "%d"}`,
  98. modem.GetIMEI(), setParams.Version, setParams.CtrlMode, setParams.LightDuration, setParams.StartHour, setParams.EndHour, setParams.TakePhotoInterval)
  99. token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/mcuParams", r.dui), r.msgQos, false, msg)
  100. select {
  101. case <-r.ctx.Done():
  102. return
  103. case <-token.Done():
  104. }
  105. if token.Error() != nil {
  106. continue
  107. }
  108. baseapp.Logger.Infof("[%s] 主动上报MCU参数任务结束(%q)", MODULE_NAME, msg)
  109. return
  110. }
  111. }
  112. func (r *MQTTReporter) reportLocation() {
  113. baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务开始", MODULE_NAME)
  114. for {
  115. if !r.IsLogin() || r.ctx.Err() != nil {
  116. return
  117. }
  118. lat, lon, err := gps.Get2DPosition()
  119. if err != nil {
  120. time.Sleep(defaultRtyInterval)
  121. continue
  122. }
  123. msg := fmt.Sprintf(`{"imei": "%s", "lat": "%s", "lon": "%s"}`, modem.GetIMEI(), lat, lon)
  124. token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/notify/%s/location", r.dui), r.msgQos, false, msg)
  125. select {
  126. case <-r.ctx.Done():
  127. return
  128. case <-token.Done():
  129. }
  130. if token.Error() != nil {
  131. continue
  132. }
  133. baseapp.Logger.Infof("[%s] 主动上报当前GPS位置信息任务结束(%q)", MODULE_NAME, msg)
  134. return
  135. }
  136. }
  137. func (r *MQTTReporter) handleRequest(client mqtt.Client, msg mqtt.Message) {
  138. go r.execRequest(msg)
  139. }
  140. func (r *MQTTReporter) execRequest(msg mqtt.Message) {
  141. var req rpcRequest
  142. reqStr := string(msg.Payload())
  143. baseapp.Logger.Debugf("[%s] 收到一个RPC请求: %s", MODULE_NAME, reqStr)
  144. var resp rpcResponse = rpcResponse{
  145. JSONRPC: "2.0",
  146. }
  147. if err := json.Unmarshal(msg.Payload(), &req); err != nil {
  148. resp.Error = &rpcError{Code: -32700, Message: "Parse error"}
  149. goto send_p
  150. } else {
  151. resp.ID = req.ID
  152. }
  153. switch req.Method {
  154. // Call-1: 设置MCU控制板运行参数, 保存到文件
  155. case "setMCUParams":
  156. params, err := parseMCUParams(msg.Payload())
  157. if err != nil {
  158. resp.Error = makeCustomError(err)
  159. goto send_p
  160. }
  161. var cfgParams mcu.Config
  162. cfgParams.Version = params.Version
  163. cfgParams.CtrlMode, _ = strconv.Atoi(params.CtrlMode)
  164. cfgParams.LightDuration, _ = strconv.Atoi(params.LightDuration)
  165. cfgParams.StartHour, _ = strconv.Atoi(params.StartHour)
  166. cfgParams.EndHour, _ = strconv.Atoi(params.EndHour)
  167. cfgParams.TakePhotoInterval, _ = strconv.Atoi(params.PhotoIntervalMinutes)
  168. err = mcu.SaveCfgParams(&cfgParams)
  169. if err != nil {
  170. resp.Error = makeCustomError(err)
  171. } else {
  172. resp.Result, _ = json.Marshal("Success")
  173. go r.reportMCUParams(&cfgParams)
  174. }
  175. // Call-2: 获取MCU控制板运行参数, 从文件读取
  176. case "getMCUParams":
  177. var params mcuParams
  178. params.Version = mcu.CfgParams.Version
  179. params.CtrlMode = fmt.Sprintf("%d", mcu.CfgParams.CtrlMode)
  180. params.LightDuration = fmt.Sprintf("%d", mcu.CfgParams.LightDuration)
  181. params.StartHour = fmt.Sprintf("%d", mcu.CfgParams.StartHour)
  182. params.EndHour = fmt.Sprintf("%d", mcu.CfgParams.EndHour)
  183. params.PhotoIntervalMinutes = fmt.Sprintf("%d", mcu.CfgParams.TakePhotoInterval)
  184. resp.Result, _ = json.Marshal(params)
  185. // Call-3, 异步请求MCU获取当前实时的环境数据
  186. case "getCurrentEnvData":
  187. err := mcu.MCBReqEnvCurData()
  188. if err != nil {
  189. resp.Error = makeCustomError(err)
  190. } else {
  191. resp.Result, _ = json.Marshal("Success")
  192. }
  193. // Call-4: 同步执行相机拍照, 比较耗时, 需等待
  194. case "takephoto":
  195. remoteFile, err := TakePhotoAndUpToFtp(r.ctx)
  196. if err != nil {
  197. resp.Error = makeCustomError(err)
  198. } else {
  199. resp.Result, _ = json.Marshal(map[string]string{"filename": remoteFile})
  200. }
  201. // Call-5:重启, 服务端不一定能收到成功的应答
  202. case "reboot":
  203. cmd := exec.Command("reboot")
  204. if err := cmd.Run(); err != nil {
  205. resp.Error = &rpcError{Code: -32700, Message: "Failed"}
  206. } else {
  207. resp.Result, _ = json.Marshal("Success")
  208. }
  209. // Call-6: 获取RTU当前的位置信息(2D经纬度坐标)
  210. case "getLocation":
  211. lat, lon, err := gps.Get2DPosition()
  212. if err != nil {
  213. resp.Error = makeCustomError(err)
  214. } else {
  215. resp.Result, _ = json.Marshal(map[string]string{"lat": lat, "lon": lon})
  216. }
  217. // Call-7: 升级RTU固件, 升级后程序会自动退出, 靠守护进程重新拉起, 替换新程序
  218. case "upgradeApp":
  219. serverAddr, loginUser, loginPass, remoteFile, md5val1, err := parseUpgradeAppResp(msg.Payload())
  220. if err != nil {
  221. resp.Error = makeCustomError(err)
  222. goto send_p
  223. }
  224. mcu.MCBSetMNTStateBit(mcu.AppUpgrading)
  225. localFile, err := downloadFileFromFtp(r.ctx, serverAddr, loginUser, loginPass, remoteFile, defaultDownloadTimeout)
  226. if err != nil {
  227. resp.Error = makeCustomError(err)
  228. mcu.MCBSetMNTStateBit(mcu.Idle)
  229. goto send_p
  230. }
  231. md5val2, err := fileMD5(localFile)
  232. if err != nil {
  233. resp.Error = makeCustomError(err)
  234. mcu.MCBSetMNTStateBit(mcu.Idle)
  235. goto send_p
  236. }
  237. if !strings.EqualFold(md5val1, md5val2) {
  238. resp.Error = &rpcError{Code: -32001, Message: fmt.Sprintf("MD5校验失败(%s)", md5val2)}
  239. mcu.MCBSetMNTStateBit(mcu.Idle)
  240. goto send_p
  241. }
  242. // 打包: tar czf rtu_bxs_seyou_1.0.0.1.tar.gz ./rtu_bxs_seyou.out
  243. // 解包: tar xzf rtu_bxs_seyou_1.0.0.1.tar.gz -C baseapp.EXEC_DIR
  244. cmdStr := "tar xzf " + localFile + " -C " + baseapp.EXEC_DIR
  245. cmd := exec.Command("sh", "-c", cmdStr)
  246. cmd.Stdout = os.Stdout
  247. cmd.Stderr = os.Stderr
  248. err = cmd.Run()
  249. os.Remove(localFile)
  250. if err != nil {
  251. resp.Error = &rpcError{Code: -32001, Message: "解压升级包失败"}
  252. mcu.MCBSetMNTStateBit(mcu.Idle)
  253. goto send_p
  254. }
  255. resp.Result, _ = json.Marshal("Success")
  256. mcu.MCBSetMNTStateBit(mcu.Idle)
  257. default: // 返回不支持的方法调用(不支持的请求)
  258. resp.Error = &rpcError{Code: -32601, Message: "Method not found"}
  259. }
  260. send_p:
  261. payload, _ := json.Marshal(resp)
  262. token := r.client.Publish(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/response", r.dui), r.msgQos, false, payload)
  263. select {
  264. case <-r.ctx.Done():
  265. return
  266. case <-token.Done():
  267. }
  268. if token.Error() == nil {
  269. baseapp.Logger.Debugf("[%s] 发送RPC应答成功: %s", MODULE_NAME, string(payload))
  270. } else {
  271. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, token.Error())
  272. }
  273. }
  274. func (r *MQTTReporter) runLoop() {
  275. logoutMsg, _ := makeLogoutMsg(modem.GetIMEI())
  276. opts := mqtt.NewClientOptions().
  277. AddBroker(CfgServers.MQTTSrv.Address).SetUsername(CfgServers.MQTTSrv.Username).SetPassword(CfgServers.MQTTSrv.Password).
  278. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).SetKeepAlive(sendPingReqInterval).SetPingTimeout(waitPingRespTimeout).
  279. SetOrderMatters(false).SetWill("/yfkj/bxs-sy/notify/logout", string(logoutMsg), r.msgQos, false)
  280. opts.OnConnect = func(c mqtt.Client) {
  281. baseapp.Logger.Infof("[%s] 服务器连接成功", MODULE_NAME)
  282. }
  283. opts.OnConnectionLost = func(c mqtt.Client, err error) {
  284. if r.isLogin.Swap(false) {
  285. baseapp.Logger.Warnf("[%s] 服务器连接丢失: %v!", MODULE_NAME, err)
  286. }
  287. }
  288. r.client = mqtt.NewClient(opts)
  289. ticker := time.NewTicker(1 * time.Second)
  290. defer ticker.Stop()
  291. for {
  292. if r.IsLogin() {
  293. select {
  294. case <-r.ctx.Done():
  295. return
  296. case <-ticker.C: // 唤醒定期检测
  297. continue
  298. }
  299. }
  300. wStep1: // 1, 连接到服务器
  301. if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
  302. token := r.client.Connect()
  303. select {
  304. case <-r.ctx.Done():
  305. return
  306. case <-token.Done():
  307. }
  308. if token.Error() != nil {
  309. baseapp.Logger.Errorf("[%s] 服务器连接失败: %v!!", MODULE_NAME, token.Error())
  310. select {
  311. case <-r.ctx.Done():
  312. return
  313. case <-time.After(3 * time.Second):
  314. }
  315. goto wStep1
  316. }
  317. }
  318. wStep2: // 2, 订阅登录应答
  319. token := r.client.Subscribe("/yfkj/bxs-sy/server/rpc/response", r.msgQos, r.onLogin)
  320. select {
  321. case <-r.ctx.Done():
  322. return
  323. case <-token.Done():
  324. }
  325. if token.Error() != nil {
  326. baseapp.Logger.Errorf("[%s] 登录应答订阅失败: %v!!", MODULE_NAME, token.Error())
  327. select {
  328. case <-r.ctx.Done():
  329. return
  330. case <-time.After(3 * time.Second):
  331. }
  332. if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
  333. goto wStep1
  334. } else {
  335. goto wStep2
  336. }
  337. }
  338. wStep3: // 3, 发送登录请求
  339. loginReq, _ := makeLoginReq(modem.GetIMEI(), modem.GetSimICCID(), modem.GetRSSI(), baseapp.Version, r.inheritDUI)
  340. token = r.client.Publish("/yfkj/bxs-sy/server/rpc/request", r.msgQos, false, loginReq)
  341. select {
  342. case <-r.ctx.Done():
  343. return
  344. case <-token.Done():
  345. }
  346. if token.Error() != nil {
  347. baseapp.Logger.Errorf("[%s] 登录请求发送失败: %v!!", MODULE_NAME, token.Error())
  348. select {
  349. case <-r.ctx.Done():
  350. return
  351. case <-time.After(3 * time.Second):
  352. }
  353. if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
  354. goto wStep1
  355. } else {
  356. goto wStep3
  357. }
  358. }
  359. tc := time.NewTicker(500 * time.Millisecond)
  360. tt := time.NewTimer(5 * time.Second) // 超时
  361. for !r.IsLogin() {
  362. select {
  363. case <-r.ctx.Done():
  364. tc.Stop()
  365. tt.Stop()
  366. return
  367. case <-tc.C: // 唤醒定期检测
  368. case <-tt.C:
  369. baseapp.Logger.Warnf("[%s] 登录超时,重新发送登录请求...", MODULE_NAME)
  370. if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
  371. goto wStep1
  372. } else {
  373. goto wStep3
  374. }
  375. }
  376. }
  377. tc.Stop()
  378. tt.Stop()
  379. wStep4: // 4, 注册本地方法
  380. token = r.client.Subscribe(fmt.Sprintf("/yfkj/bxs-sy/device/rpc/%s/request", r.dui), r.msgQos, r.handleRequest)
  381. select {
  382. case <-r.ctx.Done():
  383. return
  384. case <-token.Done():
  385. }
  386. if token.Error() != nil {
  387. baseapp.Logger.Errorf("[%s] 注册本地RPC方法失败: %v!!", MODULE_NAME, token.Error())
  388. select {
  389. case <-r.ctx.Done():
  390. return
  391. case <-time.After(3 * time.Second):
  392. }
  393. if !r.client.IsConnected() && !r.client.IsConnectionOpen() {
  394. goto wStep1
  395. } else {
  396. goto wStep4
  397. }
  398. }
  399. } // for end
  400. }
  401. func ModuleInit() bool {
  402. err := loadCfgServers()
  403. if err != nil {
  404. baseapp.Logger.Errorf("[%s] 加载服务器配置项失败: %v!!", MODULE_NAME, err)
  405. return false
  406. }
  407. inheritDUI := "" // 换RTU板卡时, 继承历史ID, 可选可为空
  408. if data, err := os.ReadFile(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")); err == nil {
  409. inheritDUI = strings.TrimSpace(string(data))
  410. if len(inheritDUI) != 14 || !isDecimal(inheritDUI) {
  411. baseapp.Logger.Errorf("[%s] 文件中的DUI: %q 无效, 无法继承历史台账数据!!", MODULE_NAME, inheritDUI)
  412. os.Exit(1)
  413. }
  414. }
  415. ctx, cancel := context.WithCancel(context.Background())
  416. Reporter = &MQTTReporter{ctx: ctx, cancel: cancel, msgQos: 1, inheritDUI: inheritDUI,
  417. uploadPhotosTask: &SingleTask{}, reportMCUTask: &SingleTask{}, reportLocTask: &SingleTask{}}
  418. go Reporter.runLoop()
  419. go LoopTakePhoto(Reporter.ctx)
  420. go LoopRecvEnvDataGrp(Reporter.ctx)
  421. go LoopRecvEnvDataOne(Reporter.ctx)
  422. return true
  423. }
  424. func ModuleExit() {
  425. if Reporter != nil {
  426. Reporter.cancel()
  427. }
  428. }
  429. //export RTU_IsLoginMqServer
  430. func RTU_IsLoginMqServer() C.int {
  431. if Reporter != nil && Reporter.IsLogin() {
  432. return 1
  433. }
  434. return 0
  435. }