diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 1a1490a..0f00dd7 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -19,6 +19,12 @@ import ( pb "github.com/vesoft-inc/nebula-agent/v3/pkg/proto" ) +const ( + defaultUploadPartSize = 1024 * 1024 * 32 + defaultDownloadPartSize = 1024 * 1024 * 32 + maxRetries = 3 +) + type S3 struct { backend *pb.Backend @@ -88,13 +94,18 @@ func (s *S3) downloadToFile(file, key string) error { Key: aws.String(key), } - downloader := s3manager.NewDownloader(s.sess) - numBytes, err := downloader.Download(fd, req) - if err != nil { - return fmt.Errorf("download from %s to %s failed: %w", key, file, err) + downloader := s3manager.NewDownloader(s.sess, func(u *s3manager.Downloader) { + u.PartSize = defaultDownloadPartSize + }) + for i := 0; i < maxRetries; i++ { + numBytes, err := downloader.Download(fd, req) + if err == nil { + log.Debugf("Download from %s to %s successfully, bytes=%d.", key, file, numBytes) + break + } + log.Errorf("download from %s to %s failed: %v", key, file, err) } - log.Debugf("Download from %s to %s successfully, bytes=%d.", key, file, numBytes) return nil } @@ -159,17 +170,22 @@ func (s *S3) uploadToStorage(key, file string) error { } defer fd.Close() - uploader := s3manager.NewUploader(s.sess) - _, err = uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.backend.GetS3().Bucket), - Key: aws.String(key), - Body: fd, + uploader := s3manager.NewUploader(s.sess, func(u *s3manager.Uploader) { + u.PartSize = defaultUploadPartSize }) - if err != nil { - return fmt.Errorf("upload from %s to %s failed: %w", file, key, err) + for i := 0; i < maxRetries; i++ { + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.backend.GetS3().Bucket), + Key: aws.String(key), + Body: fd, + }) + if err == nil { + log.Debugf("Upload from %s to %s successfully.", file, key) + break + } + log.Errorf("upload from %s to %s failed: %v", file, key, err) } - log.Debugf("Upload from %s to %s successfully.", file, key) return nil }