From f0301a78ca24886030d4b1a8024700fb84298f5e Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Sun, 22 Dec 2024 18:42:06 +0800 Subject: [PATCH] fix force global checkpoint (#20865) fix force global checkpoint implementation Approved by: @LeftHandCold --- pkg/vm/engine/tae/db/checkpoint/testutils.go | 49 +++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 4d9bd3f426961..6d08b66f14889 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -59,25 +59,35 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err interval = r.options.globalVersionInterval } if r.GetPenddingIncrementalCount() != 0 { - end = r.MaxIncrementalCheckpoint().GetEnd() - r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ - force: true, - end: end, - interval: interval, - }) - return nil + end2 := r.MaxIncrementalCheckpoint().GetEnd() + if end2.GE(&end) { + r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ + force: true, + end: end, + interval: interval, + }) + return nil + } } - retryTime := 0 - timeout := time.After(interval) - var err error + var ( + retryTime int + now = time.Now() + err error + ) defer func() { - if err != nil || retryTime > 0 { - logutil.Error("ForceGlobalCheckpoint-End", - zap.Error(err), - zap.Uint64("retryTime", uint64(retryTime))) - return + logger := logutil.Info + if err != nil { + logger = logutil.Error } + logger( + "ForceGlobalCheckpoint-End", + zap.Int("retry-time", retryTime), + zap.Duration("cost", time.Since(now)), + zap.Error(err), + ) }() + + timeout := time.After(interval) for { select { case <-timeout: @@ -104,12 +114,6 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err } func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error { - prevGlobalEnd := types.TS{} - global := r.store.MaxGlobalCheckpoint() - if global != nil { - prevGlobalEnd = global.end - } - r.ForceGlobalCheckpoint(end, versionInterval) op := func() (ok bool, err error) { @@ -117,7 +121,8 @@ func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end typ if global == nil { return false, nil } - return global.end.GT(&prevGlobalEnd), nil + ok = global.end.GE(&end) + return } err := common.RetryWithIntervalAndTimeout( op,