Skip to content

Commit

Permalink
refactor ickp impl 2 (#20881)
Browse files Browse the repository at this point in the history
refactor ickp executor to make it stoppable

Approved by: @triump2020, @LeftHandCold
  • Loading branch information
XuPeng-SH authored Dec 24, 2024
1 parent 4803a23 commit e4dae1c
Show file tree
Hide file tree
Showing 12 changed files with 537 additions and 209 deletions.
25 changes: 25 additions & 0 deletions pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
FJ_TransferSlow = "fj/transfer/slow"
FJ_FlushTimeout = "fj/flush/timeout"

FJ_CheckpointSave = "fj/checkpoint/save"

FJ_TraceRanges = "fj/trace/ranges"
FJ_TracePartitionState = "fj/trace/partitionstate"
FJ_PrefetchThreshold = "fj/prefetch/threshold"
Expand Down Expand Up @@ -262,6 +264,29 @@ func InjectLog1(
return
}

// CheckpointSaveInjected reports whether the checkpoint-save fault point
// (FJ_CheckpointSave) is currently armed, returning the injected string
// argument along with the trigger flag.
func CheckpointSaveInjected() (string, bool) {
	_, msg, armed := fault.TriggerFault(FJ_CheckpointSave)
	return msg, armed
}

// InjectCheckpointSave arms the checkpoint-save fault point with the given
// message. On success it returns a cleanup function that removes the fault
// point; on failure it returns the error from arming it.
func InjectCheckpointSave(msg string) (rmFault func(), err error) {
	ctx := context.Background()
	err = fault.AddFaultPoint(
		ctx,
		FJ_CheckpointSave,
		":::",
		"echo",
		0,
		msg,
		false,
	)
	if err != nil {
		return nil, err
	}
	remove := func() {
		// Best-effort removal; error intentionally ignored, matching the
		// other fault-point helpers in this file.
		fault.RemoveFaultPoint(ctx, FJ_CheckpointSave)
	}
	return remove, nil
}

func Debug19524Injected() bool {
_, _, injected := fault.TriggerFault(FJ_Debug19524)
return injected
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)),
zap.Uint64("tbl-id", tbl.tableId),
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("blocks", blocks.String()),
zap.Int("blocks", blocks.Len()),
zap.String("ps", fmt.Sprintf("%p", part)),
zap.Duration("cost", cost),
zap.Error(err),
Expand Down
221 changes: 221 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoint

import (
"context"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"go.uber.org/zap"
)

// checkpointJob is a single run of an incremental checkpoint (ICKP).
// It is created per execution by checkpointExecutor.RunICKP; doneCh is
// closed exactly once (via Done) so waiters on WaitC are released when
// the job finishes.
type checkpointJob struct {
	// doneCh is closed when the job completes; see Done and WaitC.
	doneCh chan struct{}
	// runner supplies the checkpoint machinery (store, wal, queues,
	// options) the job operates on.
	runner *runner

	// runICKPFunc, when non-nil, replaces the default ICKP logic in
	// RunICKP — a seam for tests/injection.
	runICKPFunc func(context.Context, *runner) error
}

// RunICKP executes one incremental checkpoint: it takes an ICKP intent from
// the store, flushes the checkpoint data, commits the intent, truncates the
// WAL up to the computed LSN, and enqueues post/global checkpoint work.
// If runICKPFunc is set, it is invoked instead of the default logic.
// Returns nil when there is no pending intent to checkpoint.
func (job *checkpointJob) RunICKP(ctx context.Context) (err error) {
	if job.runICKPFunc != nil {
		return job.runICKPFunc(ctx, job.runner)
	}
	// Bail out early (non-blocking) if the executor context was canceled.
	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	default:
	}

	// TakeICKPIntent yields the entry to checkpoint plus a rollback func to
	// release the intent if we fail before commit. nil entry => nothing to do.
	entry, rollback := job.runner.store.TakeICKPIntent()
	if entry == nil {
		return
	}

	var (
		errPhase      string
		lsnToTruncate uint64
		lsn           uint64
		fatal         bool
		fields        []zap.Field
		now           = time.Now()
	)

	logutil.Info(
		"ICKP-Execute-Start",
		zap.String("entry", entry.String()),
	)

	// Single exit-logging point: errors log the failing phase (Fatal when the
	// failure leaves WAL/store state inconsistent), success logs timing stats.
	defer func() {
		if err != nil {
			var logger func(msg string, fields ...zap.Field)
			if fatal {
				logger = logutil.Fatal
			} else {
				logger = logutil.Error
			}
			logger(
				"ICKP-Execute-Error",
				zap.String("entry", entry.String()),
				zap.Error(err),
				zap.String("phase", errPhase),
				zap.Duration("cost", time.Since(now)),
			)
		} else {
			fields = append(fields, zap.Duration("cost", time.Since(now)))
			fields = append(fields, zap.Uint64("truncate", lsnToTruncate))
			fields = append(fields, zap.Uint64("lsn", lsn))
			fields = append(fields, zap.Uint64("reserve", job.runner.options.reservedWALEntryCount))
			fields = append(fields, zap.String("entry", entry.String()))
			fields = append(fields, zap.Duration("age", entry.Age()))
			logutil.Info(
				"ICKP-Execute-End",
				fields...,
			)
		}
	}()

	var files []string
	var file string
	// Write the incremental checkpoint data; fields carries log metadata
	// consumed by the success branch of the deferred logger above.
	if fields, files, err = job.runner.doIncrementalCheckpoint(entry); err != nil {
		errPhase = "do-ckp"
		rollback()
		return
	}

	// Truncate the WAL only up to (max LSN - reserved count), keeping a tail
	// of reservedWALEntryCount entries.
	lsn = job.runner.source.GetMaxLSN(entry.start, entry.end)
	if lsn > job.runner.options.reservedWALEntryCount {
		lsnToTruncate = lsn - job.runner.options.reservedWALEntryCount
	}
	entry.SetLSN(lsn, lsnToTruncate)

	if prepared := job.runner.store.PrepareCommitICKPIntent(entry); !prepared {
		errPhase = "prepare"
		rollback()
		err = moerr.NewInternalErrorNoCtxf("cannot prepare ickp")
		return
	}

	// After a successful prepare, a failure must roll back both the prepared
	// intent and the taken intent.
	if file, err = job.runner.saveCheckpoint(
		entry.start, entry.end,
	); err != nil {
		errPhase = "save-ckp"
		job.runner.store.RollbackICKPIntent(entry)
		rollback()
		return
	}

	job.runner.store.CommitICKPIntent(entry)
	v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds())

	files = append(files, file)

	// PXU TODO: if crash here, the checkpoint log entry will be lost
	// Errors past the commit are fatal: the checkpoint is committed in the
	// store but not recorded in the WAL, so state can no longer be rolled back.
	var logEntry wal.LogEntry
	if logEntry, err = job.runner.wal.RangeCheckpoint(1, lsnToTruncate, files...); err != nil {
		errPhase = "wal-ckp"
		fatal = true
		return
	}
	if err = logEntry.WaitDone(); err != nil {
		errPhase = "wait-wal-ckp-done"
		fatal = true
		return
	}

	// Hand off follow-up work: post-checkpoint processing and a global
	// checkpoint request derived from this entry.
	job.runner.postCheckpointQueue.Enqueue(entry)
	job.runner.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
		end:         entry.end,
		interval:    job.runner.options.globalVersionInterval,
		ckpLSN:      lsn,
		truncateLSN: lsnToTruncate,
	})

	return nil
}

// WaitC returns a channel that is closed once the job has finished
// (i.e. after Done is called).
func (job *checkpointJob) WaitC() <-chan struct{} {
	done := job.doneCh
	return done
}

// Done marks the job as finished by closing doneCh, releasing every waiter
// blocked on WaitC. It must be called exactly once per job.
func (job *checkpointJob) Done() {
	close(job.doneCh)
}

// checkpointExecutor runs incremental checkpoint jobs one at a time and can
// be stopped: StopWithCause deactivates it, cancels ctx, and waits for the
// in-flight job.
type checkpointExecutor struct {
	// ctx/cancel bound the lifetime of all jobs started by this executor;
	// cancel carries the stop cause to RunICKP via context.Cause.
	ctx    context.Context
	cancel context.CancelCauseFunc
	// active is true until StopWithCause; RunICKP refuses to start when false.
	active atomic.Bool
	// running holds the single in-flight job (nil when idle); guarded by CAS
	// in RunICKP so at most one job runs at a time.
	running atomic.Pointer[checkpointJob]

	// runner is the checkpoint machinery jobs operate on; cleared on stop.
	runner *runner
	// runICKPFunc, when non-nil, is propagated to each job as an override of
	// the default ICKP logic (test/injection seam).
	runICKPFunc func(context.Context, *runner) error
}

// newCheckpointExecutor builds an executor bound to the given runner with a
// fresh cancelable context, and marks it active so RunICKP may start jobs.
func newCheckpointExecutor(
	runner *runner,
) *checkpointExecutor {
	ctx, cancel := context.WithCancelCause(context.Background())
	executor := &checkpointExecutor{
		ctx:    ctx,
		cancel: cancel,
		runner: runner,
	}
	executor.active.Store(true)
	return executor
}

// StopWithCause deactivates the executor, cancels its context with the given
// cause (defaulting to ErrCheckpointDisabled when nil), and blocks until the
// in-flight job observed at the time of the call — if any — has finished.
// Afterwards the executor is unusable: running and runner are cleared.
//
// NOTE(review): active is flipped before cancel, but RunICKP's active check
// and its CAS on running are not atomic with this sequence — a RunICKP that
// passed the active check concurrently could still install a job after the
// Load below; confirm callers serialize Stop with Run.
func (e *checkpointExecutor) StopWithCause(cause error) {
	e.active.Store(false)
	if cause == nil {
		cause = ErrCheckpointDisabled
	}
	e.cancel(cause)
	job := e.running.Load()
	if job != nil {
		<-job.WaitC()
	}
	e.running.Store(nil)
	e.runner = nil
}

// RunICKP starts and waits for one incremental checkpoint job.
// It returns ErrCheckpointDisabled when the executor has been stopped and
// ErrPendingCheckpoint when another job is already in flight; otherwise it
// returns the job's result.
//
// Fix: the original assigned err = ErrPendingCheckpoint on the fast-path
// check without returning, falling through to allocate a job and rely on the
// CAS failing again — the assignment was dead and misleading. Return early
// instead.
func (e *checkpointExecutor) RunICKP() (err error) {
	if !e.active.Load() {
		return ErrCheckpointDisabled
	}
	// Fast-path rejection; the CAS below remains the authoritative guard.
	if e.running.Load() != nil {
		return ErrPendingCheckpoint
	}
	job := &checkpointJob{
		doneCh:      make(chan struct{}),
		runner:      e.runner,
		runICKPFunc: e.runICKPFunc,
	}
	if !e.running.CompareAndSwap(nil, job) {
		return ErrPendingCheckpoint
	}
	// Always release waiters and clear the running slot, even if the job errors.
	defer func() {
		job.Done()
		e.running.Store(nil)
	}()
	err = job.RunICKP(e.ctx)
	return
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type RunnerReader interface {
GetDirtyCollector() logtail.Collector
}

func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch {
func (r *runner) collectCheckpointMetadata(start, end types.TS) *containers.Batch {
bat := makeRespBatchFromSchema(CheckpointSchema)
entries := r.GetAllIncrementalCheckpoints()
for _, entry := range entries {
Expand Down
4 changes: 1 addition & 3 deletions pkg/vm/engine/tae/db/checkpoint/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
}
metaFiles := make([]*MetaFile, 0)
compactedFiles := make([]*MetaFile, 0)
r.checkpointMetaFiles.Lock()
for i, dir := range dirs {
r.checkpointMetaFiles.files[dir.Name] = struct{}{}
r.store.AddMetaFile(dir.Name)
start, end, ext := blockio.DecodeCheckpointMetadataFileName(dir.Name)
metaFile := &MetaFile{
start: start,
Expand All @@ -112,7 +111,6 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
}
metaFiles = append(metaFiles, metaFile)
}
r.checkpointMetaFiles.Unlock()
sort.Slice(metaFiles, func(i, j int) bool {
return metaFiles[i].end.LT(&metaFiles[j].end)
})
Expand Down
Loading

0 comments on commit e4dae1c

Please sign in to comment.