Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45723
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wjhuang2016 authored and ti-chi-bot committed Dec 5, 2023
1 parent 96079dd commit 54b2b81
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 0 deletions.
4 changes: 4 additions & 0 deletions br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,21 @@ func WithCompression(inner ExternalStorage, compressionType CompressType, cfg De
}
}

<<<<<<< HEAD
func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (ExternalFileWriter, error) {
writer, err := w.ExternalStorage.Create(ctx, name, o)
=======
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
var (
writer ExternalFileWriter
err error
)
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
writer, err = s3Storage.CreateUploader(ctx, name)
} else {
writer, err = w.ExternalStorage.Create(ctx, name, nil)
}
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,17 @@ func (f *localFile) GetFileSize() (int64, error) {

// Create implements ExternalStorage interface.
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
<<<<<<< HEAD
filename := filepath.Join(l.base, name)
dir := filepath.Dir(filename)
err := os.MkdirAll(dir, 0750)
if err != nil {
return nil, errors.Trace(err)
}
file, err := os.Create(filename)
=======
file, err := os.Create(filepath.Join(l.base, name))
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,46 @@ func (s *s3ObjectWriter) Close(_ context.Context) error {
err := s.wd.Close()
if err != nil {
return err
<<<<<<< HEAD
=======
}
s.wg.Wait()
return s.err
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
var uploader ExternalFileWriter
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.CreateUploader(ctx, name)
if err != nil {
return nil, err
}
} else {
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
u.Concurrency = option.Concurrency
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
})
rd, wd := io.Pipe()
upParams := &s3manager.UploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
Body: rd,
}
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
s3Writer.wg.Add(1)
go func() {
_, err := up.UploadWithContext(ctx, upParams)
err1 := rd.Close()
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
}
s3Writer.err = err
s3Writer.wg.Done()
}()
uploader = s3Writer
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
}
s.wg.Wait()
return s.err
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ type WriterOption struct {
Concurrency int
}

<<<<<<< HEAD
type ReaderOption struct {
// StartOffset is inclusive. And it's incompatible with Seek.
StartOffset *int64
// EndOffset is exclusive. And it's incompatible with Seek.
EndOffset *int64
}

=======
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
Expand Down
8 changes: 8 additions & 0 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,11 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
<<<<<<< HEAD
writer, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)
=======
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down Expand Up @@ -486,7 +490,11 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
<<<<<<< HEAD
w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(pCtx, fileName, nil)
=======
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down

0 comments on commit 54b2b81

Please sign in to comment.