From d28c3d5cfaac0531fd7e29f4fea4978d3cdeaf67 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Sun, 22 Dec 2024 22:43:05 +0800 Subject: [PATCH] update 5 --- pkg/vm/engine/tae/db/checkpoint/testutils.go | 10 ++++++++++ pkg/vm/engine/tae/db/checkpoint/types.go | 3 +++ 2 files changed, 13 insertions(+) diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 2f015c8ecf627..983d5abd6de80 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -144,6 +144,14 @@ func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) { if intent == nil { return } + + entry := intent.(*CheckpointEntry) + + if entry.end.LT(&ts) || !entry.AllChecked() { + err = ErrPendingCheckpoint + return + } + // TODO: use context timeout := time.After(time.Minute * 2) now := time.Now() @@ -160,6 +168,8 @@ func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) { ) }() + r.incrementalCheckpointQueue.Enqueue(struct{}{}) + select { case <-r.ctx.Done(): err = context.Cause(r.ctx) diff --git a/pkg/vm/engine/tae/db/checkpoint/types.go b/pkg/vm/engine/tae/db/checkpoint/types.go index 1a7d3c54ed9f9..b343a76184772 100644 --- a/pkg/vm/engine/tae/db/checkpoint/types.go +++ b/pkg/vm/engine/tae/db/checkpoint/types.go @@ -17,12 +17,15 @@ package checkpoint import ( "context" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) +var ErrPendingCheckpoint = moerr.NewPrevCheckpointNotFinished() + type State int8 const (