ftpclient.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Author: NiuJiuRu
  2. // Email: niujiuru@qq.com
  3. package ftpclient
  4. import (
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jlaffaye/ftp"
  14. "hnyfkj.com.cn/rtu/linux/baseapp"
  15. )
  16. const MODULE_NAME = "FtpClient"
  17. const (
  18. defaultRtyInterval = 1 * time.Second
  19. defaultLogInterval = 2 * time.Second
  20. DefaultUploadTimeout = 5 * time.Minute
  21. DefaultDownloadTimeout = 5 * time.Minute
  22. )
  23. var (
  24. fileLock = struct {
  25. sync.Mutex
  26. m map[string]struct{}
  27. }{m: make(map[string]struct{})}
  28. FileUpFolder = "" // 上传文件目录
  29. FileUploader = struct {
  30. UploadLock sync.Mutex // 上传照片任务只能串行进行时, 可以通过使用该锁来实现排队串行
  31. }{}
  32. )
  33. func tryLockFile(file string) (unlock func(), ok bool) {
  34. fileLock.Lock()
  35. defer fileLock.Unlock()
  36. if _, ok := fileLock.m[file]; ok {
  37. return nil, false
  38. }
  39. fileLock.m[file] = struct{}{}
  40. return func() {
  41. fileLock.Lock()
  42. delete(fileLock.m, file)
  43. fileLock.Unlock()
  44. }, true
  45. }
  46. type progressReader struct {
  47. io.Reader
  48. filename string
  49. total, transferred int64
  50. label string // "上传"或"下载"
  51. ctx context.Context
  52. doneLogged int32
  53. }
  54. func newProgressReader(r io.Reader, filename string, total, transferred int64, label string, ctx context.Context) *progressReader {
  55. pr := &progressReader{Reader: r, filename: filename, total: total, transferred: transferred, label: label, ctx: ctx}
  56. go pr.startProgressLogger()
  57. return pr
  58. }
  59. func (p *progressReader) Read(buf []byte) (int, error) {
  60. n, err := p.Reader.Read(buf)
  61. if n > 0 {
  62. atomic.AddInt64(&p.transferred, int64(n))
  63. }
  64. if err == io.EOF && atomic.CompareAndSwapInt32(&p.doneLogged, 0, 1) {
  65. transferred := atomic.LoadInt64(&p.transferred)
  66. baseapp.Logger.Infof("[%s] 文件%q%s进度: 100.00%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, p.total-transferred, p.total)
  67. }
  68. return n, err
  69. }
  70. func (p *progressReader) startProgressLogger() {
  71. ticker := time.NewTicker(defaultLogInterval)
  72. defer ticker.Stop()
  73. for {
  74. select {
  75. case <-ticker.C:
  76. if atomic.LoadInt32(&p.doneLogged) == 1 {
  77. return
  78. }
  79. transferred := atomic.LoadInt64(&p.transferred)
  80. if transferred >= p.total {
  81. return
  82. }
  83. progress := float64(transferred) / float64(p.total) * 100
  84. baseapp.Logger.Infof("[%s] 文件%q%s进度: %.2f%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, progress, p.total-transferred, p.total)
  85. case <-p.ctx.Done():
  86. return
  87. }
  88. }
  89. }
  90. type stopError struct{ err error }
  91. func (e *stopError) Error() string { return e.err.Error() }
  92. func UploadFileToFtp(ctx context.Context, localFile, serverAddr, loginUser, loginPass string, timeout time.Duration) (string, error) {
  93. unlock, ok := tryLockFile(localFile)
  94. if !ok {
  95. return "", fmt.Errorf("文件%q正在使用中", localFile)
  96. }
  97. defer unlock()
  98. if ctx == nil {
  99. ctx = context.Background()
  100. }
  101. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  102. defer cancel()
  103. if FileUpFolder == "" {
  104. return "", fmt.Errorf("无效的上传目录")
  105. }
  106. lf, err := os.Open(localFile)
  107. if err != nil {
  108. return "", err
  109. }
  110. defer lf.Close()
  111. info, err := lf.Stat()
  112. if err != nil {
  113. return "", err
  114. }
  115. lfs := info.Size() // 总上传的字节数
  116. rd := FileUpFolder // 远程目录名
  117. remoteFile := filepath.Join(rd, filepath.Base(localFile)) // 远程文件名
  118. for {
  119. select {
  120. case <-timeoutCtx.Done():
  121. return "", timeoutCtx.Err()
  122. default:
  123. }
  124. err := func() error {
  125. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  126. if err != nil {
  127. return &stopError{err}
  128. }
  129. defer c.Quit()
  130. if err := c.Login(loginUser, loginPass); err != nil {
  131. return &stopError{err}
  132. }
  133. _ = c.MakeDir(rd) // 尝试创建远程目录, 忽略已存在和其它错误
  134. rfs, err := c.FileSize(remoteFile) // 已上传的字节数
  135. if err != nil || rfs > lfs {
  136. rfs = 0
  137. }
  138. if _, err := lf.Seek(rfs, io.SeekStart); err != nil {
  139. return &stopError{err}
  140. }
  141. pr := newProgressReader(lf, localFile, lfs, rfs, "上传", timeoutCtx)
  142. if err := c.StorFrom(remoteFile, pr, uint64(rfs)); err != nil {
  143. return err
  144. }
  145. return nil
  146. }()
  147. if err != nil {
  148. if lfe, ok := err.(*stopError); ok {
  149. return "", lfe.err
  150. }
  151. time.Sleep(defaultRtyInterval)
  152. continue
  153. }
  154. return remoteFile, nil
  155. }
  156. }
  157. func DownloadFileFromFtp(ctx context.Context, serverAddr, loginUser, loginPass, remoteFile string, timeout time.Duration) (string, error) {
  158. unlock, ok := tryLockFile(remoteFile)
  159. if !ok {
  160. return "", fmt.Errorf("文件%q正在使用中", remoteFile)
  161. }
  162. defer unlock()
  163. if ctx == nil {
  164. ctx = context.Background()
  165. }
  166. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  167. defer cancel()
  168. ld := baseapp.VAR_DIR // 本地目录名
  169. localFile := filepath.Join(ld, filepath.Base(remoteFile)) // 本地文件名
  170. lf, err := os.OpenFile(localFile, os.O_CREATE|os.O_RDWR, 0644)
  171. if err != nil {
  172. return "", err
  173. }
  174. defer lf.Close()
  175. info, err := lf.Stat()
  176. if err != nil {
  177. return "", err
  178. }
  179. lfs := info.Size() // 已下载的字节数
  180. for {
  181. select {
  182. case <-timeoutCtx.Done():
  183. return "", timeoutCtx.Err()
  184. default:
  185. }
  186. err := func() error {
  187. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  188. if err != nil {
  189. return &stopError{err}
  190. }
  191. defer c.Quit()
  192. if err := c.Login(loginUser, loginPass); err != nil {
  193. return &stopError{err}
  194. }
  195. rfs, err := c.FileSize(remoteFile) // 总下载的字节数
  196. if err != nil {
  197. return &stopError{err}
  198. }
  199. if lfs > rfs {
  200. lfs = 0
  201. }
  202. if _, err := lf.Seek(lfs, io.SeekStart); err != nil {
  203. return &stopError{err}
  204. }
  205. resp, err := c.RetrFrom(remoteFile, uint64(lfs))
  206. if err != nil {
  207. return err
  208. }
  209. defer resp.Close()
  210. pr := newProgressReader(resp, remoteFile, rfs, lfs, "下载", timeoutCtx)
  211. n, err := io.Copy(lf, pr)
  212. if err != nil {
  213. return err
  214. }
  215. lfs += n
  216. return nil
  217. }()
  218. if err != nil {
  219. if lfe, ok := err.(*stopError); ok {
  220. return "", lfe.err
  221. }
  222. time.Sleep(defaultRtyInterval)
  223. continue
  224. }
  225. return localFile, nil
  226. }
  227. }