ftpclient.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // Author: NiuJiuRu
  2. // Email: niujiuru@qq.com
  3. package ftpclient
  4. import (
  5. "context"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "github.com/jlaffaye/ftp"
  14. "hnyfkj.com.cn/rtu/linux/baseapp"
  15. )
  16. const MODULE_NAME = "FtpClient"
  17. const (
  18. defaultRtyInterval = 1 * time.Second
  19. defaultLogInterval = 2 * time.Second
  20. DefaultUploadTimeout = 5 * time.Minute
  21. DefaultDownloadTimeout = 5 * time.Minute
  22. )
  23. var (
  24. fileLock = struct {
  25. sync.Mutex
  26. m map[string]struct{}
  27. }{m: make(map[string]struct{})}
  28. FileUpFolder = "" // 上传文件目录
  29. FileUploader = &uploader{} // 拍照上传任务只能串行进行时, 可以通过使用该锁来实现排队串行
  30. )
  31. type uploader struct {
  32. uploadLock sync.Mutex
  33. }
  34. func (u *uploader) Lock() {
  35. u.uploadLock.Lock()
  36. }
  37. func (u *uploader) Unlock() {
  38. u.uploadLock.Unlock()
  39. }
  40. func tryLockFile(file string) (unlock func(), ok bool) {
  41. fileLock.Lock()
  42. defer fileLock.Unlock()
  43. if _, ok := fileLock.m[file]; ok {
  44. return nil, false
  45. }
  46. fileLock.m[file] = struct{}{}
  47. return func() {
  48. fileLock.Lock()
  49. delete(fileLock.m, file)
  50. fileLock.Unlock()
  51. }, true
  52. }
  53. type progressReader struct {
  54. io.Reader
  55. filename string
  56. total, transferred int64
  57. label string // "上传"或"下载"
  58. ctx context.Context
  59. doneLogged int32
  60. }
  61. func newProgressReader(r io.Reader, filename string, total, transferred int64, label string, ctx context.Context) *progressReader {
  62. pr := &progressReader{Reader: r, filename: filename, total: total, transferred: transferred, label: label, ctx: ctx}
  63. go pr.startProgressLogger()
  64. return pr
  65. }
  66. func (p *progressReader) Read(buf []byte) (int, error) {
  67. n, err := p.Reader.Read(buf)
  68. if n > 0 {
  69. atomic.AddInt64(&p.transferred, int64(n))
  70. }
  71. if err == io.EOF && atomic.CompareAndSwapInt32(&p.doneLogged, 0, 1) {
  72. transferred := atomic.LoadInt64(&p.transferred)
  73. baseapp.Logger.Infof("[%s] 文件%q%s进度: 100.00%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, p.total-transferred, p.total)
  74. }
  75. return n, err
  76. }
  77. func (p *progressReader) startProgressLogger() {
  78. ticker := time.NewTicker(defaultLogInterval)
  79. defer ticker.Stop()
  80. for {
  81. select {
  82. case <-ticker.C:
  83. if atomic.LoadInt32(&p.doneLogged) == 1 {
  84. return
  85. }
  86. transferred := atomic.LoadInt64(&p.transferred)
  87. if transferred >= p.total {
  88. return
  89. }
  90. progress := float64(transferred) / float64(p.total) * 100
  91. baseapp.Logger.Infof("[%s] 文件%q%s进度: %.2f%%, 剩余: %d字节, 总大小: %d字节", MODULE_NAME, p.filename, p.label, progress, p.total-transferred, p.total)
  92. case <-p.ctx.Done():
  93. return
  94. }
  95. }
  96. }
  97. type stopError struct{ err error }
  98. func (e *stopError) Error() string { return e.err.Error() }
  99. func UploadFileToFtp(ctx context.Context, localFile, serverAddr, loginUser, loginPass string, timeout time.Duration) (string, error) {
  100. unlock, ok := tryLockFile(localFile)
  101. if !ok {
  102. return "", fmt.Errorf("文件%q正在使用中", localFile)
  103. }
  104. defer unlock()
  105. if ctx == nil {
  106. ctx = context.Background()
  107. }
  108. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  109. defer cancel()
  110. if FileUpFolder == "" {
  111. return "", fmt.Errorf("无效的上传目录")
  112. }
  113. lf, err := os.Open(localFile)
  114. if err != nil {
  115. return "", err
  116. }
  117. defer lf.Close()
  118. info, err := lf.Stat()
  119. if err != nil {
  120. return "", err
  121. }
  122. lfs := info.Size() // 总上传的字节数
  123. rd := FileUpFolder // 远程目录名
  124. remoteFile := filepath.Join(rd, filepath.Base(localFile)) // 远程文件名
  125. for {
  126. select {
  127. case <-timeoutCtx.Done():
  128. return "", timeoutCtx.Err()
  129. default:
  130. }
  131. err := func() error {
  132. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  133. if err != nil {
  134. return &stopError{err}
  135. }
  136. defer c.Quit()
  137. if err := c.Login(loginUser, loginPass); err != nil {
  138. return &stopError{err}
  139. }
  140. _ = c.MakeDir(rd) // 尝试创建远程目录, 忽略已存在和其它错误
  141. rfs, err := c.FileSize(remoteFile) // 已上传的字节数
  142. if err != nil || rfs > lfs {
  143. rfs = 0
  144. }
  145. if _, err := lf.Seek(rfs, io.SeekStart); err != nil {
  146. return &stopError{err}
  147. }
  148. pr := newProgressReader(lf, localFile, lfs, rfs, "上传", timeoutCtx)
  149. if err := c.StorFrom(remoteFile, pr, uint64(rfs)); err != nil {
  150. return err
  151. }
  152. return nil
  153. }()
  154. if err != nil {
  155. if lfe, ok := err.(*stopError); ok {
  156. return "", lfe.err
  157. }
  158. time.Sleep(defaultRtyInterval)
  159. continue
  160. }
  161. return remoteFile, nil
  162. }
  163. }
  164. func DownloadFileFromFtp(ctx context.Context, serverAddr, loginUser, loginPass, remoteFile string, timeout time.Duration) (string, error) {
  165. unlock, ok := tryLockFile(remoteFile)
  166. if !ok {
  167. return "", fmt.Errorf("文件%q正在使用中", remoteFile)
  168. }
  169. defer unlock()
  170. if ctx == nil {
  171. ctx = context.Background()
  172. }
  173. timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
  174. defer cancel()
  175. ld := baseapp.VAR_DIR // 本地目录名
  176. localFile := filepath.Join(ld, filepath.Base(remoteFile)) // 本地文件名
  177. lf, err := os.OpenFile(localFile, os.O_CREATE|os.O_RDWR, 0644)
  178. if err != nil {
  179. return "", err
  180. }
  181. defer lf.Close()
  182. info, err := lf.Stat()
  183. if err != nil {
  184. return "", err
  185. }
  186. lfs := info.Size() // 已下载的字节数
  187. for {
  188. select {
  189. case <-timeoutCtx.Done():
  190. return "", timeoutCtx.Err()
  191. default:
  192. }
  193. err := func() error {
  194. c, err := ftp.Dial(serverAddr, ftp.DialWithContext(timeoutCtx))
  195. if err != nil {
  196. return &stopError{err}
  197. }
  198. defer c.Quit()
  199. if err := c.Login(loginUser, loginPass); err != nil {
  200. return &stopError{err}
  201. }
  202. rfs, err := c.FileSize(remoteFile) // 总下载的字节数
  203. if err != nil {
  204. return &stopError{err}
  205. }
  206. if lfs > rfs {
  207. lfs = 0
  208. }
  209. if _, err := lf.Seek(lfs, io.SeekStart); err != nil {
  210. return &stopError{err}
  211. }
  212. resp, err := c.RetrFrom(remoteFile, uint64(lfs))
  213. if err != nil {
  214. return err
  215. }
  216. defer resp.Close()
  217. pr := newProgressReader(resp, remoteFile, rfs, lfs, "下载", timeoutCtx)
  218. n, err := io.Copy(lf, pr)
  219. if err != nil {
  220. return err
  221. }
  222. lfs += n
  223. return nil
  224. }()
  225. if err != nil {
  226. if lfe, ok := err.(*stopError); ok {
  227. return "", lfe.err
  228. }
  229. time.Sleep(defaultRtyInterval)
  230. continue
  231. }
  232. return localFile, nil
  233. }
  234. }