diff --git a/br/pkg/mock/storage/storage.go b/br/pkg/mock/storage/storage.go index 32e96c1dd3448..83db0fffe0b88 100644 --- a/br/pkg/mock/storage/storage.go +++ b/br/pkg/mock/storage/storage.go @@ -38,7 +38,7 @@ func (m *MockExternalStorage) EXPECT() *MockExternalStorageMockRecorder { } // Create mocks base method. -func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string) (storage.ExternalFileWriter, error) { +func (m *MockExternalStorage) Create(arg0 context.Context, arg1 string, _ *storage.WriterOption) (storage.ExternalFileWriter, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Create", arg0, arg1) ret0, _ := ret[0].(storage.ExternalFileWriter) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index fb2e60392ff2d..f985af7d42155 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -415,7 +415,7 @@ func (s *AzureBlobStorage) URI() string { } // Create implements the StorageWriter interface. -func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) { +func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) uploader := &azblobUploader{ blobClient: client, diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go index 5794c813c9d5f..9790f9f9d8196 100644 --- a/br/pkg/storage/compress.go +++ b/br/pkg/storage/compress.go @@ -24,7 +24,7 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern return &withCompression{ExternalStorage: inner, compressType: compressionType} } -func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { var ( writer ExternalFileWriter err error @@ -32,7 +32,7 @@ func (w *withCompression) Create(ctx context.Context, name string) (ExternalFile if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok { writer, err = s3Storage.CreateUploader(ctx, name) } else { - writer, err = w.ExternalStorage.Create(ctx, name) + writer, err = w.ExternalStorage.Create(ctx, name, nil) } if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index c5a0adccfa6a2..0305a2e1eb376 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -238,7 +238,7 @@ func (s *GCSStorage) URI() string { } // Create implements ExternalStorage interface. -func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { object := s.objectName(name) wc := s.bucket.Object(object).NewWriter(ctx) wc.StorageClass = s.gcs.StorageClass diff --git a/br/pkg/storage/hdfs.go b/br/pkg/storage/hdfs.go index 4a971f7a3cb0f..5877febbafab2 100644 --- a/br/pkg/storage/hdfs.go +++ b/br/pkg/storage/hdfs.go @@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string { } // Create opens a file writer by path. path is relative path to storage base path -func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) { +func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) { return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup") } diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 0259e715c7968..c0285812dc720 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -131,7 +131,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader, } // Create implements ExternalStorage interface. -func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) { +func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { file, err := os.Create(filepath.Join(l.base, name)) if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/storage/local_test.go b/br/pkg/storage/local_test.go index db1ba424b9d6b..21ac630c865f1 100644 --- a/br/pkg/storage/local_test.go +++ b/br/pkg/storage/local_test.go @@ -25,7 +25,7 @@ func TestDeleteFile(t *testing.T) { require.NoError(t, err) require.Equal(t, false, ret) - _, err = store.Create(context.Background(), name) + _, err = store.Create(context.Background(), name, nil) require.NoError(t, err) ret, err = store.FileExists(context.Background(), name) diff --git a/br/pkg/storage/memstore.go b/br/pkg/storage/memstore.go index 96276ca600790..0ae108b89835e 100644 --- a/br/pkg/storage/memstore.go +++ b/br/pkg/storage/memstore.go @@ -219,7 +219,7 @@ func (*MemStorage) URI() string { // Create creates a file and returning a writer to write data into. // When the writer is closed, the data is stored in the file. // It implements the `ExternalStorage` interface -func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { +func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) { select { case <-ctx.Done(): return nil, ctx.Err() diff --git a/br/pkg/storage/memstore_test.go b/br/pkg/storage/memstore_test.go index a85a2ff467fa1..bc122e6c2307c 100644 --- a/br/pkg/storage/memstore_test.go +++ b/br/pkg/storage/memstore_test.go @@ -51,7 +51,7 @@ func TestMemStoreBasic(t *testing.T) { require.NotNil(t, err) // create a writer to write - w, err := store.Create(ctx, "/hello.txt") + w, err := store.Create(ctx, "/hello.txt", nil) require.Nil(t, err) _, err = w.Write(ctx, []byte("hello world 3")) require.Nil(t, err) diff --git a/br/pkg/storage/noop.go b/br/pkg/storage/noop.go index 8e58366efdcf5..9d8e39abde68c 100644 --- a/br/pkg/storage/noop.go +++ b/br/pkg/storage/noop.go @@ -43,7 +43,7 @@ func (*noopStorage) URI() string { } // Create implements ExternalStorage interface. -func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) { +func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) { return &noopWriter{}, nil } diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index c127c7de745e2..614c04a795bb5 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -12,6 +12,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials" @@ -908,11 +909,59 @@ func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalF }, nil } -// Create creates multi upload request. -func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) { - uploader, err := rs.CreateUploader(ctx, name) +type s3ObjectWriter struct { + wd *io.PipeWriter + wg *sync.WaitGroup + err error +} + +// Write implement the io.Writer interface. +func (s *s3ObjectWriter) Write(_ context.Context, p []byte) (int, error) { + return s.wd.Write(p) +} + +// Close implement the io.Closer interface. +func (s *s3ObjectWriter) Close(_ context.Context) error { + err := s.wd.Close() if err != nil { - return nil, err + return err + } + s.wg.Wait() + return s.err +} + +// Create creates multi upload request. +func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) { + var uploader ExternalFileWriter + var err error + if option == nil || option.Concurrency <= 1 { + uploader, err = rs.CreateUploader(ctx, name) + if err != nil { + return nil, err + } + } else { + up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) { + u.Concurrency = option.Concurrency + u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024) + }) + rd, wd := io.Pipe() + upParams := &s3manager.UploadInput{ + Bucket: aws.String(rs.options.Bucket), + Key: aws.String(rs.options.Prefix + name), + Body: rd, + } + s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}} + s3Writer.wg.Add(1) + go func() { + _, err := up.UploadWithContext(ctx, upParams) + err1 := rd.Close() + if err != nil { + log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1)) + } + s3Writer.err = err + s3Writer.wg.Done() + }() + uploader = s3Writer } uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression) return uploaderWriter, nil diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 8f22d65ccb3bd..90f4aaf7ac3e4 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -77,6 +77,10 @@ type Writer interface { Close(ctx context.Context) error } +type WriterOption struct { + Concurrency int +} + // ExternalStorage represents a kind of file system storage. type ExternalStorage interface { // WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic @@ -100,8 +104,8 @@ type ExternalStorage interface { // URI returns the base path as a URI URI() string - // Create opens a file writer by path. path is relative path to storage base path - Create(ctx context.Context, path string) (ExternalFileWriter, error) + // Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption + Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error } diff --git a/br/pkg/storage/writer_test.go b/br/pkg/storage/writer_test.go index 22fa87d34de47..4f41aeb97a4f1 100644 --- a/br/pkg/storage/writer_test.go +++ b/br/pkg/storage/writer_test.go @@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) { storage, err := Create(ctx, backend, true) require.NoError(t, err) fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt" - writer, err := storage.Create(ctx, fileName) + writer, err := storage.Create(ctx, fileName, nil) require.NoError(t, err) for _, str := range test.content { p := []byte(str) @@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) { storage = WithCompression(storage, test.compressType) suffix := createSuffixString(test.compressType) fileName := strings.ReplaceAll(test.name, " ", "-") + suffix - writer, err := storage.Create(ctx, fileName) + writer, err := storage.Create(ctx, fileName, nil) require.NoError(t, err) for _, str := range test.content { p := []byte(str) diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 4c999c979d5b7..8d2ecb35c9247 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -155,7 +155,7 @@ func fakeCheckpointFiles( filename := fmt.Sprintf("%v.ts", info.storeID) buff := make([]byte, 8) binary.LittleEndian.PutUint64(buff, info.global_checkpoint) - if _, err := s.Create(ctx, filename); err != nil { + if _, err := s.Create(ctx, filename, nil); err != nil { return errors.Trace(err) } if err := s.WriteFile(ctx, filename, buff); err != nil { diff --git a/dumpling/export/writer_util.go b/dumpling/export/writer_util.go index fe7672f4f0015..2a4dbbc1cf4cd 100644 --- a/dumpling/export/writer_util.go +++ b/dumpling/export/writer_util.go @@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) { fileName += compressFileSuffix(compressType) fullPath := s.URI() + "/" + fileName - writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName) + writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil) if err != nil { tctx.L().Warn("fail to open file", zap.String("path", fullPath), @@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage, initRoutine := func() error { // use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close, // which will cause a context canceled error when closing gcs's Writer - w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName) + w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil) if err != nil { pCtx.L().Warn("fail to open file", zap.String("path", fullPath),