Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix flaky test TestConcurrentLock #57591

Merged
merged 10 commits into from
Dec 3, 2024
4 changes: 4 additions & 0 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ type AzureBlobStorage struct {
cpkInfo *blob.CPKInfo
}

func (*AzureBlobStorage) MarkStrongConsistency() {
// See https://github.com/MicrosoftDocs/azure-docs/issues/105331#issuecomment-1450252384
}

func newAzureBlobStorage(ctx context.Context, options *backuppb.AzureBlobStorage, opts *ExternalStorageOptions) (*AzureBlobStorage, error) {
clientBuilder, err := getAzureServiceClientBuilder(options, opts)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type GCSStorage struct {
clients []*storage.Client
}

func (s *GCSStorage) MarkStrongConsistency() {
// See https://cloud.google.com/storage/docs/consistency#strongly_consistent_operations
}

// GetBucketHandle gets the handle to the GCS API on the bucket.
func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle {
i := s.idx.Inc() % int64(len(s.handles))
Expand Down
20 changes: 18 additions & 2 deletions br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -123,8 +124,23 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin
base := filepath.Join(l.base, opt.SubDir)
return filepath.Walk(base, func(path string, f os.FileInfo, err error) error {
if os.IsNotExist(err) {
// if path not exists, we should return nil to continue.
return nil
log.Info("Local Storage Hint: WalkDir yields a tomestone, a race may happen.", zap.String("path", path))
if !opt.IncludeTombstone {
// if path not exists and the client doesn't require its tombstone,
// we should return nil to continue.
return nil
}
path, err = filepath.Rel(l.base, path)
if err != nil {
log.Panic("filepath.Walk returns a path that isn't a subdir of the base dir.",
zap.String("path", path), zap.String("base", l.base), logutil.ShortError(err))
}
if !strings.HasPrefix(path, opt.ObjPrefix) {
return nil
}
// NOTE: This may cause a tombstone of the dir emit to the caller when
// call `Walk` in a non-exist dir.
return fn(path, TombstoneSize)
}
if err != nil {
return errors.Trace(err)
Expand Down
13 changes: 11 additions & 2 deletions br/pkg/storage/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (cx *VerifyWriteContext) IntentFileName() string {
// - There shouldn't be any other intention files.
// - Verify() returns no error. (If there is one.)
func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.UUID, error) {
if _, ok := s.(StrongConsisency); !ok {
log.Warn("The external storage implementation doesn't provide a strong consistency guarantee. "+
"Please avoid concurrently accessing it if possible.",
zap.String("type", fmt.Sprintf("%T", s)))
}

txnID := uuid.New()
cx := VerifyWriteContext{
Context: ctx,
Expand All @@ -82,7 +88,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during initial check")
}
failpoint.Inject("exclusive-write-commit-to-1", func() {})
failpoint.InjectCall("exclusive-write-commit-to-1")

if err := s.WriteFile(cx, intentFileName, []byte{}); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during writing intention file")
Expand All @@ -97,7 +103,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during checking whether there are other intentions")
}
failpoint.Inject("exclusive-write-commit-to-2", func() {})
failpoint.InjectCall("exclusive-write-commit-to-2")

return txnID, s.WriteFile(cx, w.Target, w.Content(txnID))
}
Expand All @@ -106,9 +112,12 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect string) error {
fileName := path.Base(pfx)
dirName := path.Dir(pfx)

return cx.Storage.WalkDir(cx, &WalkOption{
SubDir: dirName,
ObjPrefix: fileName,
// We'd better read a deleted intention...
IncludeTombstone: true,
}, func(path string, size int64) error {
if path != expect {
return fmt.Errorf("there is conflict file %s", path)
Expand Down
34 changes: 29 additions & 5 deletions br/pkg/storage/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"os"
"path/filepath"
"sync/atomic"
"testing"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -86,8 +87,28 @@ func TestConcurrentLock(t *testing.T) {
errChA := make(chan error, 1)
errChB := make(chan error, 1)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", "1*pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", "1*pause"))
waitRecvTwice := func(ch chan<- struct{}) func() {
return func() {
ch <- struct{}{}
ch <- struct{}{}
}
}

asyncOnceFunc := func(f func()) func() {
run := new(atomic.Bool)
return func() {
if run.CompareAndSwap(false, true) {
f()
}
}
}
chA := make(chan struct{})
onceA := asyncOnceFunc(waitRecvTwice(chA))
chB := make(chan struct{})
onceB := asyncOnceFunc(waitRecvTwice(chB))

require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", onceA))
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", onceB))

go func() {
_, err := storage.TryLockRemote(ctx, strg, "test.lock", "I wanna read it, but I hesitated before send my intention!")
Expand All @@ -99,16 +120,19 @@ func TestConcurrentLock(t *testing.T) {
errChB <- err
}()

failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2")
<-chA
<-chB

<-chB
<-chA

// There is exactly one error.
errA := <-errChA
errB := <-errChB
if errA == nil {
require.Error(t, errB)
} else {
require.NoError(t, errB)
require.NoError(t, errB, "%s", errA)
}

requireFileExists(t, filepath.Join(path, "test.lock"))
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type S3Storage struct {
options *backuppb.S3
}

func (*S3Storage) MarkStrongConsistency() {
// See https://aws.amazon.com/cn/s3/consistency/
}

// GetS3APIHandle gets the handle to the S3 API.
func (rs *S3Storage) GetS3APIHandle() s3iface.S3API {
return rs.svc
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
// Permission represents the permission we need to check in create storage.
type Permission string

// StrongConsistency is a marker interface that indicates the storage is strong consistent
// over its `Read`, `Write` and `WalkDir` APIs.
type StrongConsisency interface {
MarkStrongConsistency()
}

const (
// AccessBuckets represents bucket access permission
// it replace the origin skip-check-path.
Expand All @@ -33,7 +39,8 @@ const (
// we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead.
PutAndDeleteObject Permission = "PutAndDeleteObject"

DefaultRequestConcurrency uint = 128
DefaultRequestConcurrency uint = 128
TombstoneSize int64 = -1
)

// WalkOption is the option of storage.WalkDir.
Expand Down Expand Up @@ -62,6 +69,14 @@ type WalkOption struct {
// to reduce the possibility of timeout on an extremely slow connection, or
// perform testing.
ListCount int64
// IncludeTombstone will allow `Walk` to emit removed files during walking.
//
// In most cases, `Walk` runs over a snapshot, if a file in the snapshot
// was deleted during walking, the file will be ignored. Set this to `true`
// will make them be sent to the callback.
//
// The size of a deleted file should be `TombstoneSize`.
IncludeTombstone bool
}

// ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.
Expand Down
Loading