Просмотр исходного кода

编写与大数据平台通信的代码

niujiuru 3 недель назад
Родитель
Сommit
241d0bcacf
4 измененных файлов с 203 добавлено и 1 удалено
  1. 3 0
      go.mod
  2. 6 0
      go.sum
  3. 100 0
      reporter/protocol.go
  4. 94 1
      reporter/reporter.go

+ 3 - 0
go.mod

@@ -3,6 +3,7 @@ module hnyfkj.com.cn/rtu/xy_v
 go 1.24.2
 
 require (
+	github.com/eclipse/paho.mqtt.golang v1.5.1
 	gopkg.in/ini.v1 v1.67.0
 	hnyfkj.com.cn/rtu/linux v0.0.0
 )
@@ -10,6 +11,7 @@ require (
 require (
 	github.com/alexflint/go-filemutex v1.3.0 // indirect
 	github.com/beevik/ntp v1.5.0 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
 	github.com/hashicorp/errwrap v1.0.0 // indirect
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/jlaffaye/ftp v0.2.0 // indirect
@@ -17,6 +19,7 @@ require (
 	github.com/vishvananda/netlink v1.3.1 // indirect
 	github.com/vishvananda/netns v0.0.5 // indirect
 	golang.org/x/net v0.44.0 // indirect
+	golang.org/x/sync v0.17.0 // indirect
 	golang.org/x/sys v0.36.0 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 )

+ 6 - 0
go.sum

@@ -5,6 +5,10 @@ github.com/beevik/ntp v1.5.0/go.mod h1:mJEhBrwT76w9D+IfOEGvuzyuudiW9E52U2BaTrMOY
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
+github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
@@ -26,6 +30,8 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd
 github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
 golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
 golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

+ 100 - 0
reporter/protocol.go

@@ -0,0 +1,100 @@
+package reporter
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"hnyfkj.com.cn/rtu/linux/utils/jsonrpc2"
+)
+
+func makeLoginReq(imei, iccid, rssi, appVersion, inheritDUI string) ([]byte, error) {
+	type params struct {
+		DeviceType string `json:"deviceType"`
+		IMEI       string `json:"imei"`
+		ICCID      string `json:"iccid"`
+		RSSI       string `json:"rssi"`
+		AppVersion string `json:"appVersion"`
+		InheritID  string `json:"inheritDUI,omitempty"` // 继承的DUI, 可选可为空
+	}
+
+	paramsVar := params{
+		DeviceType: "-1", // Todo: 拍照性诱智能监测设备产品类型未定
+		IMEI:       imei,
+		ICCID:      iccid,
+		RSSI:       rssi,
+		AppVersion: appVersion,
+		InheritID:  inheritDUI,
+	}
+
+	paramsBytes, err := json.Marshal(paramsVar)
+	if err != nil {
+		return nil, err
+	}
+
+	req := jsonrpc2.Request{
+		JSONRPC: "2.0",
+		Method:  "login",
+		Params:  paramsBytes,
+		ID:      1,
+	}
+
+	return json.Marshal(req)
+}
+
+func parseLoginResp(data []byte) (string, string, error) {
+	var resp jsonrpc2.Response
+	if err := json.Unmarshal(data, &resp); err != nil {
+		return "", "", err
+	}
+
+	if resp.Error != nil {
+		return "", "", fmt.Errorf("服务器返回错误: %d, %s", resp.Error.Code, resp.Error.Message)
+	}
+
+	var result struct {
+		IMEI string `json:"imei"`
+		DUI  string `json:"DUI"`
+	}
+	if err := json.Unmarshal(resp.Result, &result); err != nil {
+		return "", "", err
+	}
+
+	if len(result.DUI) != 14 || !isDecimal(result.DUI) {
+		return "", "", fmt.Errorf("无效DUI: %q", result.DUI)
+	}
+
+	return result.IMEI, result.DUI, nil
+}
+
+func makeLogoutMsg(imei string) ([]byte, error) {
+	type params struct {
+		IMEI string `json:"imei"`
+	}
+
+	paramsVar := params{
+		IMEI: imei,
+	}
+
+	paramsBytes, err := json.Marshal(paramsVar)
+	if err != nil {
+		return nil, err
+	}
+
+	req := jsonrpc2.Request{
+		JSONRPC: "2.0",
+		Method:  "logout",
+		Params:  paramsBytes,
+		ID:      1,
+	}
+
+	return json.Marshal(req)
+}
+
+func isDecimal(s string) bool {
+	for _, ch := range s {
+		if ch < '0' || ch > '9' {
+			return false
+		}
+	}
+	return s != "" // 不允许空字符串
+}

+ 94 - 1
reporter/reporter.go

@@ -1,9 +1,43 @@
 package reporter
 
-import "hnyfkj.com.cn/rtu/linux/baseapp"
+import (
+	"context"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sync/atomic"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	mcu "hnyfkj.com.cn/rtu/xy_v/mcu_ctrl_board"
+
+	"hnyfkj.com.cn/rtu/linux/baseapp"
+	"hnyfkj.com.cn/rtu/linux/netmgrd"
+	"hnyfkj.com.cn/rtu/linux/utils/ftpclient"
+	"hnyfkj.com.cn/rtu/linux/utils/singletask"
+)
 
 const MODULE_NAME = "Reporter"
 
+var (
+	Reporter *MQTTReporter
+)
+
+type MQTTReporter struct {
+	client mqtt.Client
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	isLogin    atomic.Bool // 标记是否已成功登录MQTT后端服务器
+	dui        string      // 登录成功后服务端返回的设备唯一ID
+	inheritDUI string      // 继承的历史ID, 可选可为空(换板时)
+
+	// 主动上报的后台任务, 登录成功时用于照片补录和上报通知类消息
+	uploadPhotosTask *singletask.OnceTask // 补录照片, 单实例
+	reportMcuCfgTask *singletask.OnceTask // 上报配置, 单实例
+	reportRtuPosTask *singletask.OnceTask // 上报位置, 单实例
+}
+
 func ModuleInit() bool {
 	err := loadCfgServers()
 	if err != nil {
@@ -13,3 +47,62 @@ func ModuleInit() bool {
 
 	return true
 }
+
+func (r *MQTTReporter) IsLogin() bool {
+	return r.isLogin.Load()
+}
+
+func (r *MQTTReporter) OnLogin(client mqtt.Client, msg mqtt.Message) {
+	imei, dui, err := parseLoginResp(msg.Payload())
+	if err != nil {
+		baseapp.Logger.Errorf("[%s] 登录失败: %v!!", MODULE_NAME, err)
+		return
+	}
+
+	if imei != netmgrd.GetIMEI() { // 判断是否我的应答
+		return
+	}
+
+	if len(r.inheritDUI) > 0 && r.inheritDUI == dui {
+		_ = os.Remove(filepath.Join(baseapp.VAR_DIR, "inheritDUI.txt"))
+	} else if len(r.inheritDUI) > 0 && r.inheritDUI != dui {
+		baseapp.Logger.Errorf("[%s] 登录失败: 要继承的DUI(%s)与服务器返回的DUI(%s)不匹配!!", MODULE_NAME, r.inheritDUI, dui)
+		return
+	}
+
+	r.dui = dui
+	r.isLogin.Store(true)
+	r.client.Unsubscribe("/yfkj/bxs-sy/server/rpc/response") // 登录成功后, 取消对登录应答的订阅(不关心是否取消成功)
+	baseapp.Logger.Infof("[%s] 登录成功, 设备DUI: %s", MODULE_NAME, r.dui)
+
+	r.uploadPhotosTask.Run(r.uploadPendingPhotos, true) // 补录上传历史遗存照片, 单实例运行
+}
+
+func (r *MQTTReporter) uploadPendingPhotos() {
+	baseapp.Logger.Infof("[%s] 拍照补录上传任务开始", MODULE_NAME)
+	nums := 0
+	filepath.WalkDir(baseapp.IMG_DIR, func(path string, d fs.DirEntry, walkErr error) error {
+		if walkErr != nil || d.IsDir() || filepath.Ext(path) != ".jpg" {
+			return nil
+		} else if !r.IsLogin() || r.ctx.Err() != nil {
+			return context.Canceled
+		}
+
+		ftpclient.FileUploader.Lock() // 上传锁定(多个上传任务并发时, 保证串行执行)
+		defer ftpclient.FileUploader.Unlock()
+
+		mcu.GlobalWorkState.Add(mcu.PhotoUploading)
+		defer mcu.GlobalWorkState.Remove(mcu.PhotoUploading)
+
+		_, err := ftpclient.UploadFileToFtp(r.ctx, path, CfgServers.Img2Ftp.Address, CfgServers.Img2Ftp.Username, CfgServers.Img2Ftp.Password, ftpclient.DefaultUploadTimeout)
+		if err == nil {
+			baseapp.Logger.Infof("[%s] 拍照补录上传成功, 本地文件: %q已删除", MODULE_NAME, path)
+			os.Remove(path)
+			nums++
+		}
+
+		return nil
+	})
+
+	baseapp.Logger.Infof("[%s] 拍照补录上传任务结束, 本次共上传%d张照片", MODULE_NAME, nums)
+}