diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go index 06afd8c087712..fa62df1a6278c 100644 --- a/pkg/objectio/injects.go +++ b/pkg/objectio/injects.go @@ -32,6 +32,8 @@ const ( FJ_TransferSlow = "fj/transfer/slow" FJ_FlushTimeout = "fj/flush/timeout" + FJ_CheckpointSave = "fj/checkpoint/save" + FJ_TraceRanges = "fj/trace/ranges" FJ_TracePartitionState = "fj/trace/partitionstate" FJ_PrefetchThreshold = "fj/prefetch/threshold" @@ -262,6 +264,29 @@ func InjectLog1( return } +func CheckpointSaveInjected() (string, bool) { + _, sarg, injected := fault.TriggerFault(FJ_CheckpointSave) + return sarg, injected +} + +func InjectCheckpointSave(msg string) (rmFault func(), err error) { + if err = fault.AddFaultPoint( + context.Background(), + FJ_CheckpointSave, + ":::", + "echo", + 0, + msg, + false, + ); err != nil { + return + } + rmFault = func() { + fault.RemoveFaultPoint(context.Background(), FJ_CheckpointSave) + } + return +} + func Debug19524Injected() bool { _, _, injected := fault.TriggerFault(FJ_Debug19524) return injected diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index a40c1bbb1d416..83ef57945aec2 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -640,7 +640,7 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)), zap.Uint64("tbl-id", tbl.tableId), zap.String("txn", tbl.db.op.Txn().DebugString()), - zap.String("blocks", blocks.String()), + zap.Int("blocks", blocks.Len()), zap.String("ps", fmt.Sprintf("%p", part)), zap.Duration("cost", cost), zap.Error(err), diff --git a/pkg/vm/engine/tae/db/checkpoint/executor.go b/pkg/vm/engine/tae/db/checkpoint/executor.go new file mode 100644 index 0000000000000..02421bed6aae5 --- /dev/null +++ b/pkg/vm/engine/tae/db/checkpoint/executor.go @@ -0,0 +1,221 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package checkpoint + +import ( + "context" + "sync/atomic" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/logutil" + v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" + "go.uber.org/zap" +) + +type checkpointJob struct { + doneCh chan struct{} + runner *runner + + runICKPFunc func(context.Context, *runner) error +} + +func (job *checkpointJob) RunICKP(ctx context.Context) (err error) { + if job.runICKPFunc != nil { + return job.runICKPFunc(ctx, job.runner) + } + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + + entry, rollback := job.runner.store.TakeICKPIntent() + if entry == nil { + return + } + + var ( + errPhase string + lsnToTruncate uint64 + lsn uint64 + fatal bool + fields []zap.Field + now = time.Now() + ) + + logutil.Info( + "ICKP-Execute-Start", + zap.String("entry", entry.String()), + ) + + defer func() { + if err != nil { + var logger func(msg string, fields ...zap.Field) + if fatal { + logger = logutil.Fatal + } else { + logger = logutil.Error + } + logger( + "ICKP-Execute-Error", + zap.String("entry", entry.String()), + zap.Error(err), + zap.String("phase", errPhase), + zap.Duration("cost", time.Since(now)), + ) + } else { + fields = append(fields, zap.Duration("cost", time.Since(now))) + fields = append(fields, zap.Uint64("truncate", lsnToTruncate)) + fields = append(fields, zap.Uint64("lsn", lsn)) + fields = append(fields, zap.Uint64("reserve", job.runner.options.reservedWALEntryCount)) + fields = append(fields, zap.String("entry", entry.String())) + fields = append(fields, zap.Duration("age", entry.Age())) + logutil.Info( + "ICKP-Execute-End", + fields..., + ) + } + }() + + var files []string + var file string + if fields, files, err = job.runner.doIncrementalCheckpoint(entry); err != nil { + errPhase = "do-ckp" + rollback() + return + } + + lsn = job.runner.source.GetMaxLSN(entry.start, entry.end) + if lsn > job.runner.options.reservedWALEntryCount { + lsnToTruncate = lsn - job.runner.options.reservedWALEntryCount + } + entry.SetLSN(lsn, lsnToTruncate) + + if prepared := job.runner.store.PrepareCommitICKPIntent(entry); !prepared { + errPhase = "prepare" + rollback() + err = moerr.NewInternalErrorNoCtxf("cannot prepare ickp") + return + } + + if file, err = job.runner.saveCheckpoint( + entry.start, entry.end, + ); err != nil { + errPhase = "save-ckp" + job.runner.store.RollbackICKPIntent(entry) + rollback() + return + } + + job.runner.store.CommitICKPIntent(entry) + v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds()) + + files = append(files, file) + + // PXU TODO: if crash here, the checkpoint log entry will be lost + var logEntry wal.LogEntry + if logEntry, err = job.runner.wal.RangeCheckpoint(1, lsnToTruncate, files...); err != nil { + errPhase = "wal-ckp" + fatal = true + return + } + if err = logEntry.WaitDone(); err != nil { + errPhase = "wait-wal-ckp-done" + fatal = true + return + } + + job.runner.postCheckpointQueue.Enqueue(entry) + job.runner.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ + end: entry.end, + interval: job.runner.options.globalVersionInterval, + ckpLSN: lsn, + truncateLSN: lsnToTruncate, + }) + + return nil +} + +func (job *checkpointJob) WaitC() <-chan struct{} { + return job.doneCh +} + +func (job *checkpointJob) Done() { + close(job.doneCh) +} + +type checkpointExecutor struct { + ctx context.Context + cancel context.CancelCauseFunc + active atomic.Bool + 
running atomic.Pointer[checkpointJob] + + runner *runner + runICKPFunc func(context.Context, *runner) error +} + +func newCheckpointExecutor( + runner *runner, +) *checkpointExecutor { + ctx, cancel := context.WithCancelCause(context.Background()) + e := &checkpointExecutor{ + runner: runner, + ctx: ctx, + cancel: cancel, + } + e.active.Store(true) + return e +} + +func (e *checkpointExecutor) StopWithCause(cause error) { + e.active.Store(false) + if cause == nil { + cause = ErrCheckpointDisabled + } + e.cancel(cause) + job := e.running.Load() + if job != nil { + <-job.WaitC() + } + e.running.Store(nil) + e.runner = nil +} + +func (e *checkpointExecutor) RunICKP() (err error) { + if !e.active.Load() { + err = ErrCheckpointDisabled + return + } + if e.running.Load() != nil { + err = ErrPendingCheckpoint + } + job := &checkpointJob{ + doneCh: make(chan struct{}), + runner: e.runner, + runICKPFunc: e.runICKPFunc, + } + if !e.running.CompareAndSwap(nil, job) { + err = ErrPendingCheckpoint + return + } + defer func() { + job.Done() + e.running.Store(nil) + }() + err = job.RunICKP(e.ctx) + return +} diff --git a/pkg/vm/engine/tae/db/checkpoint/info.go b/pkg/vm/engine/tae/db/checkpoint/info.go index 646c1ecf38892..b8ce98865a7a5 100644 --- a/pkg/vm/engine/tae/db/checkpoint/info.go +++ b/pkg/vm/engine/tae/db/checkpoint/info.go @@ -55,7 +55,7 @@ type RunnerReader interface { GetDirtyCollector() logtail.Collector } -func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch { +func (r *runner) collectCheckpointMetadata(start, end types.TS) *containers.Batch { bat := makeRespBatchFromSchema(CheckpointSchema) entries := r.GetAllIncrementalCheckpoints() for _, entry := range entries { diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index 116021cb6014e..13d26608bcbd5 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -96,9 +96,8 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { } metaFiles := make([]*MetaFile, 0) compactedFiles := make([]*MetaFile, 0) - r.checkpointMetaFiles.Lock() for i, dir := range dirs { - r.checkpointMetaFiles.files[dir.Name] = struct{}{} + r.store.AddMetaFile(dir.Name) start, end, ext := blockio.DecodeCheckpointMetadataFileName(dir.Name) metaFile := &MetaFile{ start: start, @@ -112,7 +111,6 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { } metaFiles = append(metaFiles, metaFile) } - r.checkpointMetaFiles.Unlock() sort.Slice(metaFiles, func(i, j int) bool { return metaFiles[i].end.LT(&metaFiles[j].end) }) diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index a9d65ec88c104..f576ba231cc8a 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -23,7 +23,6 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" - v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/store" "go.uber.org/zap" @@ -197,16 +196,13 @@ type runner struct { incrementalPolicy *timeBasedPolicy globalPolicy *countBasedPolicy + executor atomic.Pointer[checkpointExecutor] + incrementalCheckpointQueue sm.Queue globalCheckpointQueue sm.Queue postCheckpointQueue sm.Queue gcCheckpointQueue sm.Queue - checkpointMetaFiles struct { - sync.RWMutex - files map[string]struct{} - } - onceStart sync.Once onceStop sync.Once } @@ -240,11 +236,33 @@ func NewRunner( 
r.globalCheckpointQueue = sm.NewSafeQueue(r.options.checkpointQueueSize, 100, r.onGlobalCheckpointEntries) r.gcCheckpointQueue = sm.NewSafeQueue(100, 100, r.onGCCheckpointEntries) r.postCheckpointQueue = sm.NewSafeQueue(1000, 1, r.onPostCheckpointEntries) - r.checkpointMetaFiles.files = make(map[string]struct{}) + r.StartExecutor() return r } +func (r *runner) StopExecutor(err error) { + executor := r.executor.Load() + if executor == nil { + return + } + executor.StopWithCause(err) + r.executor.CompareAndSwap(executor, nil) +} + +func (r *runner) StartExecutor() { + for { + executor := r.executor.Load() + if executor != nil { + executor.StopWithCause(ErrExecutorRestarted) + } + newExecutor := newCheckpointExecutor(r) + if r.executor.CompareAndSwap(executor, newExecutor) { + break + } + } +} + func (r *runner) String() string { var buf bytes.Buffer _, _ = fmt.Fprintf(&buf, "CheckpointRunner<") @@ -262,25 +280,15 @@ func (r *runner) String() string { } func (r *runner) AddCheckpointMetaFile(name string) { - r.checkpointMetaFiles.Lock() - defer r.checkpointMetaFiles.Unlock() - r.checkpointMetaFiles.files[name] = struct{}{} + r.store.AddMetaFile(name) } func (r *runner) RemoveCheckpointMetaFile(name string) { - r.checkpointMetaFiles.Lock() - defer r.checkpointMetaFiles.Unlock() - delete(r.checkpointMetaFiles.files, name) + r.store.RemoveMetaFile(name) } func (r *runner) GetCheckpointMetaFiles() map[string]struct{} { - r.checkpointMetaFiles.RLock() - defer r.checkpointMetaFiles.RUnlock() - files := make(map[string]struct{}) - for k, v := range r.checkpointMetaFiles.files { - files[k] = v - } - return files + return r.store.GetMetaFiles() } func (r *runner) onGlobalCheckpointEntries(items ...any) { @@ -321,112 +329,38 @@ func (r *runner) onGCCheckpointEntries(items ...any) { } func (r *runner) onIncrementalCheckpointEntries(items ...any) { - now := time.Now() - entry, rollback := r.store.TakeICKPIntent() - if entry == nil { - return - } var ( - err error - errPhase string - lsnToTruncate uint64 - lsn uint64 - fatal bool - fields []zap.Field + err error + now = time.Now() ) - now = time.Now() - - logutil.Info( - "ICKP-Execute-Start", - zap.String("entry", entry.String()), - ) - + executor := r.executor.Load() + if executor == nil { + err = ErrCheckpointDisabled + } defer func() { + logger := logutil.Info if err != nil { - var logger func(msg string, fields ...zap.Field) - if fatal { - logger = logutil.Fatal - } else { - logger = logutil.Error - } + logger = logutil.Error + } + if err != nil || time.Since(now) > time.Second*10 { logger( - "ICKP-Execute-Error", - zap.String("entry", entry.String()), - zap.Error(err), - zap.String("phase", errPhase), + "ICKP-Execute-Runner-End", zap.Duration("cost", time.Since(now)), - ) - } else { - fields = append(fields, zap.Duration("cost", time.Since(now))) - fields = append(fields, zap.Uint64("truncate", lsnToTruncate)) - fields = append(fields, zap.Uint64("lsn", lsn)) - fields = append(fields, zap.Uint64("reserve", r.options.reservedWALEntryCount)) - fields = append(fields, zap.String("entry", entry.String())) - fields = append(fields, zap.Duration("age", entry.Age())) - logutil.Info( - "ICKP-Execute-End", - fields..., + zap.Error(err), ) } }() - var files []string - var file string - if fields, files, err = r.doIncrementalCheckpoint(entry); err != nil { - errPhase = "do-ckp" - rollback() - return - } - - lsn = r.source.GetMaxLSN(entry.start, entry.end) - if lsn > r.options.reservedWALEntryCount { - lsnToTruncate = lsn - r.options.reservedWALEntryCount - } - 
- entry.SetLSN(lsn, lsnToTruncate) - if !r.store.CommitICKPIntent(entry, false) { - errPhase = "commit" - rollback() - err = moerr.NewInternalErrorNoCtxf("cannot commit ickp") - return - } - v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds()) - defer entry.Done() - - if file, err = r.saveCheckpoint( - entry.start, entry.end, lsn, lsnToTruncate, - ); err != nil { - errPhase = "save-ckp" - rollback() - return - } - - files = append(files, file) - - // PXU TODO: if crash here, the checkpoint log entry will be lost - var logEntry wal.LogEntry - if logEntry, err = r.wal.RangeCheckpoint(1, lsnToTruncate, files...); err != nil { - errPhase = "wal-ckp" - fatal = true - return - } - if err = logEntry.WaitDone(); err != nil { - errPhase = "wait-wal-ckp-done" - fatal = true - return - } - - r.postCheckpointQueue.Enqueue(entry) - r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ - end: entry.end, - interval: r.options.globalVersionInterval, - ckpLSN: lsn, - truncateLSN: lsnToTruncate, - }) + err = executor.RunICKP() } -func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64) (name string, err error) { - bat := r.collectCheckpointMetadata(start, end, ckpLSN, truncateLSN) +func (r *runner) saveCheckpoint( + start, end types.TS, +) (name string, err error) { + if injectErrMsg, injected := objectio.CheckpointSaveInjected(); injected { + return "", moerr.NewInternalErrorNoCtx(injectErrMsg) + } + bat := r.collectCheckpointMetadata(start, end) defer bat.Close() name = blockio.EncodeCheckpointMetadataFileName(CheckpointDir, PrefixMetadata, start, end) writer, err := objectio.NewObjectWriterSpecial(objectio.WriterCheckpoint, name, r.rt.Fs.Service) @@ -447,6 +381,7 @@ func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64) return } +// TODO: using ctx func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (fields []zap.Field, files []string, err error) { factory := logtail.IncrementalCheckpointDataFactory(r.rt.SID(), entry.start, entry.end, true) data, err := factory(r.catalog) @@ -548,7 +483,7 @@ func (r *runner) doGlobalCheckpoint( r.store.TryAddNewGlobalCheckpointEntry(entry) entry.SetState(ST_Finished) var name string - if name, err = r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil { + if name, err = r.saveCheckpoint(entry.start, entry.end); err != nil { errPhase = "save" return } @@ -615,8 +550,10 @@ func (r *runner) softScheduleCheckpoint(ts *types.TS) (ret *CheckpointEntry, err ret = intent } intentInfo := "nil" + ageStr := "" if intent != nil { intentInfo = intent.String() + ageStr = intent.Age().String() } if (err != nil && err != ErrPendingCheckpoint) || (intent != nil && intent.TooOld()) { logger( @@ -624,6 +561,7 @@ func (r *runner) softScheduleCheckpoint(ts *types.TS) (ret *CheckpointEntry, err zap.String("intent", intentInfo), zap.String("ts", ts.ToString()), zap.Duration("cost", time.Since(now)), + zap.String("age", ageStr), zap.Error(err), ) } @@ -700,13 +638,21 @@ func (r *runner) softScheduleCheckpoint(ts *types.TS) (ret *CheckpointEntry, err if intent.end.LT(ts) { err = ErrPendingCheckpoint - r.incrementalCheckpointQueue.Enqueue(struct{}{}) + r.TryTriggerExecuteICKP() return } if intent.AllChecked() { - r.incrementalCheckpointQueue.Enqueue(struct{}{}) + r.TryTriggerExecuteICKP() + } + return +} + +func (r *runner) TryTriggerExecuteICKP() (err error) { + if r.disabled.Load() { + return } + _, err = r.incrementalCheckpointQueue.Enqueue(struct{}{}) return } @@ -745,7 +691,7 @@ func (r *runner) 
TryScheduleCheckpoint( ) }() - r.incrementalCheckpointQueue.Enqueue(struct{}{}) + r.TryTriggerExecuteICKP() if intent.end.LT(&ts) { err = ErrPendingCheckpoint diff --git a/pkg/vm/engine/tae/db/checkpoint/runner_test.go b/pkg/vm/engine/tae/db/checkpoint/runner_test.go index d9024cf0377bd..6c07586c93e7f 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner_test.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner_test.go @@ -393,8 +393,10 @@ func Test_RunnerStore1(t *testing.T) { assert.True(t, taken.IsRunning()) assert.True(t, taken.end.EQ(&t3)) - committed := store.CommitICKPIntent(taken, true) - assert.True(t, committed) + prepared := store.PrepareCommitICKPIntent(taken) + assert.True(t, prepared) + + store.CommitICKPIntent(taken) intent, updated = store.UpdateICKPIntent(&t3, true, true) assert.False(t, updated) @@ -496,13 +498,14 @@ func Test_RunnerStore3(t *testing.T) { rollback() intent6 := store.incrementalIntent.Load() - assert.True(t, intent6.IsPendding()) - assert.True(t, intent6.end.EQ(&t2)) - assert.True(t, intent6.start.IsEmpty()) - assert.Equal(t, intent6.bornTime, intent5.bornTime) - assert.Equal(t, intent6.refreshCnt, intent5.refreshCnt) - assert.Equal(t, intent6.policyChecked, intent5.policyChecked) - assert.Equal(t, intent6.flushChecked, intent5.flushChecked) + assert.Nil(t, intent6) + // assert.True(t, intent6.IsPendding()) + // assert.True(t, intent6.end.EQ(&t2)) + // assert.True(t, intent6.start.IsEmpty()) + // assert.Equal(t, intent6.bornTime, intent5.bornTime) + // assert.Equal(t, intent6.refreshCnt, intent5.refreshCnt) + // assert.Equal(t, intent6.policyChecked, intent5.policyChecked) + // assert.Equal(t, intent6.flushChecked, intent5.flushChecked) ii2, updated = store.UpdateICKPIntent(&t3, true, true) assert.True(t, updated) @@ -527,8 +530,9 @@ func Test_RunnerStore3(t *testing.T) { maxEntry := store.MaxIncrementalCheckpoint() assert.Nil(t, maxEntry) - committed := store.CommitICKPIntent(taken, true) - assert.True(t, committed) + prepared := store.PrepareCommitICKPIntent(taken) + assert.True(t, prepared) + store.CommitICKPIntent(taken) assert.True(t, taken.IsFinished()) wg.Wait() @@ -563,21 +567,36 @@ func Test_RunnerStore3(t *testing.T) { assert.True(t, taken2.IsRunning()) intent11 := store.incrementalIntent.Load() assert.Equal(t, intent11, taken2) + t.Logf("taken2: %s", taken2.String()) + entries := store.incrementals.Items() + for i, entry := range entries { + t.Logf("entry[%d]: %s", i, entry.String()) + } // cannot commit a different intent with the incremental intent t5 := types.NextGlobalTsForTest() taken2_1 := InheritCheckpointEntry(taken2, WithEndEntryOption(t5)) - committed = store.CommitICKPIntent(taken2_1, true) - assert.False(t, committed) + prepared = store.PrepareCommitICKPIntent(taken2_1) + assert.False(t, prepared) - taken2.start = taken2.start.Next() - committed = store.CommitICKPIntent(taken2, true) - assert.False(t, committed) + prepared = store.PrepareCommitICKPIntent(taken2) + assert.True(t, prepared) - taken2.start = taken2.start.Prev() - committed = store.CommitICKPIntent(taken2, true) - assert.True(t, committed) + store.CommitICKPIntent(taken2) assert.True(t, taken2.IsFinished()) + entries = store.incrementals.Items() + assert.Equal(t, 2, len(entries)) + for _, entry := range entries { + assert.True(t, entry.IsFinished()) + } + assert.Equalf( + t, + taken2.end.Next(), + store.GetCheckpointed(), + "%s:%s", + taken2.end.ToString(), + store.GetCheckpointed().ToString(), + ) timer := time.After(time.Second * 10) select { @@ -617,10 +636,8 @@ func 
Test_RunnerStore4(t *testing.T) { rollback() intent4 := store.incrementalIntent.Load() - assert.True(t, intent4.IsPendding()) - assert.True(t, intent4.end.EQ(&t2)) - assert.True(t, intent4.start.IsEmpty()) - assert.True(t, intent4.AllChecked()) + assert.Nil(t, intent4) + <-taken.Wait() } func Test_RunnerStore5(t *testing.T) { @@ -643,3 +660,33 @@ func Test_RunnerStore5(t *testing.T) { assert.True(t, intent2.bornTime.After(intent.bornTime)) t.Log(intent2.String()) } + +func Test_Executor1(t *testing.T) { + executor := newCheckpointExecutor(nil) + assert.True(t, executor.active.Load()) + + done := make(chan struct{}) + running := make(chan struct{}) + mockRunICKP := func(ctx context.Context, _ *runner) (err error) { + close(running) + <-ctx.Done() + close(done) + err = context.Cause(ctx) + return + } + executor.runICKPFunc = mockRunICKP + go func() { + err := executor.RunICKP() + assert.Equal(t, err, ErrPendingCheckpoint) + }() + // wait running + <-running + // stop executor + executor.StopWithCause(ErrPendingCheckpoint) + <-done + + assert.False(t, executor.active.Load()) + err := executor.RunICKP() + assert.Equal(t, err, ErrCheckpointDisabled) + executor.StopWithCause(nil) +} diff --git a/pkg/vm/engine/tae/db/checkpoint/store.go b/pkg/vm/engine/tae/db/checkpoint/store.go index 29b02b9c412d9..1d53ba46d9e3a 100644 --- a/pkg/vm/engine/tae/db/checkpoint/store.go +++ b/pkg/vm/engine/tae/db/checkpoint/store.go @@ -53,6 +53,7 @@ func newRunnerStore( NoLocks: true, }, ) + s.metaFiles = make(map[string]struct{}) return s } @@ -68,7 +69,7 @@ type runnerStore struct { incrementals *btree.BTreeG[*CheckpointEntry] globals *btree.BTreeG[*CheckpointEntry] compacted atomic.Pointer[CheckpointEntry] - // metaFiles map[string]struct{} + metaFiles map[string]struct{} gcIntent types.TS gcCount int @@ -117,6 +118,10 @@ func (s *runnerStore) UpdateICKPIntent( if old != nil && !old.AllChecked() && policyChecked && flushChecked { checkpointed := s.GetCheckpointed() // no need to do checkpoint + if checkpointed.GT(&old.end) { + s.incrementalIntent.CompareAndSwap(old, nil) + continue + } if checkpointed.GE(ts) { intent = nil return @@ -232,12 +237,9 @@ func (s *runnerStore) TakeICKPIntent() (taken *CheckpointEntry, rollback func()) ) if s.incrementalIntent.CompareAndSwap(old, taken) { rollback = func() { - // rollback the intent - putBack := InheritCheckpointEntry( - taken, - WithStateEntryOption(ST_Pending), - ) - s.incrementalIntent.Store(putBack) + // clear the intent and notify the intent is done + s.incrementalIntent.Store(nil) + taken.Done() } break } @@ -247,72 +249,81 @@ func (s *runnerStore) TakeICKPIntent() (taken *CheckpointEntry, rollback func()) return } -// intent must be in Running state -func (s *runnerStore) CommitICKPIntent(intent *CheckpointEntry, done bool) (committed bool) { - defer func() { - if done && committed { - intent.Done() - } - }() - old := s.incrementalIntent.Load() +func (s *runnerStore) PrepareCommitICKPIntent( + intent *CheckpointEntry, +) (ok bool) { + expect := s.incrementalIntent.Load() // should not happen - if old != intent { + if intent != expect || !intent.IsRunning() { logutil.Error( - "CommitICKPIntent-Error", - zap.String("intent", intent.String()), - zap.String("expected", old.String()), + "ICKP-PrepareCommit", + zap.Any("expected", expect), + zap.Any("actual", intent), ) return } s.Lock() defer s.Unlock() - maxICKP, _ := s.incrementals.Max() - maxGCKP, _ := s.globals.Max() - var ( - maxICKPEndNext types.TS - maxGCKPEnd types.TS - ) - if maxICKP != nil { - 
maxICKPEndNext = maxICKP.end.Next() - } - if maxGCKP != nil { - maxGCKPEnd = maxGCKP.end - } - if maxICKP == nil && maxGCKP == nil { - if !intent.start.IsEmpty() { - logutil.Error( - "CommitICKPIntent-Error", - zap.String("intent", intent.String()), - zap.String("max-i", "nil"), - zap.String("max-g", "nil"), - ) - // PXU TODO: err = xxx - return - } - } else if (maxICKP == nil && !maxGCKPEnd.EQ(&intent.start)) || - (maxICKP != nil && !maxICKPEndNext.EQ(&intent.start)) { - maxi := "nil" - maxg := "nil" - if maxICKP != nil { - maxi = maxICKP.String() - } - if maxGCKP != nil { - maxg = maxGCKP.String() - } - logutil.Error( + s.incrementals.Set(intent) + ok = true + return +} + +func (s *runnerStore) RollbackICKPIntent( + intent *CheckpointEntry, +) { + expect := s.incrementalIntent.Load() + // should not happen + if intent != expect || !intent.IsRunning() { + logutil.Fatal( + "ICKP-Rollback", + zap.Any("expected", expect), + zap.Any("actual", intent), + ) + } + s.Lock() + defer s.Unlock() + s.incrementals.Delete(intent) +} + +// intent must be in Running state +func (s *runnerStore) CommitICKPIntent( + intent *CheckpointEntry, +) { + defer intent.Done() + old := s.incrementalIntent.Load() + // should not happen + if old != intent { + logutil.Fatal( "CommitICKPIntent-Error", zap.String("intent", intent.String()), - zap.String("max-i", maxi), - zap.String("max-g", maxg), + zap.String("expected", old.String()), ) - // PXU TODO: err = xxx - return } - s.incrementalIntent.Store(nil) intent.SetState(ST_Finished) - s.incrementals.Set(intent) - committed = true - return + s.incrementalIntent.Store(nil) +} + +func (s *runnerStore) AddMetaFile(name string) { + s.Lock() + defer s.Unlock() + s.metaFiles[name] = struct{}{} +} + +func (s *runnerStore) RemoveMetaFile(name string) { + s.Lock() + defer s.Unlock() + delete(s.metaFiles, name) +} + +func (s *runnerStore) GetMetaFiles() map[string]struct{} { + s.RLock() + defer s.RUnlock() + files := make(map[string]struct{}) + for k, v := range s.metaFiles { + files[k] = v + } + return files } func (s *runnerStore) ExportStatsLocked() []zap.Field { diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 289228826e99a..a8b0293b077e4 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -37,9 +37,15 @@ type TestRunner interface { ForceIncrementalCheckpoint(end types.TS) error MaxLSNInRange(end types.TS) uint64 + GetICKPIntentOnlyForTest() *CheckpointEntry + GCNeeded() bool } +func (r *runner) GetICKPIntentOnlyForTest() *CheckpointEntry { + return r.store.GetICKPIntent() +} + // DisableCheckpoint stops generating checkpoint func (r *runner) DisableCheckpoint() { r.disabled.Store(true) @@ -167,7 +173,7 @@ func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) { ) }() - r.incrementalCheckpointQueue.Enqueue(struct{}{}) + r.TryTriggerExecuteICKP() select { case <-r.ctx.Done(): @@ -177,6 +183,11 @@ func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) { err = moerr.NewInternalErrorNoCtx("timeout") return case <-intent.Wait(): + checkpointed := r.store.GetCheckpointed() + // if checkpointed < ts, something wrong may be happend and the previous intent was rollbacked + if checkpointed.LT(&ts) { + err = ErrPendingCheckpoint + } } return } @@ -210,7 +221,7 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er entry.ckpLSN = lsn entry.truncateLSN = lsnToTruncate var file string - if file, err = 
r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil { + if file, err = r.saveCheckpoint(entry.start, entry.end); err != nil { return } files = append(files, file) diff --git a/pkg/vm/engine/tae/db/checkpoint/types.go b/pkg/vm/engine/tae/db/checkpoint/types.go index b343a76184772..e3ac0f29974a4 100644 --- a/pkg/vm/engine/tae/db/checkpoint/types.go +++ b/pkg/vm/engine/tae/db/checkpoint/types.go @@ -25,6 +25,8 @@ import ( ) var ErrPendingCheckpoint = moerr.NewPrevCheckpointNotFinished() +var ErrCheckpointDisabled = moerr.NewInternalErrorNoCtxf("checkpoint disabled") +var ErrExecutorRestarted = moerr.NewInternalErrorNoCtxf("executor restarted") type State int8 diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 8c136074cb826..b73684cec38db 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -204,6 +204,9 @@ func (db *DB) ForceCheckpoint( timeout := time.After(wait) for { select { + case <-ctx.Done(): + err = context.Cause(ctx) + return case <-timeout: err = moerr.NewInternalError(ctx, "force checkpoint timeout") return diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 6f869696de8d4..fcbbac38cc41c 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -7744,6 +7744,70 @@ func TestGCCheckpoint1(t *testing.T) { } } +// 1. make some transactions +// 2. fault injection to save checkpoint +// 3. force checkpoint -> expect error +// 4. check the intent -> expect not nil, all checked, running +// 5. check the incremental checkpoints -> expect 0 +// 6. remove the fault injection +// 7. force checkpoint -> expect no error +// 8. check the incremental checkpoints -> expect 1, finished +// 9. check the intent -> expect nil +func Test_CheckpointChaos1(t *testing.T) { + defer testutils.AfterTest(t)() + ctx := context.Background() + opts := config.WithLongScanAndCKPOpts(nil) + + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + + i := 0 + nextDBName := func() string { + i++ + return fmt.Sprintf("db_%d", i) + } + + commitOneTxn := func() { + txn, _ := tae.StartTxn(nil) + _, err := txn.CreateDatabase(nextDBName(), "", "") + assert.Nil(t, err) + } + + for i := 0; i < 2; i++ { + commitOneTxn() + } + + fault.Enable() + defer fault.Disable() + msg := "checkpoint-chaos" + rmFn, err := objectio.InjectCheckpointSave(msg) + assert.NoError(t, err) + + now := tae.TxnMgr.Now() + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + err = tae.DB.ForceCheckpoint(ctx, now, time.Minute) + assert.Error(t, err) + + intent := tae.BGCheckpointRunner.GetICKPIntentOnlyForTest() + assert.Nil(t, intent) + + entries := tae.BGCheckpointRunner.GetAllIncrementalCheckpoints() + assert.Equal(t, 0, len(entries)) + + rmFn() + ctx = context.Background() + err = tae.DB.ForceCheckpoint(ctx, now, time.Minute) + assert.NoError(t, err) + + entries = tae.BGCheckpointRunner.GetAllIncrementalCheckpoints() + assert.Equal(t, 1, len(entries)) + assert.True(t, entries[0].IsFinished()) + + intent = tae.BGCheckpointRunner.GetICKPIntentOnlyForTest() + assert.Nil(t, intent) +} + func TestGCCatalog1(t *testing.T) { defer testutils.AfterTest(t)() ctx := context.Background()
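
For reference, below is a minimal, self-contained sketch (not part of this patch) of the single-flight pattern the new checkpointExecutor in pkg/vm/engine/tae/db/checkpoint/executor.go relies on: at most one in-flight job, guarded by an atomic.Pointer CAS, with shutdown driven by context.WithCancelCause followed by a wait on the job's done channel. All identifiers here (demoExecutor, demoJob, errBusy, errStopped) are hypothetical and exist only for illustration; the real types and error values are the ones added in the diff above.

// Hedged sketch: illustrates the single-flight + cancel-cause pattern only.
// It is not MatrixOne code and makes no assumptions about internal packages.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var (
	errBusy    = errors.New("a job is already running")
	errStopped = errors.New("executor stopped")
)

type demoJob struct {
	doneCh chan struct{}
}

type demoExecutor struct {
	ctx     context.Context
	cancel  context.CancelCauseFunc
	active  atomic.Bool
	running atomic.Pointer[demoJob]
}

func newDemoExecutor() *demoExecutor {
	ctx, cancel := context.WithCancelCause(context.Background())
	e := &demoExecutor{ctx: ctx, cancel: cancel}
	e.active.Store(true)
	return e
}

// Run admits at most one job at a time; the CAS on `running` is the only gate.
func (e *demoExecutor) Run(work func(context.Context) error) error {
	if !e.active.Load() {
		return errStopped
	}
	job := &demoJob{doneCh: make(chan struct{})}
	if !e.running.CompareAndSwap(nil, job) {
		return errBusy
	}
	defer func() {
		close(job.doneCh) // lets StopWithCause observe completion
		e.running.Store(nil)
	}()
	return work(e.ctx)
}

// StopWithCause disables new jobs, cancels the running one, and waits for it.
func (e *demoExecutor) StopWithCause(cause error) {
	e.active.Store(false)
	e.cancel(cause)
	if job := e.running.Load(); job != nil {
		<-job.doneCh
	}
}

func main() {
	e := newDemoExecutor()
	go func() {
		// a long-running "checkpoint" that only ends when the executor is stopped
		_ = e.Run(func(ctx context.Context) error {
			<-ctx.Done()
			return context.Cause(ctx)
		})
	}()
	time.Sleep(10 * time.Millisecond) // crude ordering, good enough for a sketch
	e.StopWithCause(errStopped)       // cancels the running job and waits for it
	err := e.Run(func(context.Context) error { return nil })
	fmt.Println("run after stop:", err) // -> executor stopped
}

The same shape explains the runner-side behavior in the diff: RunICKP returns ErrPendingCheckpoint when the CAS loses, and StopWithCause(ErrCheckpointDisabled) is what DisableCheckpoint ultimately propagates to an in-flight job through context.Cause.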