From e098406dbd409cded7ab0ea31cc24e2a1395af5a Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Mon, 23 Dec 2024 14:35:50 +0800 Subject: [PATCH] refactor ickp impl phase 1 (#20866) refactor ickp implementation phase 1 Approved by: @LeftHandCold --- pkg/vm/engine/tae/db/checkpoint/entry.go | 134 ++++++++- pkg/vm/engine/tae/db/checkpoint/flusher.go | 4 +- pkg/vm/engine/tae/db/checkpoint/runner.go | 226 +++++++++++---- .../engine/tae/db/checkpoint/runner_test.go | 272 ++++++++++++++++++ pkg/vm/engine/tae/db/checkpoint/store.go | 244 ++++++++++++++++ pkg/vm/engine/tae/db/checkpoint/testutils.go | 131 +++------ pkg/vm/engine/tae/db/checkpoint/types.go | 5 +- pkg/vm/engine/tae/db/db.go | 13 +- pkg/vm/engine/tae/db/dbutils/error.go | 2 +- pkg/vm/engine/tae/db/test/catalog_test.go | 2 +- pkg/vm/engine/tae/db/test/db_test.go | 16 +- pkg/vm/engine/tae/db/test/replay_test.go | 24 +- pkg/vm/engine/tae/db/testutil/engine.go | 8 +- 13 files changed, 890 insertions(+), 191 deletions(-) diff --git a/pkg/vm/engine/tae/db/checkpoint/entry.go b/pkg/vm/engine/tae/db/checkpoint/entry.go index fc4051f53521c..d14fbdb3060d3 100644 --- a/pkg/vm/engine/tae/db/checkpoint/entry.go +++ b/pkg/vm/engine/tae/db/checkpoint/entry.go @@ -30,6 +30,32 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) +type Intent interface { + String() string + Wait() <-chan struct{} +} + +type EntryOption func(*CheckpointEntry) + +func WithEndEntryOption(end types.TS) EntryOption { + return func(e *CheckpointEntry) { + e.end = end + } +} + +func WithStateEntryOption(state State) EntryOption { + return func(e *CheckpointEntry) { + e.state = state + } +} + +func WithCheckedEntryOption(policyChecked, flushedChecked bool) EntryOption { + return func(e *CheckpointEntry) { + e.policyChecked = policyChecked + e.flushChecked = flushedChecked + } +} + type CheckpointEntry struct { sync.RWMutex sid string @@ -43,13 +69,20 @@ type CheckpointEntry struct { ckpLSN uint64 truncateLSN uint64 + policyChecked bool + flushChecked bool + // only for new entry logic procedure bornTime time.Time refreshCnt uint32 + + doneC chan struct{} } -func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *CheckpointEntry { - return &CheckpointEntry{ +func NewCheckpointEntry( + sid string, start, end types.TS, typ EntryType, opts ...EntryOption, +) *CheckpointEntry { + e := &CheckpointEntry{ sid: sid, start: start, end: end, @@ -57,7 +90,45 @@ func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *Checkpo entryType: typ, version: logtail.CheckpointCurrentVersion, bornTime: time.Now(), + doneC: make(chan struct{}), + } + for _, opt := range opts { + opt(e) } + return e +} + +func InheritCheckpointEntry( + from *CheckpointEntry, + replaceOpts ...EntryOption, +) *CheckpointEntry { + from.RLock() + defer from.RUnlock() + e := &CheckpointEntry{ + sid: from.sid, + start: from.start, + end: from.end, + state: from.state, + entryType: from.entryType, + version: from.version, + bornTime: from.bornTime, + refreshCnt: from.refreshCnt, + policyChecked: from.policyChecked, + flushChecked: from.flushChecked, + doneC: from.doneC, + } + for _, opt := range replaceOpts { + opt(e) + } + return e +} + +func (e *CheckpointEntry) Wait() <-chan struct{} { + return e.doneC +} + +func (e *CheckpointEntry) Done() { + close(e.doneC) } // e.start >= o.end @@ -65,6 +136,36 @@ func (e *CheckpointEntry) AllGE(o *CheckpointEntry) bool { return e.start.GE(&o.end) } +func (e *CheckpointEntry) SetPolicyChecked() { + e.Lock() + defer e.Unlock() + 
e.policyChecked = true +} + +func (e *CheckpointEntry) IsPolicyChecked() bool { + e.RLock() + defer e.RUnlock() + return e.policyChecked +} + +func (e *CheckpointEntry) SetFlushChecked() { + e.Lock() + defer e.Unlock() + e.flushChecked = true +} + +func (e *CheckpointEntry) IsFlushChecked() bool { + e.RLock() + defer e.RUnlock() + return e.flushChecked +} + +func (e *CheckpointEntry) AllChecked() bool { + e.RLock() + defer e.RUnlock() + return e.policyChecked && e.flushChecked +} + func (e *CheckpointEntry) SetVersion(version uint32) { e.Lock() defer e.Unlock() @@ -72,6 +173,8 @@ func (e *CheckpointEntry) SetVersion(version uint32) { } func (e *CheckpointEntry) SetLSN(ckpLSN, truncateLSN uint64) { + e.Lock() + defer e.Unlock() e.ckpLSN = ckpLSN e.truncateLSN = truncateLSN } @@ -85,13 +188,21 @@ func (e *CheckpointEntry) Age() time.Duration { defer e.RUnlock() return time.Since(e.bornTime) } +func (e *CheckpointEntry) ResetAge() { + e.Lock() + defer e.Unlock() + e.bornTime = time.Now() + e.refreshCnt = 0 +} func (e *CheckpointEntry) TooOld() bool { e.RLock() defer e.RUnlock() - return time.Since(e.bornTime) > time.Minute*4*time.Duration(e.refreshCnt+1) + return time.Since(e.bornTime) > time.Minute*3*time.Duration(e.refreshCnt+1) } func (e *CheckpointEntry) LSNString() string { - return fmt.Sprintf("ckp %d, truncate %d", e.ckpLSN, e.truncateLSN) + e.RLock() + defer e.RUnlock() + return fmt.Sprintf("%d-%d", e.ckpLSN, e.truncateLSN) } func (e *CheckpointEntry) LSN() uint64 { @@ -195,7 +306,16 @@ func (e *CheckpointEntry) String() string { t = "G" } state := e.GetState() - return fmt.Sprintf("CKP[%s][%v][%s](%s->%s)", t, state, e.LSNString(), e.start.ToString(), e.end.ToString()) + return fmt.Sprintf( + "CKP[%s][%v][%v:%v][%s](%s->%s)", + t, + state, + e.IsPolicyChecked(), + e.IsFlushChecked(), + e.LSNString(), + e.start.ToString(), + e.end.ToString(), + ) } func (e *CheckpointEntry) Prefetch( @@ -259,7 +379,9 @@ func (e *CheckpointEntry) ReadMetaIdx( return data.ReadTNMetaBatch(ctx, e.version, e.tnLocation, reader) } -func (e *CheckpointEntry) GetByTableID(ctx context.Context, fs *objectio.ObjectFS, tid uint64) (ins, del, dataObject, tombstoneObject *api.Batch, err error) { +func (e *CheckpointEntry) GetTableByID( + ctx context.Context, fs *objectio.ObjectFS, tid uint64, +) (ins, del, dataObject, tombstoneObject *api.Batch, err error) { reader, err := blockio.NewObjectReader(e.sid, fs.Service, e.cnLocation) if err != nil { return diff --git a/pkg/vm/engine/tae/db/checkpoint/flusher.go b/pkg/vm/engine/tae/db/checkpoint/flusher.go index f4c3e1321d0ce..c0db9f5953b80 100644 --- a/pkg/vm/engine/tae/db/checkpoint/flusher.go +++ b/pkg/vm/engine/tae/db/checkpoint/flusher.go @@ -371,8 +371,8 @@ func (flusher *flushImpl) triggerJob(ctx context.Context) { request.tree = entry flusher.flushRequestQ.Enqueue(request) } - _, endTS := entry.GetTimeRange() - flusher.checkpointSchduler.TryScheduleCheckpoint(endTS) + _, ts := entry.GetTimeRange() + flusher.checkpointSchduler.TryScheduleCheckpoint(ts, false) } func (flusher *flushImpl) onFlushRequest(items ...any) { diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index 28fdb72db28d2..d3459c6dd7569 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -22,9 +22,10 @@ import ( "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/common/moerr" + v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/store" - v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/perfcounter" @@ -231,7 +232,7 @@ func NewRunner( } r.fillDefaults() - r.store = newRunnerStore(r.rt.SID(), r.options.globalVersionInterval) + r.store = newRunnerStore(r.rt.SID(), r.options.globalVersionInterval, time.Minute*2) r.incrementalPolicy = &timeBasedPolicy{interval: r.options.minIncrementalInterval} r.globalPolicy = &countBasedPolicy{minCount: r.options.globalMinCount} @@ -321,11 +322,8 @@ func (r *runner) onGCCheckpointEntries(items ...any) { func (r *runner) onIncrementalCheckpointEntries(items ...any) { now := time.Now() - entry := r.MaxIncrementalCheckpoint() - // In some unit tests, ckp is managed manually, and ckp deletion (CleanPendingCheckpoint) - // can be called when the queue still has unexecuted task. - // Add `entry == nil` here as protective codes - if entry == nil || entry.GetState() != ST_Running { + entry, rollback := r.store.TakeICKPIntent() + if entry == nil { return } var ( @@ -339,7 +337,7 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { now = time.Now() logutil.Info( - "Checkpoint-Start", + "ICKP-Execute-Start", zap.String("entry", entry.String()), ) @@ -352,7 +350,7 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { logger = logutil.Error } logger( - "Checkpoint-Error", + "ICKP-Execute-Error", zap.String("entry", entry.String()), zap.Error(err), zap.String("phase", errPhase), @@ -364,8 +362,9 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { fields = append(fields, zap.Uint64("lsn", lsn)) fields = append(fields, zap.Uint64("reserve", r.options.reservedWALEntryCount)) fields = append(fields, zap.String("entry", entry.String())) + fields = append(fields, zap.Duration("age", entry.Age())) logutil.Info( - "Checkpoint-End", + "ICKP-Execute-End", fields..., ) } @@ -375,6 +374,7 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { var file string if fields, files, err = r.doIncrementalCheckpoint(entry); err != nil { errPhase = "do-ckp" + rollback() return } @@ -382,17 +382,28 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { if lsn > r.options.reservedWALEntryCount { lsnToTruncate = lsn - r.options.reservedWALEntryCount } + entry.SetLSN(lsn, lsnToTruncate) - entry.SetState(ST_Finished) + if !r.store.CommitICKPIntent(entry, false) { + errPhase = "commit" + rollback() + err = moerr.NewInternalErrorNoCtxf("cannot commit ickp") + return + } + v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds()) + defer entry.Done() if file, err = r.saveCheckpoint( entry.start, entry.end, lsn, lsnToTruncate, ); err != nil { errPhase = "save-ckp" + rollback() return } + files = append(files, file) + // PXU TODO: if crash here, the checkpoint log entry will be lost var logEntry wal.LogEntry if logEntry, err = r.wal.RangeCheckpoint(1, lsnToTruncate, files...); err != nil { errPhase = "wal-ckp" @@ -571,70 +582,177 @@ func (r *runner) onPostCheckpointEntries(entries ...any) { } } -func (r *runner) tryScheduleIncrementalCheckpoint(start, end types.TS) { - // ts := types.BuildTS(time.Now().UTC().UnixNano(), 0) - _, count := r.source.ScanInRange(start, end) - if count < r.options.minCount { - return +func (r *runner) softScheduleCheckpoint(ts *types.TS) (ret *CheckpointEntry, err error) { + var ( + updated bool + ) + intent := r.store.GetICKPIntent() + + check := func() (done bool) { + if 
!r.source.IsCommitted(intent.GetStart(), intent.GetEnd()) { + return false + } + tree := r.source.ScanInRangePruned(intent.GetStart(), intent.GetEnd()) + tree.GetTree().Compact() + if !tree.IsEmpty() && intent.TooOld() { + logutil.Warn( + "CheckPoint-Wait-TooOld", + zap.String("entry", intent.String()), + zap.Duration("age", intent.Age()), + ) + intent.DeferRetirement() + } + return tree.IsEmpty() } - entry := NewCheckpointEntry(r.rt.SID(), start, end, ET_Incremental) - r.store.TryAddNewIncrementalCheckpointEntry(entry) -} -func (r *runner) TryScheduleCheckpoint(endts types.TS) { - if r.disabled.Load() { + now := time.Now() + + defer func() { + logger := logutil.Info + if err != nil { + logger = logutil.Error + } else { + ret = intent + } + intentInfo := "nil" + if intent != nil { + intentInfo = intent.String() + } + if (err != nil && err != ErrPendingCheckpoint) || intent.TooOld() { + logger( + "ICKP-Schedule-Soft", + zap.String("intent", intentInfo), + zap.String("ts", ts.ToString()), + zap.Duration("cost", time.Since(now)), + zap.Error(err), + ) + } + }() + + if intent == nil { + start := r.store.GetCheckpointed() + if ts.LT(&start) { + return + } + if !r.incrementalPolicy.Check(start) { + return + } + _, count := r.source.ScanInRange(start, *ts) + if count < r.options.minCount { + return + } + intent, updated = r.store.UpdateICKPIntent(ts, true, false) + if updated { + logutil.Info( + "ICKP-Schedule-Soft-Updated", + zap.String("intent", intent.String()), + zap.String("ts", ts.ToString()), + ) + } + } + + // [intent == nil] + // if intent is nil, it means no need to do checkpoint + if intent == nil { return } - entry := r.MaxIncrementalCheckpoint() - global := r.MaxGlobalCheckpoint() - // no prev checkpoint found. try schedule the first - // checkpoint - if entry == nil { - if global == nil { - r.tryScheduleIncrementalCheckpoint(types.TS{}, endts) + // [intent != nil] + + var ( + policyChecked bool + flushedChecked bool + ) + policyChecked = intent.IsPolicyChecked() + if !policyChecked { + if !r.incrementalPolicy.Check(intent.GetStart()) { return - } else { - maxTS := global.end.Prev() - if r.incrementalPolicy.Check(maxTS) { - r.tryScheduleIncrementalCheckpoint(maxTS.Next(), endts) - } + } + _, count := r.source.ScanInRange(intent.GetStart(), intent.GetEnd()) + if count < r.options.minCount { return } + policyChecked = true } - if entry.IsPendding() { - check := func() (done bool) { - if !r.source.IsCommitted(entry.GetStart(), entry.GetEnd()) { - return false - } - tree := r.source.ScanInRangePruned(entry.GetStart(), entry.GetEnd()) - tree.GetTree().Compact() - if !tree.IsEmpty() && entry.TooOld() { - logutil.Infof("waiting for dirty tree %s", tree.String()) - entry.DeferRetirement() - } - return tree.IsEmpty() - } + flushedChecked = intent.IsFlushChecked() + if !flushedChecked && check() { + flushedChecked = true + } - if !check() { - logutil.Debugf("%s is waiting", entry.String()) - return + if policyChecked != intent.IsPolicyChecked() || flushedChecked != intent.IsFlushChecked() { + endTS := intent.GetEnd() + intent, updated = r.store.UpdateICKPIntent(&endTS, policyChecked, flushedChecked) + + if updated { + logutil.Info( + "ICKP-Schedule-Soft-Updated", + zap.String("intent", intent.String()), + zap.String("endTS", ts.ToString()), + ) } - entry.SetState(ST_Running) - v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds()) + } + + // no need to do checkpoint + if intent == nil { + return + } + + if intent.end.LT(ts) { + err = ErrPendingCheckpoint 
r.incrementalCheckpointQueue.Enqueue(struct{}{}) return } - if entry.IsRunning() { + if intent.AllChecked() { r.incrementalCheckpointQueue.Enqueue(struct{}{}) + } + return +} + +// NOTE: +// when `force` is true, it must be called after force flush till the given ts +// force: if true, not to check the validness of the checkpoint +func (r *runner) TryScheduleCheckpoint( + ts types.TS, force bool, +) (ret Intent, err error) { + if r.disabled.Load() { + return + } + if !force { + return r.softScheduleCheckpoint(&ts) + } + + intent, updated := r.store.UpdateICKPIntent(&ts, true, true) + if intent == nil { return } - if r.incrementalPolicy.Check(entry.end) { - r.tryScheduleIncrementalCheckpoint(entry.end.Next(), endts) + now := time.Now() + defer func() { + logger := logutil.Info + if err != nil { + logger = logutil.Error + } + + logger( + "ICKP-Schedule-Force", + zap.String("intent", intent.String()), + zap.String("ts", ts.ToString()), + zap.Bool("updated", updated), + zap.Duration("cost", time.Since(now)), + zap.Error(err), + ) + }() + + r.incrementalCheckpointQueue.Enqueue(struct{}{}) + + if intent.end.LT(&ts) { + err = ErrPendingCheckpoint + return } + + return intent, nil } func (r *runner) fillDefaults() { diff --git a/pkg/vm/engine/tae/db/checkpoint/runner_test.go b/pkg/vm/engine/tae/db/checkpoint/runner_test.go index a58097163dcaf..d9024cf0377bd 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner_test.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner_test.go @@ -17,7 +17,9 @@ package checkpoint import ( "context" "fmt" + "sync" "testing" + "time" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -371,3 +373,273 @@ func TestICKPSeekLT(t *testing.T) { } assert.Equal(t, 0, len(ckps)) } + +func Test_RunnerStore1(t *testing.T) { + store := newRunnerStore("", time.Second, time.Second*1000) + _ = types.NextGlobalTsForTest() + _ = types.NextGlobalTsForTest() + t3 := types.NextGlobalTsForTest() + intent, updated := store.UpdateICKPIntent(&t3, true, true) + assert.True(t, updated) + assert.True(t, intent.IsPolicyChecked()) + assert.True(t, intent.IsFlushChecked()) + assert.True(t, intent.IsPendding()) + assert.True(t, intent.AllChecked()) + assert.True(t, intent.end.EQ(&t3)) + + taken, rollback := store.TakeICKPIntent() + assert.NotNil(t, taken) + assert.NotNil(t, rollback) + assert.True(t, taken.IsRunning()) + assert.True(t, taken.end.EQ(&t3)) + + committed := store.CommitICKPIntent(taken, true) + assert.True(t, committed) + + intent, updated = store.UpdateICKPIntent(&t3, true, true) + assert.False(t, updated) + assert.Nilf(t, intent, intent.String()) +} + +func Test_RunnerStore2(t *testing.T) { + store := newRunnerStore("", time.Second, time.Second*1000) + t1 := types.NextGlobalTsForTest() + intent, updated := store.UpdateICKPIntent(&t1, false, false) + assert.True(t, updated) + assert.True(t, intent.start.IsEmpty()) + assert.True(t, intent.end.EQ(&t1)) + assert.True(t, intent.IsPendding()) + assert.False(t, intent.AllChecked()) + + intent, updated = store.UpdateICKPIntent(&t1, true, false) + assert.True(t, updated) + assert.True(t, intent.IsPolicyChecked()) + assert.False(t, intent.IsFlushChecked()) + assert.True(t, intent.IsPendding()) + assert.False(t, intent.AllChecked()) + + intent, updated = store.UpdateICKPIntent(&t1, false, true) + assert.False(t, updated) + assert.True(t, intent.IsPolicyChecked()) + assert.False(t, intent.IsFlushChecked()) + assert.True(t, intent.IsPendding()) + assert.False(t, intent.AllChecked()) + bornTime := 
intent.bornTime + + intent, updated = store.UpdateICKPIntent(&t1, true, true) + assert.True(t, updated) + assert.True(t, intent.IsPolicyChecked()) + assert.True(t, intent.IsFlushChecked()) + assert.True(t, intent.IsPendding()) + assert.True(t, intent.AllChecked()) + assert.True(t, intent.end.EQ(&t1)) + assert.True(t, bornTime.Equal(intent.bornTime)) + + t2 := types.NextGlobalTsForTest() + intent2, updated := store.UpdateICKPIntent(&t2, true, false) + assert.False(t, updated) + assert.Equal(t, intent, intent2) + intent2, updated = store.UpdateICKPIntent(&t2, false, true) + assert.False(t, updated) + assert.Equal(t, intent, intent2) + + intent2, updated = store.UpdateICKPIntent(&t2, true, true) + assert.True(t, updated) + assert.True(t, intent2.IsPolicyChecked()) + assert.True(t, intent2.IsFlushChecked()) + assert.True(t, intent2.IsPendding()) + assert.True(t, intent2.AllChecked()) + assert.True(t, intent2.end.EQ(&t2)) +} + +func Test_RunnerStore3(t *testing.T) { + store := newRunnerStore("", time.Second, time.Second*1000) + + t1 := types.NextGlobalTsForTest() + intent, updated := store.UpdateICKPIntent(&t1, false, false) + assert.True(t, updated) + assert.True(t, intent.start.IsEmpty()) + assert.True(t, intent.end.EQ(&t1)) + assert.True(t, intent.IsPendding()) + + intent2 := store.incrementalIntent.Load() + assert.Equal(t, intent, intent2) + + t2 := types.NextGlobalTsForTest() + intent3, updated := store.UpdateICKPIntent(&t2, true, true) + assert.True(t, updated) + assert.True(t, intent3.start.IsEmpty()) + assert.True(t, intent3.end.EQ(&t2)) + assert.True(t, intent3.IsPendding()) + intent4 := store.incrementalIntent.Load() + assert.Equal(t, intent3, intent4) + + ii, updated := store.UpdateICKPIntent(&t1, true, true) + assert.False(t, updated) + assert.Equal(t, ii, intent4) + + taken, rollback := store.TakeICKPIntent() + assert.NotNil(t, taken) + assert.NotNil(t, rollback) + assert.True(t, taken.IsRunning()) + intent5 := store.incrementalIntent.Load() + assert.Equal(t, intent5, taken) + + taken2, rollback2 := store.TakeICKPIntent() + assert.Nil(t, taken2) + assert.Nil(t, rollback2) + + t3 := types.NextGlobalTsForTest() + ii2, updated := store.UpdateICKPIntent(&t3, true, true) + assert.False(t, updated) + assert.Equal(t, ii2, intent5) + + rollback() + intent6 := store.incrementalIntent.Load() + assert.True(t, intent6.IsPendding()) + assert.True(t, intent6.end.EQ(&t2)) + assert.True(t, intent6.start.IsEmpty()) + assert.Equal(t, intent6.bornTime, intent5.bornTime) + assert.Equal(t, intent6.refreshCnt, intent5.refreshCnt) + assert.Equal(t, intent6.policyChecked, intent5.policyChecked) + assert.Equal(t, intent6.flushChecked, intent5.flushChecked) + + ii2, updated = store.UpdateICKPIntent(&t3, true, true) + assert.True(t, updated) + assert.True(t, ii2.IsPendding()) + assert.True(t, ii2.end.EQ(&t3)) + assert.True(t, ii2.start.IsEmpty()) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-ii2.Wait() + }() + + taken, rollback = store.TakeICKPIntent() + assert.NotNil(t, taken) + assert.NotNil(t, rollback) + assert.True(t, taken.IsRunning()) + intent7 := store.incrementalIntent.Load() + assert.Equal(t, intent7, taken) + + maxEntry := store.MaxIncrementalCheckpoint() + assert.Nil(t, maxEntry) + + committed := store.CommitICKPIntent(taken, true) + assert.True(t, committed) + assert.True(t, taken.IsFinished()) + + wg.Wait() + + maxEntry = store.MaxIncrementalCheckpoint() + assert.Equal(t, maxEntry, taken) + + intent8 := store.incrementalIntent.Load() + assert.Nil(t, intent8) + + // 
UpdateICKPIntent with a smaller ts than the finished one + intent9, updated := store.UpdateICKPIntent(&t3, true, true) + assert.False(t, updated) + assert.Nil(t, intent9) + + t4 := types.NextGlobalTsForTest() + t4 = t4.Next() + + // UpdateICKPIntent with a larger ts than the finished one + // check if the intent is updated + // check if the start ts is equal to the last end ts + intent10, updated := store.UpdateICKPIntent(&t4, true, true) + assert.True(t, updated) + assert.True(t, intent10.IsPendding()) + assert.True(t, intent10.end.EQ(&t4)) + prev := intent10.start.Prev() + assert.True(t, prev.EQ(&t3)) + + taken2, rollback2 = store.TakeICKPIntent() + assert.NotNil(t, taken2) + assert.NotNil(t, rollback2) + assert.True(t, taken2.IsRunning()) + intent11 := store.incrementalIntent.Load() + assert.Equal(t, intent11, taken2) + + // cannot commit a different intent with the incremental intent + t5 := types.NextGlobalTsForTest() + taken2_1 := InheritCheckpointEntry(taken2, WithEndEntryOption(t5)) + committed = store.CommitICKPIntent(taken2_1, true) + assert.False(t, committed) + + taken2.start = taken2.start.Next() + committed = store.CommitICKPIntent(taken2, true) + assert.False(t, committed) + + taken2.start = taken2.start.Prev() + committed = store.CommitICKPIntent(taken2, true) + assert.True(t, committed) + assert.True(t, taken2.IsFinished()) + + timer := time.After(time.Second * 10) + select { + case <-intent10.Wait(): + case <-timer: + assert.Equal(t, 1, 0) + } +} + +func Test_RunnerStore4(t *testing.T) { + store := newRunnerStore("", time.Second, time.Second*1000) + + t1 := types.NextGlobalTsForTest() + intent, updated := store.UpdateICKPIntent(&t1, true, false) + assert.True(t, updated) + assert.True(t, intent.start.IsEmpty()) + assert.True(t, intent.end.EQ(&t1)) + assert.True(t, intent.IsPendding()) + + t2 := types.NextGlobalTsForTest() + intent2, updated := store.UpdateICKPIntent(&t2, true, true) + assert.True(t, updated) + assert.True(t, intent2.start.IsEmpty()) + assert.True(t, intent2.end.EQ(&t2)) + assert.True(t, intent2.IsPendding()) + assert.True(t, intent2.AllChecked()) + + taken, rollback := store.TakeICKPIntent() + assert.NotNil(t, taken) + assert.NotNil(t, rollback) + + t3 := types.NextGlobalTsForTest() + intent3, updated := store.UpdateICKPIntent(&t3, true, true) + assert.False(t, updated) + assert.True(t, intent3.IsRunning()) + assert.True(t, intent3.end.EQ(&t2)) + + rollback() + intent4 := store.incrementalIntent.Load() + assert.True(t, intent4.IsPendding()) + assert.True(t, intent4.end.EQ(&t2)) + assert.True(t, intent4.start.IsEmpty()) + assert.True(t, intent4.AllChecked()) +} + +func Test_RunnerStore5(t *testing.T) { + store := newRunnerStore("", time.Second, time.Second*1000) + + t1 := types.NextGlobalTsForTest() + t2 := types.NextGlobalTsForTest() + intent, updated := store.UpdateICKPIntent(&t2, true, false) + assert.True(t, updated) + assert.True(t, intent.start.IsEmpty()) + assert.True(t, intent.end.EQ(&t2)) + t.Log(intent.String()) + + intent2, updated := store.UpdateICKPIntent(&t1, true, true) + assert.True(t, updated) + assert.True(t, intent2.end.EQ(&t1)) + assert.True(t, intent2.start.IsEmpty()) + assert.True(t, intent2.IsPendding()) + assert.True(t, intent2.AllChecked()) + assert.True(t, intent2.bornTime.After(intent.bornTime)) + t.Log(intent2.String()) +} diff --git a/pkg/vm/engine/tae/db/checkpoint/store.go b/pkg/vm/engine/tae/db/checkpoint/store.go index d02b2bf9f3780..29b02b9c412d9 100644 --- a/pkg/vm/engine/tae/db/checkpoint/store.go +++ 
b/pkg/vm/engine/tae/db/checkpoint/store.go @@ -33,10 +33,12 @@ import ( func newRunnerStore( sid string, globalHistoryDuration time.Duration, + intentOldAge time.Duration, ) *runnerStore { s := new(runnerStore) s.sid = sid s.globalHistoryDuration = globalHistoryDuration + s.intentOldAge = intentOldAge s.incrementals = btree.NewBTreeGOptions( func(a, b *CheckpointEntry) bool { return a.end.LT(&b.end) @@ -59,6 +61,9 @@ type runnerStore struct { sid string globalHistoryDuration time.Duration + intentOldAge time.Duration + + incrementalIntent atomic.Pointer[CheckpointEntry] incrementals *btree.BTreeG[*CheckpointEntry] globals *btree.BTreeG[*CheckpointEntry] @@ -71,6 +76,245 @@ type runnerStore struct { gcWatermark atomic.Value } +func (s *runnerStore) GetICKPIntent() *CheckpointEntry { + return s.incrementalIntent.Load() +} + +func (s *runnerStore) GetCheckpointed() types.TS { + s.RLock() + defer s.RUnlock() + return s.GetCheckpointedLocked() +} + +func (s *runnerStore) GetCheckpointedLocked() types.TS { + var ret types.TS + maxICKP, _ := s.incrementals.Max() + maxGCKP, _ := s.globals.Max() + if maxICKP == nil { + // no ickp and no gckp, it's the first ickp + if maxGCKP == nil { + ret = types.TS{} + } else { + ret = maxGCKP.end + } + } else { + ret = maxICKP.end.Next() + } + return ret +} + +// updated: +// true: updated and intent must contain the updated ts +// false: not updated and intent is the old intent +// policyChecked, flushChecked: +// it cannot update the intent if the intent is checked by policy or flush +func (s *runnerStore) UpdateICKPIntent( + ts *types.TS, policyChecked, flushChecked bool, +) (intent *CheckpointEntry, updated bool) { + for { + old := s.incrementalIntent.Load() + // in the case we will decrease the end ts of the old intent + if old != nil && !old.AllChecked() && policyChecked && flushChecked { + checkpointed := s.GetCheckpointed() + // no need to do checkpoint + if checkpointed.GE(ts) { + intent = nil + return + } + newIntent := InheritCheckpointEntry( + old, + WithEndEntryOption(*ts), + WithCheckedEntryOption(policyChecked, flushChecked), + ) + if old.end.GT(ts) { + newIntent.ResetAge() + } + if s.incrementalIntent.CompareAndSwap(old, newIntent) { + intent = newIntent + updated = true + return + } + continue + } + // Scenario 1: + // there is already an intent meets one of the following conditions: + // 1. the range of the old intent contains the ts, no need to update + // 2. the intent is not pendding: Running or Finished, cannot update + if old != nil && (old.end.GT(ts) || !old.IsPendding() || old.Age() > s.intentOldAge) { + intent = old + return + } + + // Here + // 1. old == nil + // 2. old.end <= ts && old.IsPendding() && old.Age() <= s.intentOldAge + + if old != nil { + // if the old intent is checked by policy and the incoming intent is not checked by policy + // incoming vs old: false vs true + // it cannot update the intent in this case + + if !policyChecked && old.IsPolicyChecked() { + intent = old + return + } + if !flushChecked && old.IsFlushChecked() { + intent = old + return + } + } + + var start types.TS + if old != nil { + // Scenario 2: + // there is an pendding intent with smaller end ts. we need to update + // the intent to extend the end ts to the given ts + start = old.start + } else { + // Scenario 3: + // there is no intent, we need to create a new intent + // start-ts: + // 1. if there is no ickp and no gckp, it's the first ickp, start ts is empty + // 2. if there is no ickp but has gckp, start ts is the end ts of the max gckp + // 3. 
if there is ickp, start ts is the end ts of the max ickp + // end-ts: the given ts + start = s.GetCheckpointed() + } + + if old != nil && old.end.EQ(ts) { + if old.IsPolicyChecked() == policyChecked && old.IsFlushChecked() == flushChecked { + intent = old + return + } + } + + // if the start ts is larger equal to the given ts, no need to update + if start.GE(ts) { + intent = old + return + } + var newIntent *CheckpointEntry + if old == nil { + newIntent = NewCheckpointEntry( + s.sid, + start, + *ts, + ET_Incremental, + WithCheckedEntryOption(policyChecked, flushChecked), + ) + } else { + // the incoming checked status can override the old status + // it is impossible that the old is checked and the incoming is not checked here + // false -> true: impossible here + newIntent = InheritCheckpointEntry( + old, + WithEndEntryOption(*ts), + WithCheckedEntryOption(policyChecked, flushChecked), + ) + } + if s.incrementalIntent.CompareAndSwap(old, newIntent) { + intent = newIntent + updated = true + return + } + } +} + +func (s *runnerStore) TakeICKPIntent() (taken *CheckpointEntry, rollback func()) { + for { + old := s.incrementalIntent.Load() + if old == nil || !old.IsPendding() || !old.AllChecked() { + return + } + taken = InheritCheckpointEntry( + old, + WithStateEntryOption(ST_Running), + ) + if s.incrementalIntent.CompareAndSwap(old, taken) { + rollback = func() { + // rollback the intent + putBack := InheritCheckpointEntry( + taken, + WithStateEntryOption(ST_Pending), + ) + s.incrementalIntent.Store(putBack) + } + break + } + taken = nil + rollback = nil + } + return +} + +// intent must be in Running state +func (s *runnerStore) CommitICKPIntent(intent *CheckpointEntry, done bool) (committed bool) { + defer func() { + if done && committed { + intent.Done() + } + }() + old := s.incrementalIntent.Load() + // should not happen + if old != intent { + logutil.Error( + "CommitICKPIntent-Error", + zap.String("intent", intent.String()), + zap.String("expected", old.String()), + ) + return + } + s.Lock() + defer s.Unlock() + maxICKP, _ := s.incrementals.Max() + maxGCKP, _ := s.globals.Max() + var ( + maxICKPEndNext types.TS + maxGCKPEnd types.TS + ) + if maxICKP != nil { + maxICKPEndNext = maxICKP.end.Next() + } + if maxGCKP != nil { + maxGCKPEnd = maxGCKP.end + } + if maxICKP == nil && maxGCKP == nil { + if !intent.start.IsEmpty() { + logutil.Error( + "CommitICKPIntent-Error", + zap.String("intent", intent.String()), + zap.String("max-i", "nil"), + zap.String("max-g", "nil"), + ) + // PXU TODO: err = xxx + return + } + } else if (maxICKP == nil && !maxGCKPEnd.EQ(&intent.start)) || + (maxICKP != nil && !maxICKPEndNext.EQ(&intent.start)) { + maxi := "nil" + maxg := "nil" + if maxICKP != nil { + maxi = maxICKP.String() + } + if maxGCKP != nil { + maxg = maxGCKP.String() + } + logutil.Error( + "CommitICKPIntent-Error", + zap.String("intent", intent.String()), + zap.String("max-i", maxi), + zap.String("max-g", maxg), + ) + // PXU TODO: err = xxx + return + } + s.incrementalIntent.Store(nil) + intent.SetState(ST_Finished) + s.incrementals.Set(intent) + committed = true + return +} + func (s *runnerStore) ExportStatsLocked() []zap.Field { fields := make([]zap.Field, 0, 8) fields = append(fields, zap.Int("gc-count", s.gcCount)) diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 6d08b66f14889..289228826e99a 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -23,7 +23,6 @@ import ( 
"github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "go.uber.org/zap" ) @@ -35,7 +34,7 @@ type TestRunner interface { ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error ForceCheckpointForBackup(end types.TS) (string, error) - ForceIncrementalCheckpoint(end types.TS, truncate bool) error + ForceIncrementalCheckpoint(end types.TS) error MaxLSNInRange(end types.TS) uint64 GCNeeded() bool @@ -63,7 +62,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err if end2.GE(&end) { r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ force: true, - end: end, + end: end2, interval: interval, }) return nil @@ -83,6 +82,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err "ForceGlobalCheckpoint-End", zap.Int("retry-time", retryTime), zap.Duration("cost", time.Since(now)), + zap.String("ts", end.ToString()), zap.Error(err), ) }() @@ -93,12 +93,11 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err case <-timeout: return moerr.NewInternalError(r.ctx, "timeout") default: - err = r.ForceIncrementalCheckpoint(end, false) + err = r.ForceIncrementalCheckpoint(end) if err != nil { - if dbutils.IsRetrieableCheckpoint(err) { + if dbutils.IsCheckpointRetryableErr(err) { retryTime++ - interval := interval.Milliseconds() / 400 - time.Sleep(time.Duration(interval)) + time.Sleep(interval / 20) break } return err @@ -136,104 +135,50 @@ func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end typ return nil } -func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error { - now := time.Now() - prev := r.MaxIncrementalCheckpoint() - if prev != nil && !prev.IsFinished() { - return moerr.NewPrevCheckpointNotFinished() +func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) { + var intent Intent + if intent, err = r.TryScheduleCheckpoint(ts, true); err != nil { + return } - - if prev != nil && end.LE(&prev.end) { - return nil + if intent == nil { + return } - var ( - err error - errPhase string - start types.TS - fatal bool - fields []zap.Field - ) - if prev == nil { - global := r.MaxGlobalCheckpoint() - if global != nil { - start = global.end - } - } else { - start = prev.end.Next() - } + entry := intent.(*CheckpointEntry) - entry := NewCheckpointEntry(r.rt.SID(), start, end, ET_Incremental) - logutil.Info( - "Checkpoint-Start-Force", - zap.String("entry", entry.String()), - ) + if entry.end.LT(&ts) || !entry.AllChecked() { + err = ErrPendingCheckpoint + return + } + // TODO: use context + timeout := time.After(time.Minute * 2) + now := time.Now() defer func() { + logger := logutil.Info if err != nil { - logger := logutil.Error - if fatal { - logger = logutil.Fatal - } - logger( - "Checkpoint-Error-Force", - zap.String("entry", entry.String()), - zap.String("phase", errPhase), - zap.Error(err), - zap.Duration("cost", time.Since(now)), - ) - } else { - fields = append(fields, zap.Duration("cost", time.Since(now))) - fields = append(fields, zap.String("entry", entry.String())) - logutil.Info( - "Checkpoint-End-Force", - fields..., - ) + logger = logutil.Error } + logger( + "ICKP-Schedule-Force-Wait-End", + zap.String("entry", intent.String()), + zap.Duration("cost", 
time.Since(now)), + zap.Error(err), + ) }() - // TODO: change me - r.store.AddNewIncrementalEntry(entry) + r.incrementalCheckpointQueue.Enqueue(struct{}{}) - var files []string - if fields, files, err = r.doIncrementalCheckpoint(entry); err != nil { - errPhase = "do-ckp" - return err - } - - var lsn, lsnToTruncate uint64 - if truncate { - lsn = r.source.GetMaxLSN(entry.start, entry.end) - if lsn > r.options.reservedWALEntryCount { - lsnToTruncate = lsn - r.options.reservedWALEntryCount - } - entry.ckpLSN = lsn - entry.truncateLSN = lsnToTruncate - } - - var file string - if file, err = r.saveCheckpoint( - entry.start, entry.end, lsn, lsnToTruncate, - ); err != nil { - errPhase = "save-ckp" - return err - } - files = append(files, file) - entry.SetState(ST_Finished) - if truncate { - var e wal.LogEntry - if e, err = r.wal.RangeCheckpoint(1, lsnToTruncate, files...); err != nil { - errPhase = "wal-ckp" - fatal = true - return err - } - if err = e.WaitDone(); err != nil { - errPhase = "wait-wal-ckp" - fatal = true - return err - } + select { + case <-r.ctx.Done(): + err = context.Cause(r.ctx) + return + case <-timeout: + err = moerr.NewInternalErrorNoCtx("timeout") + return + case <-intent.Wait(): } - return nil + return } func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err error) { diff --git a/pkg/vm/engine/tae/db/checkpoint/types.go b/pkg/vm/engine/tae/db/checkpoint/types.go index 8b42fb90b9da5..b343a76184772 100644 --- a/pkg/vm/engine/tae/db/checkpoint/types.go +++ b/pkg/vm/engine/tae/db/checkpoint/types.go @@ -17,12 +17,15 @@ package checkpoint import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) +var ErrPendingCheckpoint = moerr.NewPrevCheckpointNotFinished() + type State int8 const ( @@ -41,7 +44,7 @@ const ( ) type CheckpointScheduler interface { - TryScheduleCheckpoint(types.TS) + TryScheduleCheckpoint(types.TS, bool) (Intent, error) } type Runner interface { diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index dc22d6432a0e9..8c136074cb826 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -170,10 +170,6 @@ func (db *DB) ForceCheckpoint( ts types.TS, flushDuration time.Duration, ) (err error) { - // FIXME: cannot disable with a running job - db.BGCheckpointRunner.DisableCheckpoint() - defer db.BGCheckpointRunner.EnableCheckpoint() - db.BGCheckpointRunner.CleanPenddingCheckpoint() if flushDuration == 0 { flushDuration = time.Minute * 3 / 2 } @@ -187,7 +183,7 @@ func (db *DB) ForceCheckpoint( logger = logutil.Error } logger( - "Control-Force-Checkpoint", + "ICKP-Control-Force-End", zap.Error(err), zap.Duration("total-cost", time.Since(t0)), zap.String("ts", ts.ToString()), @@ -212,11 +208,10 @@ func (db *DB) ForceCheckpoint( err = moerr.NewInternalError(ctx, "force checkpoint timeout") return default: - err = db.BGCheckpointRunner.ForceIncrementalCheckpoint(ts, true) - if dbutils.IsRetrieableCheckpoint(err) { + err = db.BGCheckpointRunner.ForceIncrementalCheckpoint(ts) + if dbutils.IsCheckpointRetryableErr(err) { db.BGCheckpointRunner.CleanPenddingCheckpoint() - interval := flushDuration.Milliseconds() / 400 - time.Sleep(time.Duration(interval)) + time.Sleep(flushDuration / 20) break } return diff --git a/pkg/vm/engine/tae/db/dbutils/error.go 
b/pkg/vm/engine/tae/db/dbutils/error.go index 1c148cf4edc02..92e615217ea56 100644 --- a/pkg/vm/engine/tae/db/dbutils/error.go +++ b/pkg/vm/engine/tae/db/dbutils/error.go @@ -18,6 +18,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" ) -func IsRetrieableCheckpoint(err error) bool { +func IsCheckpointRetryableErr(err error) bool { return moerr.IsMoErrCode(err, moerr.ErrPrevCheckpointNotFinished) } diff --git a/pkg/vm/engine/tae/db/test/catalog_test.go b/pkg/vm/engine/tae/db/test/catalog_test.go index d61ca4d9a4a4f..056273b9893c3 100644 --- a/pkg/vm/engine/tae/db/test/catalog_test.go +++ b/pkg/vm/engine/tae/db/test/catalog_test.go @@ -185,7 +185,7 @@ func TestCheckpointCatalog2(t *testing.T) { } wg.Wait() ts := types.BuildTS(time.Now().UTC().UnixNano(), 0) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(ts, false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(ts) assert.NoError(t, err) lsn := tae.BGCheckpointRunner.MaxLSNInRange(ts) entry, err := tae.Wal.RangeCheckpoint(1, lsn) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 439c75bb0f49a..6f869696de8d4 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -2710,14 +2710,14 @@ func TestSegDelLogtail(t *testing.T) { testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) check := func() { ckpEntries := tae.BGCheckpointRunner.GetAllIncrementalCheckpoints() assert.Equal(t, 1, len(ckpEntries)) entry := ckpEntries[0] - ins, del, dataObj, tombstoneObj, err := entry.GetByTableID(context.Background(), tae.Runtime.Fs, tid) + ins, del, dataObj, tombstoneObj, err := entry.GetTableByID(context.Background(), tae.Runtime.Fs, tid) assert.NoError(t, err) assert.Nil(t, ins) // 0 ins assert.Nil(t, del) // 0 del @@ -3687,7 +3687,7 @@ func TestDropCreated3(t *testing.T) { testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.Nil(t, err) tae.Restart(ctx) @@ -3744,7 +3744,7 @@ func TestDropCreated4(t *testing.T) { assert.Nil(t, err) assert.Nil(t, txn.Commit(context.Background())) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.Nil(t, err) tae.Restart(ctx) @@ -4628,7 +4628,7 @@ func TestReadCheckpoint(t *testing.T) { } for _, entry := range entries { for _, tid := range tids { - ins, del, _, _, err := entry.GetByTableID(context.Background(), tae.Runtime.Fs, tid) + ins, del, _, _, err := entry.GetTableByID(context.Background(), tae.Runtime.Fs, tid) assert.NoError(t, err) t.Logf("table %d", tid) if ins != nil { @@ -4644,7 +4644,7 @@ func TestReadCheckpoint(t *testing.T) { entries = tae.BGCheckpointRunner.GetAllGlobalCheckpoints() entry := 
entries[len(entries)-1] for _, tid := range tids { - ins, del, _, _, err := entry.GetByTableID(context.Background(), tae.Runtime.Fs, tid) + ins, del, _, _, err := entry.GetTableByID(context.Background(), tae.Runtime.Fs, tid) assert.NoError(t, err) t.Logf("table %d", tid) if ins != nil { @@ -8033,7 +8033,7 @@ func TestForceCheckpoint(t *testing.T) { err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.Error(t, err) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) } @@ -8129,7 +8129,7 @@ func TestMarshalPartioned(t *testing.T) { testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false) - err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) lsn := tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now()) entry, err := tae.Wal.RangeCheckpoint(1, lsn) diff --git a/pkg/vm/engine/tae/db/test/replay_test.go b/pkg/vm/engine/tae/db/test/replay_test.go index 1bae1ed51faaa..edfa793aab01d 100644 --- a/pkg/vm/engine/tae/db/test/replay_test.go +++ b/pkg/vm/engine/tae/db/test/replay_test.go @@ -87,7 +87,7 @@ func TestReplayCatalog1(t *testing.T) { testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false) testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false) testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false) - err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) } } @@ -186,7 +186,7 @@ func TestReplayCatalog2(t *testing.T) { assert.Nil(t, err) assert.Nil(t, txn.Commit(context.Background())) t.Log(tae.Catalog.SimplePPString(common.PPL1)) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) tae.Close() @@ -461,7 +461,7 @@ func TestReplay2(t *testing.T) { err = tae2.ForceFlush(tae2.TxnMgr.Now(), context.Background(), time.Second*10) assert.NoError(t, err) - err = tae2.BGCheckpointRunner.ForceIncrementalCheckpoint(tae2.TxnMgr.Now(), false) + err = tae2.BGCheckpointRunner.ForceIncrementalCheckpoint(tae2.TxnMgr.Now()) assert.NoError(t, err) txn, rel = testutil.GetRelation(t, 0, tae2, "db", schema.Name) @@ -562,7 +562,7 @@ func TestReplay3(t *testing.T) { assert.NoError(t, txn.Commit(context.Background())) txn, _ = tae.GetRelation() - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) } @@ -816,7 +816,7 @@ func TestReplay5(t *testing.T) { testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + 
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) lsn := tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now()) entry, err := tae.Wal.RangeCheckpoint(1, lsn) @@ -843,7 +843,7 @@ func TestReplay5(t *testing.T) { testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) lsn = tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now()) entry, err = tae.Wal.RangeCheckpoint(1, lsn) @@ -880,7 +880,7 @@ func TestReplay5(t *testing.T) { err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) lsn = tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now()) entry, err = tae.Wal.RangeCheckpoint(1, lsn) @@ -941,7 +941,7 @@ func TestReplay6(t *testing.T) { testutil.MergeBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false) err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(t, err) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) _ = tae.Close() @@ -1121,7 +1121,7 @@ func TestReplay8(t *testing.T) { _ = txn.Rollback(context.Background()) tae.CompactBlocks(false) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) tae.Restart(ctx) @@ -1322,7 +1322,7 @@ func TestReplaySnapshots(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) txn, err = tae.StartTxn(nil) @@ -1346,7 +1346,7 @@ func TestReplaySnapshots(t *testing.T) { assert.False(t, baseNode.IsEmpty()) assert.NoError(t, txn.Commit(context.Background())) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) t.Log(tae.Catalog.SimplePPString(3)) @@ -1382,7 +1382,7 @@ func TestReplayDatabaseEntry(t *testing.T) { assert.Equal(t, createSqlStr, dbEntry.GetCreateSql()) testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false) - err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now(), false) + err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now()) assert.NoError(t, err) tae.Restart(ctx) diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index c422d9538fd9a..10b747e00f3fe 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -164,21 +164,21 @@ func (e *TestEngine) CheckRowsByScan(exp int, applyDelete bool) { func (e *TestEngine) ForceCheckpoint() { err := 
e.BGFlusher.ForceFlushWithInterval(e.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10) assert.NoError(e.T, err) - err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), false) + err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now()) assert.NoError(e.T, err) } func (e *TestEngine) ForceLongCheckpoint() { err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) assert.NoError(e.T, err) - err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), false) + err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now()) assert.NoError(e.T, err) } func (e *TestEngine) ForceLongCheckpointTruncate() { err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second) assert.NoError(e.T, err) - err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now(), true) + err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now()) assert.NoError(e.T, err) } @@ -309,7 +309,7 @@ func (e *TestEngine) IncrementalCheckpoint( flushed := e.DB.BGFlusher.IsAllChangesFlushed(types.TS{}, end, true) require.True(e.T, flushed) } - err := e.DB.BGCheckpointRunner.ForceIncrementalCheckpoint(end, false) + err := e.DB.BGCheckpointRunner.ForceIncrementalCheckpoint(end) require.NoError(e.T, err) if truncate { lsn := e.DB.BGCheckpointRunner.MaxLSNInRange(end)
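The patch introduces the intent-based ICKP flow (UpdateICKPIntent, TakeICKPIntent, CommitICKPIntent with rollback, and Intent.Wait) but carries no prose walkthrough, so below is a minimal usage sketch in the style of the Test_RunnerStore* cases added above. It assumes it compiles inside the checkpoint package (newRunnerStore and the intent helpers are unexported), and doCheckpointWork is a hypothetical stand-in for the real execution path (doIncrementalCheckpoint / saveCheckpoint in runner.go); it illustrates the API added here rather than prescribing it.

// Minimal sketch of the new ICKP intent lifecycle, written in the style of the
// Test_RunnerStore* cases above. Assumed to live inside the checkpoint package,
// since newRunnerStore and the intent helpers are unexported.
package checkpoint

import (
	"time"

	"github.com/matrixorigin/matrixone/pkg/container/types"
)

// doCheckpointWork is a hypothetical stand-in for the real execution path
// (doIncrementalCheckpoint / saveCheckpoint in runner.go).
func doCheckpointWork(*CheckpointEntry) error { return nil }

func exampleICKPIntentLifecycle() error {
	// test-style values: 1s global history, intents considered old after 1000s
	store := newRunnerStore("", time.Second, time.Second*1000)

	ts := types.NextGlobalTsForTest()

	// 1. Register (or extend) the pending intent up to ts; the two flags record
	//    which admission checks (policy, flush) have already passed.
	intent, updated := store.UpdateICKPIntent(&ts, true, true)
	if !updated || intent == nil {
		return nil // nothing to checkpoint yet
	}

	// 2. Take the intent: Pending -> Running. Only one taker succeeds; rollback
	//    puts a Pending copy back so a later run can retry.
	taken, rollback := store.TakeICKPIntent()
	if taken == nil {
		return nil // another run already holds the intent
	}

	// 3. Execute, then commit (finishes the entry and wakes waiters) or roll back.
	if err := doCheckpointWork(taken); err != nil {
		rollback()
		return err
	}
	if store.CommitICKPIntent(taken, true) { // done=true closes the entry's doneC
		<-taken.Wait() // returns immediately once Done() has run
	}
	return nil
}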