diff --git a/br/pkg/mock/storage/storage.go b/br/pkg/mock/storage/storage.go
index c97c7526f9970..73c03d63275b9 100644
--- a/br/pkg/mock/storage/storage.go
+++ b/br/pkg/mock/storage/storage.go
@@ -38,7 +38,11 @@ func (m *MockExternalStorage) EXPECT() *MockExternalStorageMockRecorder {
 }
 
 // Create mocks base method.
+<<<<<<< HEAD
 func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string, arg2 *storage.WriterOption) (storage.ExternalFileWriter, error) {
+=======
+func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string, _ *storage.WriterOption) (storage.ExternalFileWriter, error) {
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Create", arg0, arg1, arg2)
 	ret0, _ := ret[0].(storage.ExternalFileWriter)
diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go
index 1805982db21d4..982ff9b1ce6cf 100644
--- a/br/pkg/storage/compress.go
+++ b/br/pkg/storage/compress.go
@@ -29,8 +29,21 @@ func WithCompression(inner ExternalStorage, compressionType CompressType, cfg De
 	}
 }
 
+<<<<<<< HEAD
 func (w *withCompression) Create(ctx context.Context, name string, o *WriterOption) (ExternalFileWriter, error) {
 	writer, err := w.ExternalStorage.Create(ctx, name, o)
+=======
+func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
+	var (
+		writer ExternalFileWriter
+		err    error
+	)
+	if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
+		writer, err = s3Storage.CreateUploader(ctx, name)
+	} else {
+		writer, err = w.ExternalStorage.Create(ctx, name, nil)
+	}
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go
index bc09c1d47f6de..9809bb4d561ed 100644
--- a/br/pkg/storage/local.go
+++ b/br/pkg/storage/local.go
@@ -209,6 +209,7 @@ func (f *localFile) GetFileSize() (int64, error) {
 
 // Create implements ExternalStorage interface.
 func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
+<<<<<<< HEAD
 	filename := filepath.Join(l.base, name)
 	dir := filepath.Dir(filename)
 	err := os.MkdirAll(dir, 0750)
@@ -216,6 +217,9 @@ func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (
 		return nil, errors.Trace(err)
 	}
 	file, err := os.Create(filename)
+=======
+	file, err := os.Create(filepath.Join(l.base, name))
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index 97f58ec5b89fd..ce750c60af7d0 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -1015,6 +1015,46 @@ func (s *s3ObjectWriter) Close(_ context.Context) error {
 	err := s.wd.Close()
 	if err != nil {
 		return err
+<<<<<<< HEAD
+=======
+	}
+	s.wg.Wait()
+	return s.err
+}
+
+// Create creates multi upload request.
+func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
+	var uploader ExternalFileWriter
+	var err error
+	if option == nil || option.Concurrency <= 1 {
+		uploader, err = rs.CreateUploader(ctx, name)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
+			u.Concurrency = option.Concurrency
+			u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
+		})
+		rd, wd := io.Pipe()
+		upParams := &s3manager.UploadInput{
+			Bucket: aws.String(rs.options.Bucket),
+			Key:    aws.String(rs.options.Prefix + name),
+			Body:   rd,
+		}
+		s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
+		s3Writer.wg.Add(1)
+		go func() {
+			_, err := up.UploadWithContext(ctx, upParams)
+			err1 := rd.Close()
+			if err != nil {
+				log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
+			}
+			s3Writer.err = err
+			s3Writer.wg.Done()
+		}()
+		uploader = s3Writer
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 	}
 	s.wg.Wait()
 	return s.err
diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go
index 0abecc827414c..6ced43c5b04ce 100644
--- a/br/pkg/storage/storage.go
+++ b/br/pkg/storage/storage.go
@@ -92,6 +92,7 @@ type WriterOption struct {
 	Concurrency int
 }
 
+<<<<<<< HEAD
 type ReaderOption struct {
 	// StartOffset is inclusive. And it's incompatible with Seek.
 	StartOffset *int64
@@ -99,6 +100,8 @@ type ReaderOption struct {
 	EndOffset *int64
 }
 
+=======
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 // ExternalStorage represents a kind of file system storage.
 type ExternalStorage interface {
 	// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go
index 1d4e328703336..e4de8bfec91db 100644
--- a/dumpling/export/writer_util.go
+++ b/dumpling/export/writer_util.go
@@ -453,7 +453,11 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
 func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
 	fileName += compressFileSuffix(compressType)
 	fullPath := s.URI() + "/" + fileName
+<<<<<<< HEAD
 	writer, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(tctx, fileName, nil)
+=======
+	writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 	if err != nil {
 		tctx.L().Warn("fail to open file",
 			zap.String("path", fullPath),
@@ -486,7 +490,11 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
 	initRoutine := func() error {
 		// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
 		// which will cause a context canceled error when closing gcs's Writer
+<<<<<<< HEAD
 		w, err := storage.WithCompression(s, compressType, storage.DecompressConfig{}).Create(pCtx, fileName, nil)
+=======
+		w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
+>>>>>>> 5309c2ff775 (*: support concurrent write for S3 writer (#45723))
 		if err != nil {
 			pCtx.L().Warn("fail to open file",
 				zap.String("path", fullPath),
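For readers who want to see the upload path introduced by commit 5309c2ff775 in isolation, the sketch below reproduces the io.Pipe + s3manager pattern outside the TiDB codebase: writes go into a pipe, and a background goroutine feeds the pipe's read end to an aws-sdk-go multipart uploader whose Concurrency field controls how many parts are in flight. The `concurrentS3Writer` / `newConcurrentS3Writer` names, the bucket, key, region, and the `main` driver are invented for this example; only the aws-sdk-go calls mirror the patch.

```go
// Illustrative sketch only; not part of the TiDB patch above.
package main

import (
	"context"
	"io"
	"log"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// concurrentS3Writer mirrors the shape of the patch's s3ObjectWriter: Write pushes
// bytes into an io.Pipe, and a background goroutine streams them to S3 through
// s3manager, which splits the stream into parts and uploads them concurrently.
type concurrentS3Writer struct {
	wd  *io.PipeWriter
	wg  sync.WaitGroup
	err error
}

func (w *concurrentS3Writer) Write(p []byte) (int, error) { return w.wd.Write(p) }

func (w *concurrentS3Writer) Close() error {
	if err := w.wd.Close(); err != nil { // signal EOF to the uploader goroutine
		return err
	}
	w.wg.Wait() // wait for the multipart upload to finish
	return w.err
}

// newConcurrentS3Writer is a hypothetical constructor following the same pattern
// as the patch's S3Storage.Create when option.Concurrency > 1.
func newConcurrentS3Writer(ctx context.Context, svc *s3.S3, bucket, key string, concurrency int) *concurrentS3Writer {
	up := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
		u.Concurrency = concurrency
		// Pool buffers sized so `concurrency` 8 MiB parts can be staged, as in the patch.
		u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(concurrency * 8 * 1024 * 1024)
	})
	rd, wd := io.Pipe()
	w := &concurrentS3Writer{wd: wd}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		_, err := up.UploadWithContext(ctx, &s3manager.UploadInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
			Body:   rd, // the uploader consumes whatever Write puts into the pipe
		})
		if err != nil {
			log.Printf("upload to s3 failed: %v", err)
		}
		_ = rd.Close()
		w.err = err
	}()
	return w
}

func main() {
	svc := s3.New(session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1"))))
	w := newConcurrentS3Writer(context.Background(), svc, "example-bucket", "example/key", 4)
	if _, err := w.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}
```

The design point the patch relies on is that s3manager only parallelizes uploads when it can buffer the streaming body into parts, which is why the pipe-backed writer plus a BufferProvider replaces the older single-stream uploader when a Concurrency greater than one is requested.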