Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support concurrent write for S3 writer (#45723) #48603

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (s *AzureBlobStorage) URI() string {
}

// Create implements the StorageWriter interface.
func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
uploader := &azblobUploader{
blobClient: client,
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern
return &withCompression{ExternalStorage: inner, compressType: compressionType}
}

func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
var (
writer ExternalFileWriter
err error
)
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
writer, err = s3Storage.CreateUploader(ctx, name)
} else {
writer, err = w.ExternalStorage.Create(ctx, name)
writer, err = w.ExternalStorage.Create(ctx, name, nil)
}
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *GCSStorage) URI() string {
}

// Create implements ExternalStorage interface.
func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
object := s.objectName(name)
wc := s.bucket.Object(object).NewWriter(ctx)
wc.StorageClass = s.gcs.StorageClass
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string {
}

// Create opens a file writer by path. path is relative path to storage base path
func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader,
}

// Create implements ExternalStorage interface.
func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
file, err := os.Create(filepath.Join(l.base, name))
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestDeleteFile(t *testing.T) {
require.NoError(t, err)
require.Equal(t, false, ret)

_, err = store.Create(context.Background(), name)
_, err = store.Create(context.Background(), name, nil)
require.NoError(t, err)

ret, err = store.FileExists(context.Background(), name)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (*MemStorage) URI() string {
// Create creates a file and returning a writer to write data into.
// When the writer is closed, the data is stored in the file.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestMemStoreBasic(t *testing.T) {
require.NotNil(t, err)

// create a writer to write
w, err := store.Create(ctx, "/hello.txt")
w, err := store.Create(ctx, "/hello.txt", nil)
require.Nil(t, err)
_, err = w.Write(ctx, []byte("hello world 3"))
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (*noopStorage) URI() string {
}

// Create implements ExternalStorage interface.
func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
return &noopWriter{}, nil
}

Expand Down
57 changes: 53 additions & 4 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
Expand Down Expand Up @@ -908,11 +909,59 @@ func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalF
}, nil
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
uploader, err := rs.CreateUploader(ctx, name)
type s3ObjectWriter struct {
wd *io.PipeWriter
wg *sync.WaitGroup
err error
}

// Write implement the io.Writer interface.
func (s *s3ObjectWriter) Write(_ context.Context, p []byte) (int, error) {
return s.wd.Write(p)
}

// Close implement the io.Closer interface.
func (s *s3ObjectWriter) Close(_ context.Context) error {
err := s.wd.Close()
if err != nil {
return nil, err
return err
}
s.wg.Wait()
return s.err
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
var uploader ExternalFileWriter
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.CreateUploader(ctx, name)
if err != nil {
return nil, err
}
} else {
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
u.Concurrency = option.Concurrency
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
})
rd, wd := io.Pipe()
upParams := &s3manager.UploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
Body: rd,
}
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
s3Writer.wg.Add(1)
go func() {
_, err := up.UploadWithContext(ctx, upParams)
err1 := rd.Close()
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
}
s3Writer.err = err
s3Writer.wg.Done()
}()
uploader = s3Writer
}
uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression)
return uploaderWriter, nil
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type Writer interface {
Close(ctx context.Context) error
}

type WriterOption struct {
Concurrency int
}

// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
Expand All @@ -100,8 +104,8 @@ type ExternalStorage interface {
// URI returns the base path as a URI
URI() string

// Create opens a file writer by path. path is relative path to storage base path
Create(ctx context.Context, path string) (ExternalFileWriter, error)
// Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption
Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
// Rename file name from oldFileName to newFileName
Rename(ctx context.Context, oldFileName, newFileName string) error
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) {
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) {
storage = WithCompression(storage, test.compressType)
suffix := createSuffixString(test.compressType)
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func fakeCheckpointFiles(
filename := fmt.Sprintf("%v.ts", info.storeID)
buff := make([]byte, 8)
binary.LittleEndian.PutUint64(buff, info.global_checkpoint)
if _, err := s.Create(ctx, filename); err != nil {
if _, err := s.Create(ctx, filename, nil); err != nil {
return errors.Trace(err)
}
if err := s.WriteFile(ctx, filename, buff); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down Expand Up @@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName)
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down