reporter.go 13 KB


  1. package reporter
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io/fs"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/sirupsen/logrus"
  15. mqtt "github.com/eclipse/paho.mqtt.golang"
  16. gps "hnyfkj.com.cn/rtu/linux/air530z"
  17. mcu "hnyfkj.com.cn/rtu/xy_v/mcu_ctrl_board"
  18. "hnyfkj.com.cn/rtu/linux/baseapp"
  19. "hnyfkj.com.cn/rtu/linux/netmgrd"
  20. "hnyfkj.com.cn/rtu/linux/utils/ftpclient"
  21. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  22. "hnyfkj.com.cn/rtu/linux/utils/singletask"
  23. )
  24. const MODULE_NAME = "Reporter"
  25. var (
  26. Reporter *MQTTReporter
  27. )
  28. const (
  29. MqttQos1 byte = 1 // 消息至少送达一次
  30. FastInterval = 1 * time.Second // 快速检测时间间隔
  31. SlowInterval = 5 * time.Second // 慢速检测时间间隔
  32. )
  33. type MQTTReporter struct {
  34. client mqtt.Client
  35. ctx context.Context
  36. cancel context.CancelFunc
  37. isLogin atomic.Bool // 标记是否已成功登录MQTT后端服务器
  38. dui string // 登录成功后服务端返回的设备唯一ID
  39. inheritDUI string // 继承的历史ID, 可选可为空(换板时)
  40. sensorFileLock sync.Mutex // 用于保护"传感器"数据文件的读和写
  41. // 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息
  42. reuploadHistTask *singletask.OnceTask // 补录数据, 单实例
  43. reportMcuCfgTask *singletask.OnceTask // 上报配置, 单实例
  44. reportRtuPosTask *singletask.OnceTask // 上报位置, 单实例
  45. // 注册本地的远程方法, 登录成功后用于让服务端能够主动下发指令
  46. registerRpcMeths *singletask.OnceTask // 注册方法, 单实例
  47. }
  48. func ModuleInit() bool {
  49. err := loadCfgServers()
  50. if err != nil {
  51. baseapp.Logger.Errorf("[%s] 加载服务器配置项失败: %v!!", MODULE_NAME, err)
  52. return false
  53. }
  54. inheritDUI := "" // 更换数据板时, 要继承的历史ID, (可选)可为空
  55. if data, err := os.ReadFile(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt")); err == nil {
  56. inheritDUI = strings.TrimSpace(string(data))
  57. if len(inheritDUI) != 14 || !IsDecimal(inheritDUI) {
  58. baseapp.Logger.Errorf("[%s] 文件中的DUI: %q 无效, 无法继承历史台账数据!!", MODULE_NAME, inheritDUI)
  59. os.Exit(1)
  60. }
  61. }
  62. ctx, cancel := context.WithCancel(context.Background())
  63. Reporter = &MQTTReporter{
  64. client: nil,
  65. isLogin: atomic.Bool{},
  66. ctx: ctx,
  67. cancel: cancel,
  68. dui: "",
  69. inheritDUI: inheritDUI,
  70. reuploadHistTask: &singletask.OnceTask{},
  71. reportMcuCfgTask: &singletask.OnceTask{},
  72. reportRtuPosTask: &singletask.OnceTask{},
  73. registerRpcMeths: &singletask.OnceTask{},
  74. }
  75. Reporter.init() // 1, Reporter执行初始化,开始就要调用
  76. go Reporter.keepOnline() // 2, Reporter主处理循环,维持登录状态
  77. go Reporter.loopTakePhoto() // 3, 循环处理控制板发来的相机拍照请求
  78. go Reporter.loopRecvOneEnvData() // 4, 循环处理控制板发来的单条环境数据
  79. return true
  80. }
  81. func ModuleExit() {
  82. if Reporter != nil {
  83. Reporter.cancel()
  84. }
  85. }
  86. func (r *MQTTReporter) init() {
  87. logoutNotif, _ := makeLogoutNotif(netmgrd.GetIMEI())
  88. opts := mqtt.NewClientOptions().
  89. AddBroker(CfgServers.MQTTSrv.Address).
  90. SetUsername(CfgServers.MQTTSrv.Username).SetPassword(CfgServers.MQTTSrv.Password).
  91. SetConnectRetry(false).SetAutoReconnect(false).SetCleanSession(true).
  92. SetKeepAlive(10*time.Second).SetPingTimeout(5*time.Second). // Ping心跳间隔, 超时时间
  93. SetOrderMatters(false). /*离线遗愿消息*/ SetWill("/yfkj/xy-v/server/notify", string(logoutNotif), MqttQos1, false)
  94. opts.OnConnect = func(c mqtt.Client) { baseapp.Logger.Infof("[%s] 服务器连接成功", MODULE_NAME) }
  95. opts.OnConnectionLost = func(c mqtt.Client, err error) {
  96. if r.isLogin.Swap(false) {
  97. baseapp.Logger.Warnf("[%s] 服务器连接丢失: %v!", MODULE_NAME, err)
  98. }
  99. }
  100. r.client = mqtt.NewClient(opts)
  101. }
  102. func (r *MQTTReporter) keepOnline() {
  103. t := time.NewTimer(FastInterval)
  104. defer t.Stop()
  105. for {
  106. select {
  107. case <-r.ctx.Done():
  108. return
  109. case <-t.C:
  110. t.Reset(r.tick())
  111. } // end select
  112. } // end for
  113. }
  114. func (r *MQTTReporter) tick() time.Duration {
  115. if r.IsLogin() {
  116. return FastInterval
  117. }
  118. baseapp.Logger.Infof("[%s] 尝试连接、登录服务器(%s)...", MODULE_NAME, CfgServers.MQTTSrv.Address)
  119. if err := r.connect(); err != nil {
  120. baseapp.Logger.Errorf("[%s] 无法连接上服务器: %v!!", MODULE_NAME, err)
  121. } else if err := r.subLoginResp(); err != nil {
  122. baseapp.Logger.Errorf("[%s] 订阅登录应答失败: %v!!", MODULE_NAME, err)
  123. } else if err := r.sendLoginReq(); err != nil {
  124. baseapp.Logger.Errorf("[%s] 发送登录请求失败: %v!!", MODULE_NAME, err)
  125. }
  126. return SlowInterval
  127. }
  128. func (r *MQTTReporter) connect() error {
  129. if r.client.IsConnected() {
  130. return nil
  131. }
  132. token := r.client.Connect()
  133. select {
  134. case <-r.ctx.Done():
  135. return nil
  136. case <-token.Done():
  137. }
  138. return token.Error()
  139. }
  140. func (r *MQTTReporter) subLoginResp() error {
  141. token := r.client.Subscribe("/yfkj/xy-v/server/rpc/response", MqttQos1, r.onLogin)
  142. select {
  143. case <-r.ctx.Done():
  144. return nil
  145. case <-token.Done():
  146. }
  147. return token.Error()
  148. }
  149. func (r *MQTTReporter) onLogin(client mqtt.Client, msg mqtt.Message) {
  150. imei, dui, err := parseLoginResp(msg.Payload())
  151. if err != nil {
  152. baseapp.Logger.Errorf("[%s] 设备登录失败: %v!!", MODULE_NAME, err)
  153. return
  154. }
  155. if imei != netmgrd.GetIMEI() { // 判断是否我的应答
  156. return
  157. }
  158. if len(r.inheritDUI) > 0 && r.inheritDUI == dui {
  159. _ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt"))
  160. } else if len(r.inheritDUI) > 0 && r.inheritDUI != dui {
  161. baseapp.Logger.Errorf("[%s] 设备登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui)
  162. return
  163. }
  164. r.dui = dui
  165. r.isLogin.Store(true)
  166. r.client.Unsubscribe("/yfkj/xy-v/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否已取消)
  167. baseapp.Logger.Infof("[%s] 设备登录成功, DUI: %s", MODULE_NAME, r.dui)
  168. ftpclient.FileUpFolder = dui // 设置默认的照片上传目录
  169. // 启动执行多个异步任务
  170. r.registerRpcMeths.Run(r.instRPCMethods, true) // 注册方法, 供远程调用, 单实例运行
  171. r.reuploadHistTask.Run(r.reuploadLegacy, true) // 补录上传历史遗存数据, 单实例运行
  172. r.reportMcuCfgTask.Run(func() { r.reportMcuCfg(mcu.CfgParams) }, true) // 上报控制板的运行参数, 单实例运行
  173. r.reportRtuPosTask.Run(r.reportLocation, true) // 定位成功上报当前位置, 单实例运行
  174. }
  175. func (r *MQTTReporter) reuploadLegacy() {
  176. baseapp.Logger.Infof("[%s] ++数据补录开始++", MODULE_NAME)
  177. nums1 := 0
  178. filename := "sensor_data.json"
  179. if FileExists(filename) {
  180. if nums1 = r.reportSensorHist(filename); nums1 >= 0 {
  181. os.Remove(filename) // 上传成功后删除历史数据文件
  182. } else {
  183. nums1 = 0
  184. }
  185. }
  186. baseapp.Logger.Infof("[%s] --数据补录结束--, 上传条数: %d", MODULE_NAME, nums1)
  187. baseapp.Logger.Infof("[%s] ++照片补录开始++", MODULE_NAME)
  188. nums2 := 0
  189. filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error {
  190. if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" {
  191. return nil
  192. } else if !r.IsLogin() || r.ctx.Err() != nil {
  193. return context.Canceled
  194. }
  195. ftpclient.FileUploader.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行)
  196. defer ftpclient.FileUploader.Unlock()
  197. mcu.GlobalWorkState.Add(mcu.PhotoUploading)
  198. defer mcu.GlobalWorkState.Remove(mcu.PhotoUploading)
  199. _, err := ftpclient.UploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, ftpclient.DefaultUploadTimeout)
  200. if err == nil {
  201. baseapp.Logger.Infof("[%s] 照片上传成功, 本地文件: %q已删除", MODULE_NAME, path)
  202. os.Remove(path)
  203. nums2++
  204. }
  205. return nil
  206. })
  207. baseapp.Logger.Infof("[%s] --照片补录结束--, 上传张数: %d", MODULE_NAME, nums2)
  208. }
  209. func (r *MQTTReporter) reportMcuCfg(cfg *mcu.Config) {
  210. baseapp.Logger.Infof("[%s] ++上报配置参数++", MODULE_NAME)
  211. for {
  212. if !r.IsLogin() || r.ctx.Err() != nil {
  213. return
  214. }
  215. req, err := jsonrpc2.BuildNotification("report_config", cfg)
  216. if err != nil {
  217. baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
  218. return
  219. }
  220. msg, err := json.Marshal(req)
  221. if err != nil {
  222. baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
  223. return
  224. }
  225. text := string(msg)
  226. token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
  227. select {
  228. case <-r.ctx.Done():
  229. return
  230. case <-token.Done():
  231. }
  232. if token.Error() != nil {
  233. continue
  234. }
  235. baseapp.Logger.Infof("[%s] --上报配置成功--, 报文内容: %s", MODULE_NAME, text)
  236. return
  237. }
  238. }
  239. func (r *MQTTReporter) reportLocation() {
  240. baseapp.Logger.Infof("[%s] ++上报当前位置++", MODULE_NAME)
  241. t := time.NewTicker(time.Second)
  242. defer t.Stop()
  243. for {
  244. if !r.IsLogin() || r.ctx.Err() != nil {
  245. return
  246. }
  247. lat, lon, err := gps.Get2DPosition()
  248. if err != nil {
  249. select {
  250. case <-r.ctx.Done():
  251. return
  252. case <-t.C:
  253. continue
  254. }
  255. }
  256. text := fmt.Sprintf(`{ "method": "report_location", "params": {"lat": "%s", "lon": "%s"} }`, lat, lon)
  257. token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
  258. select {
  259. case <-r.ctx.Done():
  260. return
  261. case <-token.Done():
  262. }
  263. if token.Error() != nil {
  264. continue
  265. }
  266. baseapp.Logger.Infof("[%s] --上报位置成功--, 报文内容: %s", MODULE_NAME, text)
  267. return
  268. }
  269. }
  270. func (r *MQTTReporter) instRPCMethods() {
  271. t := time.NewTicker(time.Second)
  272. defer t.Stop()
  273. for {
  274. if !r.IsLogin() || r.ctx.Err() != nil {
  275. return
  276. }
  277. token := r.client.Subscribe(fmt.Sprintf("/yfkj/xy-v/device/rpc/%s/request", r.dui), MqttQos1, r.handleRequests)
  278. select {
  279. case <-r.ctx.Done():
  280. return
  281. case <-token.Done():
  282. }
  283. if token.Error() == nil {
  284. baseapp.Logger.Infof("[%s] 本地RPC方法已注册, 等待远程调用...", MODULE_NAME)
  285. break
  286. }
  287. select {
  288. case <-r.ctx.Done():
  289. return
  290. case <-t.C:
  291. continue
  292. }
  293. }
  294. }
  295. func (r *MQTTReporter) sendLoginReq() error {
  296. loginReq, err := makeLoginReq(netmgrd.GetIMEI(), netmgrd.GetSimICCID(), netmgrd.GetRSSI(), baseapp.Version, r.inheritDUI)
  297. if err != nil {
  298. return err
  299. }
  300. token := r.client.Publish("/yfkj/xy-v/server/rpc/request", MqttQos1, false, loginReq)
  301. select {
  302. case <-r.ctx.Done():
  303. return nil
  304. case <-token.Done():
  305. }
  306. return token.Error()
  307. }
  308. func (r *MQTTReporter) IsLogin() bool {
  309. return r.isLogin.Load()
  310. }
  311. func (r *MQTTReporter) handleRequests(client mqtt.Client, msg mqtt.Message) {
  312. go r.execOneCmd(msg)
  313. }
  314. func (r *MQTTReporter) execOneCmd(msg mqtt.Message) {
  315. str := string(msg.Payload())
  316. baseapp.Logger.Infof("[%s] 收到一个RPC请求: %s", MODULE_NAME, str)
  317. var resp *jsonrpc2.Response // 预定义一个空的应答
  318. req, err := jsonrpc2.ParseRequest(str)
  319. if err != nil || req.ID == nil /* 不接受通知类型的消息 */ {
  320. resp = jsonrpc2.BuildError(nil, jsonrpc2.ErrParse, "")
  321. goto retp
  322. }
  323. switch req.Method {
  324. // 下发配置参数
  325. case "set_config":
  326. var cfg mcu.Config
  327. err := json.Unmarshal(req.Params, &cfg)
  328. if err == nil {
  329. err = mcu.SaveCfgParams(&cfg)
  330. }
  331. if err == nil { // 上报当前配置, 单实例运行(平台侧不处理错误应答、错误时不回滚之前的配置, 就需要重新上报)
  332. r.reportMcuCfgTask.Run(func() { r.reportMcuCfg(mcu.CfgParams) }, true)
  333. }
  334. resp = buildResp(req, "Success", err)
  335. // 下发拍照指令
  336. case "take_photo":
  337. rf, err := r.takePhotoAndUpToFtp()
  338. resp = buildResp(req, map[string]string{"filename": rf}, err)
  339. // 下发升级指令
  340. case "upgrade_app":
  341. server, user, pass, rf, md5val1, err := parseUpgradeAppReq(req.Params)
  342. if err != nil {
  343. resp = jsonrpc2.BuildError(req, -32700, err.Error())
  344. break
  345. }
  346. mcu.GlobalWorkState.Add(mcu.AppUpgrading)
  347. defer mcu.GlobalWorkState.Remove(mcu.AppUpgrading)
  348. lf, err := ftpclient.DownloadFileFromFtp(r.ctx, server, user, pass, rf, ftpclient.DefaultDownloadTimeout)
  349. if err != nil {
  350. resp = jsonrpc2.BuildError(req, -32700, err.Error())
  351. break
  352. }
  353. md5val2, err := FileMD5(lf)
  354. if err != nil {
  355. resp = jsonrpc2.BuildError(req, -32700, err.Error())
  356. break
  357. }
  358. if !strings.EqualFold(md5val1, md5val2) {
  359. resp = jsonrpc2.BuildError(req, -32700, fmt.Sprintf("MD5校验失败(%s != %s)", md5val1, md5val2))
  360. break
  361. }
  362. cmdStr := "tar xzf " + lf + " -C " + filepath.Dir(baseapp.EXEC_DIR)
  363. cmd := exec.Command("sh", "-c", cmdStr)
  364. cmd.Stdout = baseapp.Logger.WriterLevel(logrus.DebugLevel)
  365. cmd.Stderr = baseapp.Logger.WriterLevel(logrus.ErrorLevel)
  366. err = cmd.Run()
  367. os.Remove(lf)
  368. resp = buildResp(req, "Success", err)
  369. // 下发重启指令
  370. case "reboot":
  371. err := exec.Command("reboot").Run()
  372. resp = buildResp(req, "Success", err)
  373. // 调用无效方法
  374. default:
  375. resp = jsonrpc2.BuildError(req, jsonrpc2.ErrMethodNotFound, "")
  376. }
  377. retp:
  378. text, err := resp.String()
  379. if err != nil {
  380. baseapp.Logger.Errorf("[%s] 转换RPC应答失败: %v!!", MODULE_NAME, err)
  381. return
  382. }
  383. token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/device/rpc/%s/response", r.dui), MqttQos1, false, text)
  384. select {
  385. case <-r.ctx.Done():
  386. return
  387. case <-token.Done():
  388. }
  389. if err := token.Error(); err != nil {
  390. baseapp.Logger.Errorf("[%s] 发送RPC应答失败: %v!!", MODULE_NAME, err)
  391. }
  392. baseapp.Logger.Infof("[%s] 发送一个RPC应答, 报文内容: %s", MODULE_NAME, text)
  393. }