| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package reporter
- import (
- "encoding/json"
- "fmt"
- "io"
- "os"
- "time"
- "hnyfkj.com.cn/rtu/linux/baseapp"
- "hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
- mcu "hnyfkj.com.cn/rtu/xy_v/mcu_ctrl_board"
- )
- func (r *MQTTReporter) loopRecvOneEnvData() {
- for {
- select {
- case envData := <-mcu.Board.OneEnvDataCh:
- baseapp.Logger.Infof("[%s] 收到一条环境数据: %s", MODULE_NAME, envData.String())
- r.reportSensorData(envData)
- case <-r.ctx.Done():
- return
- } // end select
- } // end for
- }
- func (r *MQTTReporter) reportSensorData(envData *mcu.EnvSensorData) {
- mcu.GlobalWorkState.Add(mcu.SensorDataUploading)
- defer mcu.GlobalWorkState.Remove(mcu.SensorDataUploading)
- now := time.Now()
- uploadingData := []*mcu.EnvSensorData{envData}
- req, err := jsonrpc2.BuildNotification("report_sensor_data", uploadingData)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
- return
- }
- msg, err := json.Marshal(req)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
- return
- }
- text := string(msg)
- token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
- select {
- case <-r.ctx.Done():
- return
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 环境数据上传失败: %v!!", MODULE_NAME, token.Error())
- if err := r.saveSensorData(envData, "sensor_data.json"); err != nil {
- baseapp.Logger.Errorf("[%s] 转存环境数据失败: %v!!", MODULE_NAME, err)
- }
- return
- }
- elapsed := time.Since(now).Milliseconds()
- baseapp.Logger.Infof("[%s] 环境数据上传成功, 用时: %d毫秒", MODULE_NAME, elapsed)
- }
- func (r *MQTTReporter) reportSensorHist(filePath string) int {
- mcu.GlobalWorkState.Add(mcu.SensorHistUploading)
- defer mcu.GlobalWorkState.Remove(mcu.SensorHistUploading)
- uploadingData, err := r.loadSensorData(filePath)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 加载环境数据失败: %v!!", MODULE_NAME, err)
- return -1
- }
- nums := len(uploadingData)
- if nums == 0 { // 没有历史环境数据
- return 0
- }
- req, err := jsonrpc2.BuildNotification("report_sensor_data", uploadingData)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 构建通知失败: %v!!", MODULE_NAME, err)
- return -1
- }
- msg, err := json.Marshal(req)
- if err != nil {
- baseapp.Logger.Errorf("[%s] 编码通知失败: %v!!", MODULE_NAME, err)
- return -1
- }
- text := string(msg)
- token := r.client.Publish(fmt.Sprintf("/yfkj/xy-v/notify/%s", r.dui), MqttQos1, false, text)
- select {
- case <-r.ctx.Done():
- return -1
- case <-token.Done():
- }
- if token.Error() != nil {
- baseapp.Logger.Errorf("[%s] 环境数据上传失败: %v!!", MODULE_NAME, token.Error())
- return -1
- }
- return nums
- }
- func (r *MQTTReporter) loadSensorData(filePath string) ([]*mcu.EnvSensorData, error) {
- r.sensorFileLock.Lock()
- defer r.sensorFileLock.Unlock()
- var existingData []*mcu.EnvSensorData
- f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- decoder := json.NewDecoder(f)
- if err := decoder.Decode(&existingData); err != nil && err != io.EOF {
- return nil, err
- }
- return existingData, nil
- }
- func (r *MQTTReporter) saveSensorData(envData *mcu.EnvSensorData, filePath string) error {
- existingData, err := r.loadSensorData(filePath)
- if err != nil {
- return err
- }
- r.sensorFileLock.Lock()
- defer r.sensorFileLock.Unlock()
- if len(existingData) >= 1000 {
- existingData = existingData[1:] // 删除最早的一条记录(避免磁盘空间不足)
- }
- existingData = append(existingData, envData)
- f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return err
- }
- defer f.Close()
- err = f.Truncate(0)
- if err != nil { // 清空现有的文件内容
- return err
- }
- _, err = f.Seek(0, 0)
- if err != nil { // 移动文件指针到开头
- return err
- }
- encoder := json.NewEncoder(f)
- encoder.SetIndent("", " ") // 美化输出格式
- if err := encoder.Encode(existingData); err != nil {
- return err
- }
- return nil
- }
|