envdata.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package reporter
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "time"
  8. "hnyfkj.com.cn/rtu/linux/baseapp"
  9. "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
  10. mcu "hnyfkj.com.cn/rtu/xy_v/mcu_ctrl_board"
  11. )
  12. func (r *MQTTReporter) loopRecvOneEnvData() {
  13. for {
  14. select {
  15. case envData := <-mcu.Board.OneEnvDataCh:
  16. baseapp.Logger.Infof("[%s] 收到一条环境数据: %s", MODULE_NAME, envData.String())
  17. r.reportSensorData(envData)
  18. case <-r.ctx.Done():
  19. return
  20. } // end select
  21. } // end for
  22. }
  23. func (r *MQTTReporter) reportSensorData(envData *mcu.EnvSensorData) {
  24. mcu.GlobalWorkState.Add(mcu.SensorDataUploading)
  25. defer mcu.GlobalWorkState.Remove(mcu.SensorDataUploading)
  26. now := time.Now()
  27. uploadingData := []*mcu.EnvSensorData{envData}
  28. req, err := jsonrpc2.BuildNotification("report_sensor_data", uploadingData)
  29. if err != nil {
  30. baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
  31. return
  32. }
  33. msg, err := json.Marshal(req)
  34. if err != nil {
  35. baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
  36. return
  37. }
  38. text := string(msg)
  39. token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
  40. select {
  41. case <-r.ctx.Done():
  42. return
  43. case <-token.Done():
  44. }
  45. if token.Error() != nil {
  46. baseapp.Logger.Errorf("[%s] 环境数据上传失败: %v!!", MODULE_NAME, token.Error())
  47. if err := r.saveSensorData(envData, "sensor_data.json"); err != nil {
  48. baseapp.Logger.Errorf("[%s] 转存环境数据失败: %v!!", MODULE_NAME, err)
  49. }
  50. return
  51. }
  52. elapsed := time.Since(now).Milliseconds()
  53. baseapp.Logger.Infof("[%s] 环境数据上传成功, 用时: %d毫秒", MODULE_NAME, elapsed)
  54. }
  55. func (r *MQTTReporter) reportSensorHist(filePath string) int {
  56. mcu.GlobalWorkState.Add(mcu.SensorHistUploading)
  57. defer mcu.GlobalWorkState.Remove(mcu.SensorHistUploading)
  58. uploadingData, err := r.loadSensorData(filePath)
  59. if err != nil {
  60. baseapp.Logger.Errorf("[%s] 加载环境数据失败: %v!!", MODULE_NAME, err)
  61. return -1
  62. }
  63. nums := len(uploadingData)
  64. if nums == 0 { // 没有历史环境数据
  65. return 0
  66. }
  67. req, err := jsonrpc2.BuildNotification("report_sensor_data", uploadingData)
  68. if err != nil {
  69. baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
  70. return -1
  71. }
  72. msg, err := json.Marshal(req)
  73. if err != nil {
  74. baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
  75. return -1
  76. }
  77. text := string(msg)
  78. token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
  79. select {
  80. case <-r.ctx.Done():
  81. return -1
  82. case <-token.Done():
  83. }
  84. if token.Error() != nil {
  85. baseapp.Logger.Errorf("[%s] 环境数据上传失败: %v!!", MODULE_NAME, token.Error())
  86. return -1
  87. }
  88. return nums
  89. }
  90. func (r *MQTTReporter) loadSensorData(filePath string) ([]*mcu.EnvSensorData, error) {
  91. r.sensorFileLock.Lock()
  92. defer r.sensorFileLock.Unlock()
  93. var existingData []*mcu.EnvSensorData
  94. f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0644)
  95. if err != nil {
  96. return nil, err
  97. }
  98. defer f.Close()
  99. decoder := json.NewDecoder(f)
  100. if err := decoder.Decode(&existingData); err != nil && err != io.EOF {
  101. return nil, err
  102. }
  103. return existingData, nil
  104. }
  105. func (r *MQTTReporter) saveSensorData(envData *mcu.EnvSensorData, filePath string) error {
  106. existingData, err := r.loadSensorData(filePath)
  107. if err != nil {
  108. return err
  109. }
  110. r.sensorFileLock.Lock()
  111. defer r.sensorFileLock.Unlock()
  112. if len(existingData) >= 1000 {
  113. existingData = existingData[1:] // 删除最早的一条记录(避免磁盘空间不足)
  114. }
  115. existingData = append(existingData, envData)
  116. f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0644)
  117. if err != nil {
  118. return err
  119. }
  120. defer f.Close()
  121. err = f.Truncate(0)
  122. if err != nil { // 清空现有的文件内容
  123. return err
  124. }
  125. _, err = f.Seek(0, 0)
  126. if err != nil { // 移动文件指针到开头
  127. return err
  128. }
  129. encoder := json.NewEncoder(f)
  130. encoder.SetIndent("", " ") // 美化输出格式
  131. if err := encoder.Encode(existingData); err != nil {
  132. return err
  133. }
  134. return nil
  135. }