Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Dec 22, 2024
1 parent 599d364 commit f0c35c4
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,35 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err
interval = r.options.globalVersionInterval
}
if r.GetPenddingIncrementalCount() != 0 {
end = r.MaxIncrementalCheckpoint().GetEnd()
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
force: true,
end: end,
interval: interval,
})
return nil
end2 := r.MaxIncrementalCheckpoint().GetEnd()
if end2.GE(&end) {
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
force: true,
end: end,
interval: interval,
})
return nil
}
}
retryTime := 0
timeout := time.After(interval)
var err error
var (
retryTime int
now = time.Now()
err error
)
defer func() {
if err != nil || retryTime > 0 {
logutil.Error("ForceGlobalCheckpoint-End",
zap.Error(err),
zap.Uint64("retryTime", uint64(retryTime)))
return
logger := logutil.Info
if err != nil {
logger = logutil.Error
}
logger(
"ForceGlobalCheckpoint-End",
zap.Int("retry-time", retryTime),
zap.Duration("cost", time.Since(now)),
zap.Error(err),
)
}()

timeout := time.After(interval)
for {
select {
case <-timeout:
Expand All @@ -104,20 +114,15 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err
}

func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error {
prevGlobalEnd := types.TS{}
global := r.store.MaxGlobalCheckpoint()
if global != nil {
prevGlobalEnd = global.end
}

r.ForceGlobalCheckpoint(end, versionInterval)

op := func() (ok bool, err error) {
global := r.store.MaxGlobalCheckpoint()
if global == nil {
return false, nil
}
return global.end.GT(&prevGlobalEnd), nil
ok = global.end.GE(&end)
return
}
err := common.RetryWithIntervalAndTimeout(
op,
Expand Down

0 comments on commit f0c35c4

Please sign in to comment.