ftpclient.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package utils
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/jlaffaye/ftp"
  12. "hnyfkj.com.cn/rtu/linux/baseapp"
  13. )
  14. const (
  15. defaultRtyInterval = 1 * time.Second
  16. defaultLogInterval = 2 * time.Second
  17. DefaultUploadTimeout = 5 * time.Minute
  18. DefaultDownloadTimeout = 5 * time.Minute
  19. )
  20. var (
  21. fileLock = struct {
  22. sync.Mutex
  23. m map[string]struct{}
  24. }{m: make(map[string]struct{})}
  25. FileUpFolder = "" // 上传文件目录
  26. FileUploader = struct {
  27. uploadLock sync.Mutex // 上传照片任务只能串行进行时, 可以通过使用该锁来实现排队串行
  28. }{}
  29. )
  30. func tryLockFile(file string) (unlock func(), ok bool) {
  31. fileLock.Lock()
  32. defer fileLock.Unlock()
  33. if _, ok := fileLock.m[file]; ok {
  34. return nil, false
  35. }
  36. fileLock.m[file] = struct{}{}
  37. return func() {
  38. fileLock.Lock()
  39. delete(fileLock.m, file)
  40. fileLock.Unlock()
  41. }, true
  42. }
  43. type progressReader struct {
  44. io.Reader
  45. filename string
  46. total, transferred int64
  47. label string // "上传"或"下载"
  48. ctx context.Context
  49. doneLogged int32
  50. }
  51. func newProgressReader(r io.Reader, filename string, total, transferred int64, label string, ctx context.Context) *progressReader {
  52. pr := &progressReader{Reader: r, filename: filename, total: total, transferred: transferred, label: label, ctx: ctx}
  53. go pr.startProgressLogger()
  54. return pr
  55. }
  56. func (p *progressReader) Read(buf []byte) (int, error) {
  57. n, err := p.Reader.Read(buf)
  58. if n > 0 {
  59. atomic.AddInt64(&p.transferred, int64(n))
  60. }
  61. if err == io.EOF && atomic.CompareAndSwapInt32(&p.doneLogged, 0, 1) {
  62. transferred := atomic.LoadInt64(&p.transferred)
  63. baseapp.Logger.Infof("[%s] 文件%q%s进度: 100.00%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, p.total-transferred, p.total)
  64. }
  65. return n, err
  66. }
  67. func (p *progressReader) startProgressLogger() {
  68. ticker := time.NewTicker(defaultLogInterval)
  69. defer ticker.Stop()
  70. for {
  71. select {
  72. case <-ticker.C:
  73. if atomic.LoadInt32(&p.doneLogged) == 1 {
  74. return
  75. }
  76. transferred := atomic.LoadInt64(&p.transferred)
  77. if transferred >= p.total {
  78. return
  79. }
  80. progress := float64(transferred) / float64(p.total) * 100
  81. baseapp.Logger.Infof("[%s] 文件%q%s进度: %.2f%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, progress, p.total-transferred, p.total)
  82. case <-p.ctx.Done():
  83. return
  84. }
  85. }
  86. }
  87. type stopError struct{ err error }
  88. func (e *stopError) Error() string { return e.err.Error() }
  89. func UploadFileToFtp(ctx context.Context, localFile, serverAddr, loginUser, loginPass string, timeout time.Duration) (string, error) {
  90. unlock, ok := tryLockFile(localFile)
  91. if !ok {
  92. return "", fmt.Errorf("文件%q正在使用中", localFile)
  93. }
  94. defer unlock()
  95. if ctx == nil {
  96. ctx = context.Background()
  97. }
  98. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  99. defer cancel()
  100. if FileUpFolder == "" {
  101. return "", fmt.Errorf("无效的上传目录")
  102. }
  103. lf, err := os.Open(localFile)
  104. if err != nil {
  105. return "", err
  106. }
  107. defer lf.Close()
  108. info, err := lf.Stat()
  109. if err != nil {
  110. return "", err
  111. }
  112. lfs := info.Size() // 总上传的字节数
  113. rd := FileUpFolder // 远程目录名
  114. remoteFile := filepath.Join(rd, filepath.Base(localFile)) // 远程文件名
  115. for {
  116. select {
  117. case <-timeoutCtx.Done():
  118. return "", timeoutCtx.Err()
  119. default:
  120. }
  121. err := func() error {
  122. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  123. if err != nil {
  124. return &stopError{err}
  125. }
  126. defer c.Quit()
  127. if err := c.Login(loginUser, loginPass); err != nil {
  128. return &stopError{err}
  129. }
  130. _ = c.MakeDir(rd) // 尝试创建远程目录, 忽略已存在和其它错误
  131. rfs, err := c.FileSize(remoteFile) // 已上传的字节数
  132. if err != nil || rfs > lfs {
  133. rfs = 0
  134. }
  135. if _, err := lf.Seek(rfs, io.SeekStart); err != nil {
  136. return &stopError{err}
  137. }
  138. pr := newProgressReader(lf, localFile, lfs, rfs, "上传", timeoutCtx)
  139. if err := c.StorFrom(remoteFile, pr, uint64(rfs)); err != nil {
  140. return err
  141. }
  142. return nil
  143. }()
  144. if err != nil {
  145. if lfe, ok := err.(*stopError); ok {
  146. return "", lfe.err
  147. }
  148. time.Sleep(defaultRtyInterval)
  149. continue
  150. }
  151. return remoteFile, nil
  152. }
  153. }
  154. func DownloadFileFromFtp(ctx context.Context, serverAddr, loginUser, loginPass, remoteFile string, timeout time.Duration) (string, error) {
  155. unlock, ok := tryLockFile(remoteFile)
  156. if !ok {
  157. return "", fmt.Errorf("文件%q正在使用中", remoteFile)
  158. }
  159. defer unlock()
  160. if ctx == nil {
  161. ctx = context.Background()
  162. }
  163. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  164. defer cancel()
  165. ld := baseapp.VAR_DIR // 本地目录名
  166. localFile := filepath.Join(ld, filepath.Base(remoteFile)) // 本地文件名
  167. lf, err := os.OpenFile(localFile, os.O_CREATE|os.O_RDWR, 0644)
  168. if err != nil {
  169. return "", err
  170. }
  171. defer lf.Close()
  172. info, err := lf.Stat()
  173. if err != nil {
  174. return "", err
  175. }
  176. lfs := info.Size() // 已下载的字节数
  177. for {
  178. select {
  179. case <-timeoutCtx.Done():
  180. return "", timeoutCtx.Err()
  181. default:
  182. }
  183. err := func() error {
  184. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  185. if err != nil {
  186. return &stopError{err}
  187. }
  188. defer c.Quit()
  189. if err := c.Login(loginUser, loginPass); err != nil {
  190. return &stopError{err}
  191. }
  192. rfs, err := c.FileSize(remoteFile) // 总下载的字节数
  193. if err != nil {
  194. return &stopError{err}
  195. }
  196. if lfs > rfs {
  197. lfs = 0
  198. }
  199. if _, err := lf.Seek(lfs, io.SeekStart); err != nil {
  200. return &stopError{err}
  201. }
  202. resp, err := c.RetrFrom(remoteFile, uint64(lfs))
  203. if err != nil {
  204. return err
  205. }
  206. defer resp.Close()
  207. pr := newProgressReader(resp, remoteFile, rfs, lfs, "下载", timeoutCtx)
  208. n, err := io.Copy(lf, pr)
  209. if err != nil {
  210. return err
  211. }
  212. lfs += n
  213. return nil
  214. }()
  215. if err != nil {
  216. if lfe, ok := err.(*stopError); ok {
  217. return "", lfe.err
  218. }
  219. time.Sleep(defaultRtyInterval)
  220. continue
  221. }
  222. return localFile, nil
  223. }
  224. }