diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index d82a88233833b..506c22163ed8e 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -453,7 +453,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) } - if _, err := c.env.BlockGCUntil(ctx, 0); err != nil { + if err := c.env.UnblockGC(ctx); err != nil { log.Warn("failed to remove service GC safepoint", logutil.ShortError(err)) } metrics.LastCheckpoint.DeleteLabelValues(e.Name) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 8c4c96aed6ac3..bd5de8a4a3051 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -4,8 +4,10 @@ package streamhelper import ( "context" + "math" "time" + "github.com/pingcap/errors" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" @@ -46,7 +48,21 @@ type PDRegionScanner struct { // Returns the minimal service GC safe point across all services. // If the arguments is `0`, this would remove the service safe point. func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { - return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + minimalSafePoint, err := c.UpdateServiceGCSafePoint( + ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + if err != nil { + return 0, errors.Annotate(err, "failed to block gc until") + } + if minimalSafePoint > at { + return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at) + } + return at, nil +} + +func (c PDRegionScanner) UnblockGC(ctx context.Context) error { + // set ttl to 0, means remove the safe point. + _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64) + return err } // TODO: It should be able to synchoronize the current TS with the PD. diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 58deb6301afb8..8c338d78d6ac1 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -212,7 +212,7 @@ func TestGCServiceSafePoint(t *testing.T) { req.Eventually(func() bool { env.fakeCluster.mu.Lock() defer env.fakeCluster.mu.Unlock() - return env.serviceGCSafePoint == 0 + return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted }, 3*time.Second, 100*time.Millisecond) } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 1aa7c56829ab9..b30ce8fdb4d1b 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -100,9 +100,17 @@ type fakeCluster struct { regions []*region testCtx *testing.T +<<<<<<< HEAD onGetClient func(uint64) error serviceGCSafePoint uint64 currentTS uint64 +======= + onGetClient func(uint64) error + onClearCache func(uint64) error + serviceGCSafePoint uint64 + serviceGCSafePointDeleted bool + currentTS uint64 +>>>>>>> 545b4a3a443 (advancer: fix the incorrect gc safepoint behaviours (#52835)) } func (r *region) splitAt(newID uint64, k string) *region { @@ -263,10 +271,6 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { f.mu.Lock() defer f.mu.Unlock() - if at == 0 { - f.serviceGCSafePoint = at - return at, nil - } if f.serviceGCSafePoint > at { return f.serviceGCSafePoint, nil } @@ -274,6 +278,13 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro return at, nil } +func (f *fakeCluster) UnblockGC(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + f.serviceGCSafePointDeleted = true + return nil +} + func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) { return f.currentTS, nil } diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go index d6aa6f800f22a..b6b7062f01f23 100644 --- a/br/pkg/streamhelper/regioniter.go +++ b/br/pkg/streamhelper/regioniter.go @@ -43,6 +43,9 @@ type TiKVClusterMeta interface { // For now, all tasks (exactly one task in fact) use the same checkpoint. BlockGCUntil(ctx context.Context, at uint64) (uint64, error) + // UnblockGC used to remove the service GC safe point in PD. + UnblockGC(ctx context.Context) error + FetchCurrentTS(ctx context.Context) (uint64, error) } diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go index 6ef65a5c86987..0166e30b5e6d5 100644 --- a/br/pkg/streamhelper/regioniter_test.go +++ b/br/pkg/streamhelper/regioniter_test.go @@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e return 0, status.Error(codes.Unimplemented, "Unsupported operation") } +func (c constantRegions) UnblockGC(ctx context.Context) error { + return status.Error(codes.Unimplemented, "Unsupported operation") +} + // TODO: It should be able to synchoronize the current TS with the PD. func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil