From 8e8f28b7661839fd57360b2ea6d062f112c70bc8 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 21 Nov 2024 17:12:48 +0800 Subject: [PATCH 01/10] fix flaky test `TestConcurrentLock` Signed-off-by: hillium --- br/pkg/storage/local.go | 14 ++++++++++++-- br/pkg/storage/locking.go | 5 +++-- br/pkg/storage/locking_test.go | 34 +++++++++++++++++++++++++++++----- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index b825c79e90381..a81f5611d33a0 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -28,7 +28,8 @@ const ( // // export for using in tests. type LocalStorage struct { - base string + base string + baseFD *os.File // Whether ignoring ENOINT while deleting. // Don't fail when deleting an unexist file is more like // a normal ExternalStorage implementation does. @@ -95,6 +96,11 @@ func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) er if err := os.Rename(tmpPath, filepath.Join(l.base, name)); err != nil { return errors.Trace(err) } + if err := l.baseFD.Sync(); err != nil { + // So the write can be observed by `Walk` immediately... + return errors.Trace(err) + } + return nil } @@ -283,5 +289,9 @@ func NewLocalStorage(base string) (*LocalStorage, error) { return nil, errors.Trace(err) } } - return &LocalStorage{base: base}, nil + baseFD, err := os.Open(base) + if err != nil { + return nil, errors.Trace(err) + } + return &LocalStorage{base: base, baseFD: baseFD}, nil } diff --git a/br/pkg/storage/locking.go b/br/pkg/storage/locking.go index 67ea7c2003081..2580e4c0667c7 100644 --- a/br/pkg/storage/locking.go +++ b/br/pkg/storage/locking.go @@ -82,7 +82,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U if err := checkConflict(); err != nil { return uuid.UUID{}, errors.Annotate(err, "during initial check") } - failpoint.Inject("exclusive-write-commit-to-1", func() {}) + failpoint.InjectCall("exclusive-write-commit-to-1") if err := s.WriteFile(cx, intentFileName, []byte{}); err != nil { return uuid.UUID{}, errors.Annotate(err, "during writing intention file") @@ -97,7 +97,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U if err := checkConflict(); err != nil { return uuid.UUID{}, errors.Annotate(err, "during checking whether there are other intentions") } - failpoint.Inject("exclusive-write-commit-to-2", func() {}) + failpoint.InjectCall("exclusive-write-commit-to-2") return txnID, s.WriteFile(cx, w.Target, w.Content(txnID)) } @@ -106,6 +106,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect string) error { fileName := path.Base(pfx) dirName := path.Dir(pfx) + return cx.Storage.WalkDir(cx, &WalkOption{ SubDir: dirName, ObjPrefix: fileName, diff --git a/br/pkg/storage/locking_test.go b/br/pkg/storage/locking_test.go index dc8757db7b774..cd0f00da9b830 100644 --- a/br/pkg/storage/locking_test.go +++ b/br/pkg/storage/locking_test.go @@ -6,6 +6,7 @@ import ( "context" "os" "path/filepath" + "sync/atomic" "testing" "github.com/pingcap/failpoint" @@ -86,8 +87,28 @@ func TestConcurrentLock(t *testing.T) { errChA := make(chan error, 1) errChB := make(chan error, 1) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", "1*pause")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", "1*pause")) + waitRecvTwice := func(ch chan<- struct{}) func() { + return func() { + ch <- struct{}{} + ch <- struct{}{} + } + } + // sync.OnceFunc holds a `Mutex` during executing `f`... + onceFunc := func(f func()) func() { + run := new(atomic.Bool) + return func() { + if run.CompareAndSwap(false, true) { + f() + } + } + } + chA := make(chan struct{}) + onceA := onceFunc(waitRecvTwice(chA)) + chB := make(chan struct{}) + onceB := onceFunc(waitRecvTwice(chB)) + + require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", onceA)) + require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", onceB)) go func() { _, err := storage.TryLockRemote(ctx, strg, "test.lock", "I wanna read it, but I hesitated before send my intention!") @@ -99,8 +120,11 @@ func TestConcurrentLock(t *testing.T) { errChB <- err }() - failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1") - failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2") + <-chA + <-chB + + <-chB + <-chA // There is exactly one error. errA := <-errChA @@ -108,7 +132,7 @@ func TestConcurrentLock(t *testing.T) { if errA == nil { require.Error(t, errB) } else { - require.NoError(t, errB) + require.NoError(t, errB, "%s", errA) } requireFileExists(t, filepath.Join(path, "test.lock")) From 30a4c993d39f992079d365432b4ce5a376e88f55 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 21 Nov 2024 17:50:59 +0800 Subject: [PATCH 02/10] disable gosec Signed-off-by: hillium --- br/pkg/storage/local.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index a81f5611d33a0..ce1f8e7ba39b5 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -289,6 +289,10 @@ func NewLocalStorage(base string) (*LocalStorage, error) { return nil, errors.Trace(err) } } + + // Here the path targets to a directory and we will only call `Sync` over it. + // Disable the G304 warning which focus on relative path injection like "../../import_stuff". + //nolint: gosec baseFD, err := os.Open(base) if err != nil { return nil, errors.Trace(err) From 1caabaf21f88806fec839770fe378fa4a118c8b2 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 22 Nov 2024 11:36:44 +0800 Subject: [PATCH 03/10] close fd when closing storage Signed-off-by: hillium --- br/pkg/storage/local.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index ce1f8e7ba39b5..46a6de05012ee 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -262,7 +262,9 @@ func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string } // Close implements ExternalStorage interface. -func (*LocalStorage) Close() {} +func (l *LocalStorage) Close() { + _ = l.baseFD.Close() +} func pathExists(_path string) (bool, error) { _, err := os.Stat(_path) From 8ac1d3f5dc7765313c0eb81d526c8b06f215ac40 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 22 Nov 2024 12:42:28 +0800 Subject: [PATCH 04/10] make a better name for once Signed-off-by: hillium --- br/pkg/storage/locking_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/br/pkg/storage/locking_test.go b/br/pkg/storage/locking_test.go index cd0f00da9b830..2bd044dd9441a 100644 --- a/br/pkg/storage/locking_test.go +++ b/br/pkg/storage/locking_test.go @@ -93,8 +93,8 @@ func TestConcurrentLock(t *testing.T) { ch <- struct{}{} } } - // sync.OnceFunc holds a `Mutex` during executing `f`... - onceFunc := func(f func()) func() { + + asyncOnceFunc := func(f func()) func() { run := new(atomic.Bool) return func() { if run.CompareAndSwap(false, true) { @@ -103,9 +103,9 @@ func TestConcurrentLock(t *testing.T) { } } chA := make(chan struct{}) - onceA := onceFunc(waitRecvTwice(chA)) + onceA := asyncOnceFunc(waitRecvTwice(chA)) chB := make(chan struct{}) - onceB := onceFunc(waitRecvTwice(chB)) + onceB := asyncOnceFunc(waitRecvTwice(chB)) require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", onceA)) require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", onceB)) From fbd4b5670906cab7bc5808646a6cbe3bdeed0ce1 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 27 Nov 2024 15:26:40 +0800 Subject: [PATCH 05/10] sync basedir Signed-off-by: hillium --- br/pkg/storage/local.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 46a6de05012ee..5e5fdf73cdded 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -7,6 +7,7 @@ import ( "context" "io" "os" + "path" "path/filepath" "strings" @@ -28,8 +29,7 @@ const ( // // export for using in tests. type LocalStorage struct { - base string - baseFD *os.File + base string // Whether ignoring ENOINT while deleting. // Don't fail when deleting an unexist file is more like // a normal ExternalStorage implementation does. @@ -93,15 +93,12 @@ func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) er return errors.Trace(err) } } - if err := os.Rename(tmpPath, filepath.Join(l.base, name)); err != nil { - return errors.Trace(err) - } - if err := l.baseFD.Sync(); err != nil { - // So the write can be observed by `Walk` immediately... + targetPath := filepath.Join(l.base, name) + if err := os.Rename(tmpPath, targetPath); err != nil { return errors.Trace(err) } - return nil + return syncPath(path.Dir(targetPath)) } // ReadFile reads the file from the storage and returns the contents. @@ -263,7 +260,6 @@ func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string // Close implements ExternalStorage interface. func (l *LocalStorage) Close() { - _ = l.baseFD.Close() } func pathExists(_path string) (bool, error) { @@ -292,12 +288,17 @@ func NewLocalStorage(base string) (*LocalStorage, error) { } } + return &LocalStorage{base: base}, nil +} + +func syncPath(path string) error { // Here the path targets to a directory and we will only call `Sync` over it. // Disable the G304 warning which focus on relative path injection like "../../import_stuff". //nolint: gosec - baseFD, err := os.Open(base) + file, err := os.Open(path) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - return &LocalStorage{base: base, baseFD: baseFD}, nil + defer file.Close() + return errors.Trace(file.Sync()) } From 7b36abbd594e34240f7ad9cb8bc6adfa601b830d Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 28 Nov 2024 15:44:39 +0800 Subject: [PATCH 06/10] added strong consistency mark and allow local storage emit tombstone on `Walk`. Signed-off-by: hillium --- br/pkg/storage/azblob.go | 4 ++++ br/pkg/storage/gcs.go | 4 ++++ br/pkg/storage/local.go | 50 ++++++++++++++++----------------------- br/pkg/storage/locking.go | 12 ++++++++-- br/pkg/storage/s3.go | 4 ++++ br/pkg/storage/storage.go | 17 ++++++++++++- 6 files changed, 58 insertions(+), 33 deletions(-) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index d8ee26fd5713b..eda934a3f4aad 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -326,6 +326,10 @@ type AzureBlobStorage struct { cpkInfo *blob.CPKInfo } +func (*AzureBlobStorage) MarkStrongConsistency() { + // See https://github.com/MicrosoftDocs/azure-docs/issues/105331#issuecomment-1450252384 +} + func newAzureBlobStorage(ctx context.Context, options *backuppb.AzureBlobStorage, opts *ExternalStorageOptions) (*AzureBlobStorage, error) { clientBuilder, err := getAzureServiceClientBuilder(options, opts) if err != nil { diff --git a/br/pkg/storage/gcs.go b/br/pkg/storage/gcs.go index f9a01856b6e44..0ad39f7d32dfb 100644 --- a/br/pkg/storage/gcs.go +++ b/br/pkg/storage/gcs.go @@ -113,6 +113,10 @@ type GCSStorage struct { clients []*storage.Client } +func (s *GCSStorage) MarkStrongConsistency() { + // See https://cloud.google.com/storage/docs/consistency#strongly_consistent_operations +} + // GetBucketHandle gets the handle to the GCS API on the bucket. func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle { i := s.idx.Inc() % int64(len(s.handles)) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 5e5fdf73cdded..02644972273f7 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -7,7 +7,6 @@ import ( "context" "io" "os" - "path" "path/filepath" "strings" @@ -98,7 +97,7 @@ func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) er return errors.Trace(err) } - return syncPath(path.Dir(targetPath)) + return nil } // ReadFile reads the file from the storage and returns the contents. @@ -124,10 +123,17 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin opt = &WalkOption{} } base := filepath.Join(l.base, opt.SubDir) - return filepath.Walk(base, func(path string, f os.FileInfo, err error) error { + return filepath.WalkDir(base, func(path string, f os.DirEntry, err error) error { if os.IsNotExist(err) { - // if path not exists, we should return nil to continue. - return nil + if !opt.IncludeTombstone { + return nil + } + // We should return to the caller relative path. + path, _ = filepath.Rel(l.base, path) + if !strings.HasPrefix(path, opt.ObjPrefix) { + return nil + } + return fn(path, TombstoneSize) } if err != nil { return errors.Trace(err) @@ -143,27 +149,23 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin } return nil } + // in mac osx, the path parameter is absolute path; in linux, the path is relative path to execution base dir, // so use Rel to convert to relative path to l.base path, _ = filepath.Rel(l.base, path) - if !strings.HasPrefix(path, opt.ObjPrefix) { return nil } - size := f.Size() - // if not a regular file, we need to use os.stat to get the real file size - if !f.Mode().IsRegular() { - stat, err := os.Stat(filepath.Join(l.base, path)) - if err != nil { - // error may happen because of file deleted after walk started, or other errors - // like #49423. We just return 0 size and let the caller handle it in later - // logic. - log.Warn("failed to get file size", zap.String("path", path), zap.Error(err)) - return fn(path, 0) - } - size = stat.Size() + stat, err := os.Stat(filepath.Join(l.base, path)) + if err != nil { + // error may happen because of file deleted after walk started, or other errors + // like #49423. We just return 0 size and let the caller handle it in later + // logic. + log.Warn("failed to get file size", zap.String("path", path), zap.Error(err)) + return fn(path, 0) } + size := stat.Size() return fn(path, size) }) } @@ -290,15 +292,3 @@ func NewLocalStorage(base string) (*LocalStorage, error) { return &LocalStorage{base: base}, nil } - -func syncPath(path string) error { - // Here the path targets to a directory and we will only call `Sync` over it. - // Disable the G304 warning which focus on relative path injection like "../../import_stuff". - //nolint: gosec - file, err := os.Open(path) - if err != nil { - return errors.Trace(err) - } - defer file.Close() - return errors.Trace(file.Sync()) -} diff --git a/br/pkg/storage/locking.go b/br/pkg/storage/locking.go index 2580e4c0667c7..b4915f9d4dcdc 100644 --- a/br/pkg/storage/locking.go +++ b/br/pkg/storage/locking.go @@ -63,6 +63,12 @@ func (cx *VerifyWriteContext) IntentFileName() string { // - There shouldn't be any other intention files. // - Verify() returns no error. (If there is one.) func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.UUID, error) { + if _, ok := s.(StrongConsisency); !ok { + log.Warn("The external storage implementation doesn't provide a strong consistency guarantee. "+ + "Please avoid concurrently accessing it if possible.", + zap.String("type", fmt.Sprintf("%T", s))) + } + txnID := uuid.New() cx := VerifyWriteContext{ Context: ctx, @@ -108,8 +114,10 @@ func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect stri dirName := path.Dir(pfx) return cx.Storage.WalkDir(cx, &WalkOption{ - SubDir: dirName, - ObjPrefix: fileName, + SubDir: dirName, + ObjPrefix: fileName, + // We'd better read a deleted intention... + IncludeTombstone: true, }, func(path string, size int64) error { if path != expect { return fmt.Errorf("there is conflict file %s", path) diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go index 3987512b2a0a2..00a90f55c3400 100644 --- a/br/pkg/storage/s3.go +++ b/br/pkg/storage/s3.go @@ -83,6 +83,10 @@ type S3Storage struct { options *backuppb.S3 } +func (*S3Storage) MarkStrongConsistency() { + // See https://aws.amazon.com/cn/s3/consistency/ +} + // GetS3APIHandle gets the handle to the S3 API. func (rs *S3Storage) GetS3APIHandle() s3iface.S3API { return rs.svc diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 042d4a1f4d715..0ad8cdc20aeac 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -18,6 +18,12 @@ import ( // Permission represents the permission we need to check in create storage. type Permission string +// StrongConsistency is a marker interface that indicates the storage is strong consistent +// over its `Read`, `Write` and `WalkDir` APIs. +type StrongConsisency interface { + MarkStrongConsistency() +} + const ( // AccessBuckets represents bucket access permission // it replace the origin skip-check-path. @@ -33,7 +39,8 @@ const ( // we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead. PutAndDeleteObject Permission = "PutAndDeleteObject" - DefaultRequestConcurrency uint = 128 + DefaultRequestConcurrency uint = 128 + TombstoneSize int64 = -1 ) // WalkOption is the option of storage.WalkDir. @@ -62,6 +69,14 @@ type WalkOption struct { // to reduce the possibility of timeout on an extremely slow connection, or // perform testing. ListCount int64 + // IncludeTombstone will allow `Walk` to emit removed files during walking. + // + // In most cases, `Walk` runs over a snapshot, if a file in the snapshot + // was deleted during walking, the file will be ignored. Set this to `true` + // will make them be sent to the callback. + // + // The size of a deleted file should be `TombstoneSize`. + IncludeTombstone bool } // ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods. From 07e9efda44b69b4fd24fd5b82ce7011e39f64f4a Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 29 Nov 2024 14:11:00 +0800 Subject: [PATCH 07/10] make fmt Signed-off-by: hillium --- br/pkg/storage/locking.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/storage/locking.go b/br/pkg/storage/locking.go index b4915f9d4dcdc..9a5ce3a26cd39 100644 --- a/br/pkg/storage/locking.go +++ b/br/pkg/storage/locking.go @@ -114,8 +114,8 @@ func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect stri dirName := path.Dir(pfx) return cx.Storage.WalkDir(cx, &WalkOption{ - SubDir: dirName, - ObjPrefix: fileName, + SubDir: dirName, + ObjPrefix: fileName, // We'd better read a deleted intention... IncludeTombstone: true, }, func(path string, size int64) error { From 214ab0a2e93b44fc138a77d9a3202d9c9709df16 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 2 Dec 2024 17:17:33 +0800 Subject: [PATCH 08/10] fix test pkg/executor/importer/import_test.go:411 failure Signed-off-by: hillium --- br/pkg/storage/local.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 02644972273f7..cdfe99359da32 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -124,17 +124,6 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin } base := filepath.Join(l.base, opt.SubDir) return filepath.WalkDir(base, func(path string, f os.DirEntry, err error) error { - if os.IsNotExist(err) { - if !opt.IncludeTombstone { - return nil - } - // We should return to the caller relative path. - path, _ = filepath.Rel(l.base, path) - if !strings.HasPrefix(path, opt.ObjPrefix) { - return nil - } - return fn(path, TombstoneSize) - } if err != nil { return errors.Trace(err) } @@ -157,6 +146,21 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin return nil } + // Check whether the file exists... + // If not, should follow the configuration `IncludeTombstone`. + _, err = os.Lstat(filepath.Join(l.base, path)) + if err != nil { + if os.IsNotExist(err) { + if !opt.IncludeTombstone { + return nil + } + return fn(path, TombstoneSize) + } + return err + } + + // Keep the behavior with past: ignore errors happen during reading stats from symlinks and return zero... + // NOTE: For now `WalkDir` emits broken symlinks but ignores deleted files by default... Can we make it better? stat, err := os.Stat(filepath.Join(l.base, path)) if err != nil { // error may happen because of file deleted after walk started, or other errors From c94162963883ec5bf33166927d1c240e9b284f62 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 3 Dec 2024 12:11:27 +0800 Subject: [PATCH 09/10] make Walk don't return error when walking empty dir Signed-off-by: hillium --- br/pkg/storage/local.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index cdfe99359da32..71afd4278ad7c 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -125,6 +125,13 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin base := filepath.Join(l.base, opt.SubDir) return filepath.WalkDir(base, func(path string, f os.DirEntry, err error) error { if err != nil { + if os.IsNotExist(err) { + // Return directly if the root dir not found as we did before. + // `WalkDir` will try to Lstat the root dir only. + // It yields non-exist files. + log.Warn("Local: WalkDir called in an non-exist dir.", zap.String("path", path)) + return nil + } return errors.Trace(err) } @@ -152,6 +159,7 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin if err != nil { if os.IsNotExist(err) { if !opt.IncludeTombstone { + log.Info("Local Storage Hint: WalkDir yields a tomestone, a race may happen.", zap.String("path", path)) return nil } return fn(path, TombstoneSize) From 4c932605297cff0068dbad7f77f5111443de24d6 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 3 Dec 2024 13:55:06 +0800 Subject: [PATCH 10/10] restore local.go, use a much simpler implementation to don't change default behavior Signed-off-by: hillium --- br/pkg/storage/local.go | 71 ++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 71afd4278ad7c..154ab9a1c0858 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" "go.uber.org/zap" ) @@ -92,11 +93,9 @@ func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) er return errors.Trace(err) } } - targetPath := filepath.Join(l.base, name) - if err := os.Rename(tmpPath, targetPath); err != nil { + if err := os.Rename(tmpPath, filepath.Join(l.base, name)); err != nil { return errors.Trace(err) } - return nil } @@ -123,15 +122,27 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin opt = &WalkOption{} } base := filepath.Join(l.base, opt.SubDir) - return filepath.WalkDir(base, func(path string, f os.DirEntry, err error) error { - if err != nil { - if os.IsNotExist(err) { - // Return directly if the root dir not found as we did before. - // `WalkDir` will try to Lstat the root dir only. - // It yields non-exist files. - log.Warn("Local: WalkDir called in an non-exist dir.", zap.String("path", path)) + return filepath.Walk(base, func(path string, f os.FileInfo, err error) error { + if os.IsNotExist(err) { + log.Info("Local Storage Hint: WalkDir yields a tomestone, a race may happen.", zap.String("path", path)) + if !opt.IncludeTombstone { + // if path not exists and the client doesn't require its tombstone, + // we should return nil to continue. + return nil + } + path, err = filepath.Rel(l.base, path) + if err != nil { + log.Panic("filepath.Walk returns a path that isn't a subdir of the base dir.", + zap.String("path", path), zap.String("base", l.base), logutil.ShortError(err)) + } + if !strings.HasPrefix(path, opt.ObjPrefix) { return nil } + // NOTE: This may cause a tombstone of the dir emit to the caller when + // call `Walk` in a non-exist dir. + return fn(path, TombstoneSize) + } + if err != nil { return errors.Trace(err) } @@ -145,39 +156,27 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin } return nil } - // in mac osx, the path parameter is absolute path; in linux, the path is relative path to execution base dir, // so use Rel to convert to relative path to l.base path, _ = filepath.Rel(l.base, path) + if !strings.HasPrefix(path, opt.ObjPrefix) { return nil } - // Check whether the file exists... - // If not, should follow the configuration `IncludeTombstone`. - _, err = os.Lstat(filepath.Join(l.base, path)) - if err != nil { - if os.IsNotExist(err) { - if !opt.IncludeTombstone { - log.Info("Local Storage Hint: WalkDir yields a tomestone, a race may happen.", zap.String("path", path)) - return nil - } - return fn(path, TombstoneSize) + size := f.Size() + // if not a regular file, we need to use os.stat to get the real file size + if !f.Mode().IsRegular() { + stat, err := os.Stat(filepath.Join(l.base, path)) + if err != nil { + // error may happen because of file deleted after walk started, or other errors + // like #49423. We just return 0 size and let the caller handle it in later + // logic. + log.Warn("failed to get file size", zap.String("path", path), zap.Error(err)) + return fn(path, 0) } - return err + size = stat.Size() } - - // Keep the behavior with past: ignore errors happen during reading stats from symlinks and return zero... - // NOTE: For now `WalkDir` emits broken symlinks but ignores deleted files by default... Can we make it better? - stat, err := os.Stat(filepath.Join(l.base, path)) - if err != nil { - // error may happen because of file deleted after walk started, or other errors - // like #49423. We just return 0 size and let the caller handle it in later - // logic. - log.Warn("failed to get file size", zap.String("path", path), zap.Error(err)) - return fn(path, 0) - } - size := stat.Size() return fn(path, size) }) } @@ -273,8 +272,7 @@ func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string } // Close implements ExternalStorage interface. -func (l *LocalStorage) Close() { -} +func (*LocalStorage) Close() {} func pathExists(_path string) (bool, error) { _, err := os.Stat(_path) @@ -301,6 +299,5 @@ func NewLocalStorage(base string) (*LocalStorage, error) { return nil, errors.Trace(err) } } - return &LocalStorage{base: base}, nil }