ftpclient.go 6.1 KB


  1. // Author: NiuJiuRu
  2. // Email: niujiuru@qq.com
  3. package utils
  4. import (
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jlaffaye/ftp"
  14. "hnyfkj.com.cn/rtu/linux/baseapp"
  15. )
  16. const (
  17. defaultRtyInterval = 1 * time.Second
  18. defaultLogInterval = 2 * time.Second
  19. DefaultUploadTimeout = 5 * time.Minute
  20. DefaultDownloadTimeout = 5 * time.Minute
  21. )
  22. var (
  23. fileLock = struct {
  24. sync.Mutex
  25. m map[string]struct{}
  26. }{m: make(map[string]struct{})}
  27. FileUpFolder = "" // 上传文件目录
  28. FileUploader = struct {
  29. uploadLock sync.Mutex // 上传照片任务只能串行进行时, 可以通过使用该锁来实现排队串行
  30. }{}
  31. )
  32. func tryLockFile(file string) (unlock func(), ok bool) {
  33. fileLock.Lock()
  34. defer fileLock.Unlock()
  35. if _, ok := fileLock.m[file]; ok {
  36. return nil, false
  37. }
  38. fileLock.m[file] = struct{}{}
  39. return func() {
  40. fileLock.Lock()
  41. delete(fileLock.m, file)
  42. fileLock.Unlock()
  43. }, true
  44. }
  45. type progressReader struct {
  46. io.Reader
  47. filename string
  48. total, transferred int64
  49. label string // "上传"或"下载"
  50. ctx context.Context
  51. doneLogged int32
  52. }
  53. func newProgressReader(r io.Reader, filename string, total, transferred int64, label string, ctx context.Context) *progressReader {
  54. pr := &progressReader{Reader: r, filename: filename, total: total, transferred: transferred, label: label, ctx: ctx}
  55. go pr.startProgressLogger()
  56. return pr
  57. }
  58. func (p *progressReader) Read(buf []byte) (int, error) {
  59. n, err := p.Reader.Read(buf)
  60. if n > 0 {
  61. atomic.AddInt64(&p.transferred, int64(n))
  62. }
  63. if err == io.EOF && atomic.CompareAndSwapInt32(&p.doneLogged, 0, 1) {
  64. transferred := atomic.LoadInt64(&p.transferred)
  65. baseapp.Logger.Infof("[%s] 文件%q%s进度: 100.00%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, p.total-transferred, p.total)
  66. }
  67. return n, err
  68. }
  69. func (p *progressReader) startProgressLogger() {
  70. ticker := time.NewTicker(defaultLogInterval)
  71. defer ticker.Stop()
  72. for {
  73. select {
  74. case <-ticker.C:
  75. if atomic.LoadInt32(&p.doneLogged) == 1 {
  76. return
  77. }
  78. transferred := atomic.LoadInt64(&p.transferred)
  79. if transferred >= p.total {
  80. return
  81. }
  82. progress := float64(transferred) / float64(p.total) * 100
  83. baseapp.Logger.Infof("[%s] 文件%q%s进度: %.2f%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, progress, p.total-transferred, p.total)
  84. case <-p.ctx.Done():
  85. return
  86. }
  87. }
  88. }
  89. type stopError struct{ err error }
  90. func (e *stopError) Error() string { return e.err.Error() }
  91. func UploadFileToFtp(ctx context.Context, localFile, serverAddr, loginUser, loginPass string, timeout time.Duration) (string, error) {
  92. unlock, ok := tryLockFile(localFile)
  93. if !ok {
  94. return "", fmt.Errorf("文件%q正在使用中", localFile)
  95. }
  96. defer unlock()
  97. if ctx == nil {
  98. ctx = context.Background()
  99. }
  100. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  101. defer cancel()
  102. if FileUpFolder == "" {
  103. return "", fmt.Errorf("无效的上传目录")
  104. }
  105. lf, err := os.Open(localFile)
  106. if err != nil {
  107. return "", err
  108. }
  109. defer lf.Close()
  110. info, err := lf.Stat()
  111. if err != nil {
  112. return "", err
  113. }
  114. lfs := info.Size() // 总上传的字节数
  115. rd := FileUpFolder // 远程目录名
  116. remoteFile := filepath.Join(rd, filepath.Base(localFile)) // 远程文件名
  117. for {
  118. select {
  119. case <-timeoutCtx.Done():
  120. return "", timeoutCtx.Err()
  121. default:
  122. }
  123. err := func() error {
  124. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  125. if err != nil {
  126. return &stopError{err}
  127. }
  128. defer c.Quit()
  129. if err := c.Login(loginUser, loginPass); err != nil {
  130. return &stopError{err}
  131. }
  132. _ = c.MakeDir(rd) // 尝试创建远程目录, 忽略已存在和其它错误
  133. rfs, err := c.FileSize(remoteFile) // 已上传的字节数
  134. if err != nil || rfs > lfs {
  135. rfs = 0
  136. }
  137. if _, err := lf.Seek(rfs, io.SeekStart); err != nil {
  138. return &stopError{err}
  139. }
  140. pr := newProgressReader(lf, localFile, lfs, rfs, "上传", timeoutCtx)
  141. if err := c.StorFrom(remoteFile, pr, uint64(rfs)); err != nil {
  142. return err
  143. }
  144. return nil
  145. }()
  146. if err != nil {
  147. if lfe, ok := err.(*stopError); ok {
  148. return "", lfe.err
  149. }
  150. time.Sleep(defaultRtyInterval)
  151. continue
  152. }
  153. return remoteFile, nil
  154. }
  155. }
  156. func DownloadFileFromFtp(ctx context.Context, serverAddr, loginUser, loginPass, remoteFile string, timeout time.Duration) (string, error) {
  157. unlock, ok := tryLockFile(remoteFile)
  158. if !ok {
  159. return "", fmt.Errorf("文件%q正在使用中", remoteFile)
  160. }
  161. defer unlock()
  162. if ctx == nil {
  163. ctx = context.Background()
  164. }
  165. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  166. defer cancel()
  167. ld := baseapp.VAR_DIR // 本地目录名
  168. localFile := filepath.Join(ld, filepath.Base(remoteFile)) // 本地文件名
  169. lf, err := os.OpenFile(localFile, os.O_CREATE|os.O_RDWR, 0644)
  170. if err != nil {
  171. return "", err
  172. }
  173. defer lf.Close()
  174. info, err := lf.Stat()
  175. if err != nil {
  176. return "", err
  177. }
  178. lfs := info.Size() // 已下载的字节数
  179. for {
  180. select {
  181. case <-timeoutCtx.Done():
  182. return "", timeoutCtx.Err()
  183. default:
  184. }
  185. err := func() error {
  186. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  187. if err != nil {
  188. return &stopError{err}
  189. }
  190. defer c.Quit()
  191. if err := c.Login(loginUser, loginPass); err != nil {
  192. return &stopError{err}
  193. }
  194. rfs, err := c.FileSize(remoteFile) // 总下载的字节数
  195. if err != nil {
  196. return &stopError{err}
  197. }
  198. if lfs > rfs {
  199. lfs = 0
  200. }
  201. if _, err := lf.Seek(lfs, io.SeekStart); err != nil {
  202. return &stopError{err}
  203. }
  204. resp, err := c.RetrFrom(remoteFile, uint64(lfs))
  205. if err != nil {
  206. return err
  207. }
  208. defer resp.Close()
  209. pr := newProgressReader(resp, remoteFile, rfs, lfs, "下载", timeoutCtx)
  210. n, err := io.Copy(lf, pr)
  211. if err != nil {
  212. return err
  213. }
  214. lfs += n
  215. return nil
  216. }()
  217. if err != nil {
  218. if lfe, ok := err.(*stopError); ok {
  219. return "", lfe.err
  220. }
  221. time.Sleep(defaultRtyInterval)
  222. continue
  223. }
  224. return localFile, nil
  225. }
  226. }