ftpclient.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package reporter
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jlaffaye/ftp"
  12. "hnyfkj.com.cn/rtu/bxs-sy/baseapp"
  13. )
  14. const (
  15. defaultRtyInterval = 1 * time.Second
  16. defaultLogInterval = 2 * time.Second
  17. defaultUploadTimeout = 5 * time.Minute
  18. defaultDownloadTimeout = 5 * time.Minute
  19. )
  20. var (
  21. fileLock = struct {
  22. sync.Mutex
  23. m map[string]struct{}
  24. }{m: make(map[string]struct{})}
  25. fileUploader = struct {
  26. uploadLock sync.Mutex // 由于和MCU通信协议状态位的限制, 上传照片任务目前只能串行进行, 避免状态混乱 2025-10-17
  27. }{}
  28. )
  29. func tryLockFile(file string) (unlock func(), ok bool) {
  30. fileLock.Lock()
  31. defer fileLock.Unlock()
  32. if _, ok := fileLock.m[file]; ok {
  33. return nil, false
  34. }
  35. fileLock.m[file] = struct{}{}
  36. return func() {
  37. fileLock.Lock()
  38. delete(fileLock.m, file)
  39. fileLock.Unlock()
  40. }, true
  41. }
  42. type progressReader struct {
  43. io.Reader
  44. filename string
  45. total, transferred int64
  46. label string // "上传"或"下载"
  47. ctx context.Context
  48. doneLogged int32
  49. }
  50. func newProgressReader(r io.Reader, filename string, total, transferred int64, label string, ctx context.Context) *progressReader {
  51. pr := &progressReader{Reader: r, filename: filename, total: total, transferred: transferred, label: label, ctx: ctx}
  52. go pr.startProgressLogger()
  53. return pr
  54. }
  55. func (p *progressReader) Read(buf []byte) (int, error) {
  56. n, err := p.Reader.Read(buf)
  57. if n > 0 {
  58. atomic.AddInt64(&p.transferred, int64(n))
  59. }
  60. if err == io.EOF && atomic.CompareAndSwapInt32(&p.doneLogged, 0, 1) {
  61. transferred := atomic.LoadInt64(&p.transferred)
  62. baseapp.Logger.Infof("[%s] 文件%q%s进度: 100.00%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, p.total-transferred, p.total)
  63. }
  64. return n, err
  65. }
  66. func (p *progressReader) startProgressLogger() {
  67. ticker := time.NewTicker(defaultLogInterval)
  68. defer ticker.Stop()
  69. for {
  70. select {
  71. case <-ticker.C:
  72. if atomic.LoadInt32(&p.doneLogged) == 1 {
  73. return
  74. }
  75. transferred := atomic.LoadInt64(&p.transferred)
  76. if transferred >= p.total {
  77. return
  78. }
  79. progress := float64(transferred) / float64(p.total) * 100
  80. baseapp.Logger.Infof("[%s] 文件%q%s进度: %.2f%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, progress, p.total-transferred, p.total)
  81. case <-p.ctx.Done():
  82. return
  83. }
  84. }
  85. }
  86. type stopError struct{ err error }
  87. func (e *stopError) Error() string { return e.err.Error() }
  88. func uploadFileToFtp(ctx context.Context, localFile, serverAddr, loginUser, loginPass string, timeout time.Duration) (string, error) {
  89. unlock, ok := tryLockFile(localFile)
  90. if !ok {
  91. return "", fmt.Errorf("文件%q正在使用中", localFile)
  92. }
  93. defer unlock()
  94. if ctx == nil {
  95. ctx = context.Background()
  96. }
  97. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  98. defer cancel()
  99. if Reporter.dui == "" {
  100. return "", fmt.Errorf("无效DUI")
  101. }
  102. lf, err := os.Open(localFile)
  103. if err != nil {
  104. return "", err
  105. }
  106. defer lf.Close()
  107. info, err := lf.Stat()
  108. if err != nil {
  109. return "", err
  110. }
  111. lfs := info.Size() // 总上传的字节数
  112. rd := Reporter.dui // 远程目录名
  113. remoteFile := filepath.Join(rd, filepath.Base(localFile)) // 远程文件名
  114. for {
  115. select {
  116. case <-timeoutCtx.Done():
  117. return "", timeoutCtx.Err()
  118. default:
  119. }
  120. err := func() error {
  121. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  122. if err != nil {
  123. return &stopError{err}
  124. }
  125. defer c.Quit()
  126. if err := c.Login(loginUser, loginPass); err != nil {
  127. return &stopError{err}
  128. }
  129. _ = c.MakeDir(rd) // 尝试创建远程目录, 忽略已存在和其它错误
  130. rfs, err := c.FileSize(remoteFile) // 已上传的字节数
  131. if err != nil || rfs > lfs {
  132. rfs = 0
  133. }
  134. if _, err := lf.Seek(rfs, io.SeekStart); err != nil {
  135. return &stopError{err}
  136. }
  137. pr := newProgressReader(lf, localFile, lfs, rfs, "上传", timeoutCtx)
  138. if err := c.StorFrom(remoteFile, pr, uint64(rfs)); err != nil {
  139. return err
  140. }
  141. return nil
  142. }()
  143. if err != nil {
  144. if lfe, ok := err.(*stopError); ok {
  145. return "", lfe.err
  146. }
  147. time.Sleep(defaultRtyInterval)
  148. continue
  149. }
  150. return remoteFile, nil
  151. }
  152. }
  153. func downloadFileFromFtp(ctx context.Context, serverAddr, loginUser, loginPass, remoteFile string, timeout time.Duration) (string, error) {
  154. unlock, ok := tryLockFile(remoteFile)
  155. if !ok {
  156. return "", fmt.Errorf("文件%q正在使用中", remoteFile)
  157. }
  158. defer unlock()
  159. if ctx == nil {
  160. ctx = context.Background()
  161. }
  162. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  163. defer cancel()
  164. ld := baseapp.VAR_DIR // 本地目录名
  165. localFile := filepath.Join(ld, filepath.Base(remoteFile)) // 本地文件名
  166. lf, err := os.OpenFile(localFile, os.O_CREATE|os.O_RDWR, 0644)
  167. if err != nil {
  168. return "", err
  169. }
  170. defer lf.Close()
  171. info, err := lf.Stat()
  172. if err != nil {
  173. return "", err
  174. }
  175. lfs := info.Size() // 已下载的字节数
  176. for {
  177. select {
  178. case <-timeoutCtx.Done():
  179. return "", timeoutCtx.Err()
  180. default:
  181. }
  182. err := func() error {
  183. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  184. if err != nil {
  185. return &stopError{err}
  186. }
  187. defer c.Quit()
  188. if err := c.Login(loginUser, loginPass); err != nil {
  189. return &stopError{err}
  190. }
  191. rfs, err := c.FileSize(remoteFile) // 总下载的字节数
  192. if err != nil {
  193. return &stopError{err}
  194. }
  195. if lfs > rfs {
  196. lfs = 0
  197. }
  198. if _, err := lf.Seek(lfs, io.SeekStart); err != nil {
  199. return &stopError{err}
  200. }
  201. resp, err := c.RetrFrom(remoteFile, uint64(lfs))
  202. if err != nil {
  203. return err
  204. }
  205. defer resp.Close()
  206. pr := newProgressReader(resp, remoteFile, rfs, lfs, "下载", timeoutCtx)
  207. n, err := io.Copy(lf, pr)
  208. if err != nil {
  209. return err
  210. }
  211. lfs += n
  212. return nil
  213. }()
  214. if err != nil {
  215. if lfe, ok := err.(*stopError); ok {
  216. return "", lfe.err
  217. }
  218. time.Sleep(defaultRtyInterval)
  219. continue
  220. }
  221. return localFile, nil
  222. }
  223. }