
Improve debug info #20730

Merged (12 commits) on Dec 12, 2024
49 changes: 38 additions & 11 deletions pkg/vm/engine/tae/db/checkpoint/replay.go
@@ -122,7 +122,11 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
var checkpointVersion int
// in version 1, checkpoint metadata doesn't contain 'version'.
vecLen := len(bats[0].Vecs)
logutil.Infof("checkpoint version: %d, list and load duration: %v", vecLen, time.Since(t0))
logutil.Info(
"Replay-Checkpoint",
zap.Int("col-cnt", vecLen),
zap.Duration("load-cost", time.Since(t0)),
)
if vecLen < CheckpointSchemaColumnCountV1 {
checkpointVersion = 1
} else if vecLen < CheckpointSchemaColumnCountV2 {
@@ -258,7 +262,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
}

// ReplayThreeTablesObjectlist replays the object lists of the three tables and checks the LSN and TS.
func (c *CkpReplayer) ReplayThreeTablesObjectlist() (
func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) (
maxTs types.TS,
maxLSN uint64,
isLSNValid bool,
@@ -282,9 +286,19 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist() (
dataFactory := c.dataF
maxGlobal := r.MaxGlobalCheckpoint()
if maxGlobal != nil {
logutil.Infof("replay checkpoint %v", maxGlobal)
err = datas[c.globalCkpIdx].ApplyReplayTo(r.catalog, dataFactory, true)
c.applyCount++
logger := logutil.Info
if err != nil {
logger = logutil.Error
}
logger(
"Replay-3-Table-From-Global",
zap.String("phase", phase),
zap.String("checkpoint", maxGlobal.String()),
zap.Duration("cost", time.Since(t0)),
zap.Error(err),
)
if err != nil {
return
}
@@ -308,6 +322,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist() (
e.String())
}
}
logger := logutil.Info
for i := 0; i < len(entries); i++ {
checkpointEntry := entries[i]
if checkpointEntry == nil {
@@ -316,8 +331,17 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist() (
if checkpointEntry.end.LE(&maxTs) {
continue
}
logutil.Infof("replay checkpoint %v", checkpointEntry)
err = datas[i].ApplyReplayTo(r.catalog, dataFactory, true)
start := time.Now()
if err = datas[i].ApplyReplayTo(r.catalog, dataFactory, true); err != nil {
logger = logutil.Error
}
logger(
"Replay-3-Table-From-Incremental",
zap.String("phase", phase),
zap.String("checkpoint", checkpointEntry.String()),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
c.applyCount++
if err != nil {
return
@@ -342,7 +366,10 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist() (
return
}

func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) {
func (c *CkpReplayer) ReplayCatalog(
readTxn txnif.AsyncTxn,
phase string,
) (err error) {
start := time.Now()

defer func() {
@@ -351,8 +378,8 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) {
logger = logutil.Error
}
logger(
"open-tae",
zap.String("replay", "checkpoint-catalog"),
"Replay-Catalog",
zap.String("phase", phase),
zap.Duration("cost", time.Since(start)),
zap.Error(err),
)
@@ -379,7 +406,7 @@ func (c *CkpReplayer) ReplayCatalog(readTxn txnif.AsyncTxn) (err error) {
}

// ReplayObjectlist replays the data part of the checkpoint.
func (c *CkpReplayer) ReplayObjectlist() (err error) {
func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) {
if len(c.ckpEntries) == 0 {
return
}
@@ -425,8 +452,8 @@ func (c *CkpReplayer) ReplayObjectlist() (err error) {
r.catalog.GetUsageMemo().(*logtail.TNUsageMemo).PrepareReplay(ckpDatas, ckpVers)
r.source.Init(maxTs)
logutil.Info(
"open-tae",
zap.String("replay", "checkpoint-objectlist"),
"Replay-Checkpoints",
zap.String("phase", phase),
zap.Duration("apply-cost", c.applyDuration),
zap.Duration("read-cost", c.readDuration),
zap.Int("apply-count", c.applyCount),
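For readers skimming this file's diff: the core change is replacing format-string logs with one structured log per replay step, emitted from a deferred closure that switches to the error level when the step failed. Below is a minimal, self-contained sketch of that pattern using go.uber.org/zap directly rather than the project's logutil wrapper; the function name, message, and field keys are illustrative, not part of the PR.

```go
package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

// replayStep shows the deferred, level-switching log pattern: record the
// start time, do the work, and log exactly once with the cost and error,
// upgrading the level to Error when the named return err is set.
func replayStep(log *zap.Logger, phase string) (err error) {
	start := time.Now()
	defer func() {
		logFn := log.Info
		if err != nil {
			logFn = log.Error
		}
		logFn(
			"Replay-Checkpoint",
			zap.String("phase", phase),
			zap.Duration("cost", time.Since(start)),
			zap.Error(err),
		)
	}()

	// Stand-in for the real replay work.
	return errors.New("simulated replay failure")
}

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()
	_ = replayStep(log, "open-tae")
}
```

ReplayCatalog above uses this deferred shape; the object-list replays apply the same level switch inline without the defer.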
84 changes: 66 additions & 18 deletions pkg/vm/engine/tae/db/db.go
@@ -16,12 +16,12 @@ package db

import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/util/fault"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
@@ -121,7 +121,8 @@ func (db *DB) FlushTable(
func (db *DB) ForceCheckpoint(
ctx context.Context,
ts types.TS,
flushDuration time.Duration) (err error) {
flushDuration time.Duration,
) (err error) {
// FIXME: cannot disable with a running job
db.BGCheckpointRunner.DisableCheckpoint()
defer db.BGCheckpointRunner.EnableCheckpoint()
@@ -131,7 +132,23 @@ func (db *DB) ForceCheckpoint(
}
t0 := time.Now()
err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration)
logutil.Infof("[Force Checkpoint] flush takes %v: %v", time.Since(t0), err)
forceFlushCost := time.Since(t0)

defer func() {
logger := logutil.Info
if err != nil {
logger = logutil.Error
}
logger(
"Control-Force-Checkpoint",
zap.Error(err),
zap.Duration("total-cost", time.Since(t0)),
zap.String("ts", ts.ToString()),
zap.Duration("flush-duration", flushDuration),
zap.Duration("force-flush-cost", forceFlushCost),
)
}()

if err != nil {
return err
}
@@ -145,7 +162,8 @@ func (db *DB) ForceCheckpoint(
for {
select {
case <-timeout:
return moerr.NewInternalError(ctx, fmt.Sprintf("timeout for: %v", err))
err = moerr.NewInternalError(ctx, "force checkpoint timeout")
return
default:
err = db.BGCheckpointRunner.ForceIncrementalCheckpoint(ts, true)
if dbutils.IsRetrieableCheckpoint(err) {
@@ -154,30 +172,45 @@ func (db *DB) ForceCheckpoint(
time.Sleep(time.Duration(interval))
break
}
logutil.Debugf("[Force Checkpoint] takes %v", time.Since(t0))
return err
return
}
}
}

func (db *DB) ForceGlobalCheckpoint(
ctx context.Context,
ts types.TS,
flushDuration, versionInterval time.Duration) (err error) {
flushDuration, versionInterval time.Duration,
) (err error) {
// FIXME: cannot disable with a running job
db.BGCheckpointRunner.DisableCheckpoint()
defer db.BGCheckpointRunner.EnableCheckpoint()
db.BGCheckpointRunner.CleanPenddingCheckpoint()
t0 := time.Now()
err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration)
logutil.Infof("[Force Global Checkpoint] flush takes %v: %v", time.Since(t0), err)
forceFlushCost := time.Since(t0)
defer func() {
logger := logutil.Info
if err != nil {
logger = logutil.Error
}
logger(
"Control-ForceGlobalCheckpoint",
zap.Duration("total-cost", time.Since(t0)),
zap.Duration("force-flush-cost", forceFlushCost),
zap.Duration("flush-duration", flushDuration),
zap.Duration("version-interval", versionInterval),
zap.Error(err),
)
}()

if err != nil {
return err
}
if err = db.BGCheckpointRunner.ForceGlobalCheckpointSynchronously(ctx, ts, versionInterval); err != nil {
return err
return
}
logutil.Infof("[Force Global Checkpoint] takes %v", time.Since(t0))

err = db.BGCheckpointRunner.ForceGlobalCheckpointSynchronously(
ctx, ts, versionInterval,
)
return err
}

@@ -192,14 +225,29 @@ func (db *DB) ForceCheckpointForBackup(
db.BGCheckpointRunner.CleanPenddingCheckpoint()
t0 := time.Now()
err = db.BGCheckpointRunner.ForceFlush(ts, ctx, flushDuration)
logutil.Infof("[Force Checkpoint] flush takes %v: %v", time.Since(t0), err)
forceFlushCost := time.Since(t0)

defer func() {
logger := logutil.Info
if err != nil {
logger = logutil.Error
}
logger(
"Control-ForeCheckpointForBackup",
zap.Duration("total-cost", time.Since(t0)),
zap.Duration("force-flush-cost", forceFlushCost),
zap.Duration("flush-duration", flushDuration),
zap.String("location", location),
zap.Error(err),
)
}()

if err != nil {
return
}
if location, err = db.BGCheckpointRunner.ForceCheckpointForBackup(ts); err != nil {
return
}
logutil.Debugf("[Force Checkpoint] takes %v", time.Since(t0))

location, err = db.BGCheckpointRunner.ForceCheckpointForBackup(ts)

return
}

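The checkpoint entry points in this file now log once from a defer (capturing the flush cost up front) and keep retrying retryable errors until a timeout fires. A rough sketch of that retry-until-timeout loop follows; attempt and errRetryable are hypothetical stand-ins for BGCheckpointRunner.ForceIncrementalCheckpoint and dbutils.IsRetrieableCheckpoint, and none of these helper names come from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errRetryable = errors.New("retryable checkpoint error")

// forceWithTimeout keeps retrying retryable failures until the timeout
// channel fires, mirroring the select/timeout loop in ForceCheckpoint.
func forceWithTimeout(attempt func() error, wait, backoff time.Duration) error {
	timeout := time.After(wait)
	for {
		select {
		case <-timeout:
			return errors.New("force checkpoint timeout")
		default:
			err := attempt()
			if errors.Is(err, errRetryable) {
				time.Sleep(backoff) // back off, then retry on the next loop
				continue
			}
			return err // success or a non-retryable error ends the loop
		}
	}
}

func main() {
	calls := 0
	err := forceWithTimeout(func() error {
		calls++
		if calls < 3 {
			return errRetryable // fail twice, then succeed
		}
		return nil
	}, time.Second, 10*time.Millisecond)
	fmt.Printf("attempts=%d err=%v\n", calls, err)
}
```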
42 changes: 19 additions & 23 deletions pkg/vm/engine/tae/db/open.go
@@ -51,6 +51,8 @@ import (

const (
WALDir = "wal"

Phase_Open = "open-tae"
)

func fillRuntimeOptions(opts *options.Options) {
@@ -78,9 +80,9 @@ func Open(
dbLocker, err := createDBLock(dirname)

logutil.Info(
"open-tae",
common.OperationField("Start"),
common.OperandField("open"),
Phase_Open,
zap.String("db-dirname", dirname),
zap.Error(err),
)
totalTime := time.Now()

@@ -92,10 +94,9 @@ func Open(
dbLocker.Close()
}
logutil.Info(
"open-tae", common.OperationField("End"),
common.OperandField("open"),
common.AnyField("cost", time.Since(totalTime)),
common.AnyField("err", err),
Phase_Open,
zap.Duration("open-tae-cost", time.Since(totalTime)),
zap.Error(err),
)
}()

@@ -105,10 +106,9 @@ func Open(
wbuf := &bytes.Buffer{}
werr := toml.NewEncoder(wbuf).Encode(opts)
logutil.Info(
"open-tae",
common.OperationField("Config"),
common.AnyField("toml", wbuf.String()),
common.ErrorField(werr),
Phase_Open,
zap.String("config", wbuf.String()),
zap.Error(werr),
)
serviceDir := path.Join(dirname, "data")
if opts.Fs == nil {
@@ -205,7 +205,7 @@ func Open(
}

// 1. replay three tables objectlist
checkpointed, ckpLSN, valid, err := ckpReplayer.ReplayThreeTablesObjectlist()
checkpointed, ckpLSN, valid, err := ckpReplayer.ReplayThreeTablesObjectlist(Phase_Open)
if err != nil {
panic(err)
}
@@ -219,20 +219,18 @@ func Open(
store.BindTxn(txn)
}
// 2. replay all table Entries
if err = ckpReplayer.ReplayCatalog(txn); err != nil {
if err = ckpReplayer.ReplayCatalog(txn, Phase_Open); err != nil {
panic(err)
}

// 3. replay other tables' objectlist
if err = ckpReplayer.ReplayObjectlist(); err != nil {
if err = ckpReplayer.ReplayObjectlist(Phase_Open); err != nil {
panic(err)
}
logutil.Info(
"open-tae",
common.OperationField("replay"),
common.OperandField("checkpoints"),
common.AnyField("cost", time.Since(now)),
common.AnyField("checkpointed", checkpointed.ToString()),
Phase_Open,
zap.Duration("replay-checkpoints-cost", time.Since(now)),
zap.String("max-checkpoint", checkpointed.ToString()),
)

now = time.Now()
@@ -241,10 +239,8 @@ func Open(

// checkObjectState(db)
logutil.Info(
"open-tae",
common.OperationField("replay"),
common.OperandField("wal"),
common.AnyField("cost", time.Since(now)),
Phase_Open,
zap.Duration("replay-wal-cost", time.Since(now)),
)

db.DBLocker, dbLocker = dbLocker, nil
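open.go introduces the Phase_Open constant and threads it into the replay calls so every startup log line carries the same tag plus a per-step cost. A small sketch of that phase-tagging idea is below; PhaseOpen, timedStep, and the step names are illustrative stand-ins, not the real entry points.

```go
package main

import (
	"time"

	"go.uber.org/zap"
)

// PhaseOpen mirrors the idea behind the Phase_Open constant: one tag shared
// by every startup log line, so all "open" activity can be filtered at once.
const PhaseOpen = "open-tae"

// timedStep runs one startup step and logs its cost under the phase tag.
func timedStep(log *zap.Logger, phase, step string, fn func() error) error {
	start := time.Now()
	err := fn()
	log.Info(
		phase,
		zap.String("step", step),
		zap.Duration("cost", time.Since(start)),
		zap.Error(err),
	)
	return err
}

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	_ = timedStep(log, PhaseOpen, "replay-checkpoints", func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for checkpoint replay
		return nil
	})
	_ = timedStep(log, PhaseOpen, "replay-wal", func() error {
		time.Sleep(5 * time.Millisecond) // stand-in for WAL replay
		return nil
	})
}
```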
13 changes: 7 additions & 6 deletions pkg/vm/engine/tae/db/replay.go
@@ -23,11 +23,11 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"go.uber.org/zap"

"sync"

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"
@@ -120,11 +120,12 @@ func (replayer *Replayer) Replay() {
close(replayer.txnCmdChan)
replayer.wg.Wait()
replayer.postReplayWal()
logutil.Info("open-tae", common.OperationField("replay"),
common.OperandField("wal"),
common.AnyField("apply logentries cost", replayer.applyDuration),
common.AnyField("read count", replayer.readCount),
common.AnyField("apply count", replayer.applyCount))
logutil.Info(
"Replay-Wal",
zap.Duration("apply-cost", replayer.applyDuration),
zap.Int("read-count", replayer.readCount),
zap.Int("apply-count", replayer.applyCount),
)
}

func (replayer *Replayer) OnReplayEntry(group uint32, lsn uint64, payload []byte, typ uint16, info any) {
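The WAL replayer now reports its counters in a single structured summary line rather than per-entry prints. A compact sketch of accumulating those counters and emitting the summary is below; walReplayStats and onEntry are hypothetical, and only the zap keys mirror the diff.

```go
package main

import (
	"time"

	"go.uber.org/zap"
)

// walReplayStats accumulates the counters that the replayer reports once at
// the end of replay instead of logging every entry individually.
type walReplayStats struct {
	readCount     int
	applyCount    int
	applyDuration time.Duration
}

// onEntry records one read entry and times how long applying it took.
func (s *walReplayStats) onEntry(apply func()) {
	s.readCount++
	start := time.Now()
	apply()
	s.applyCount++
	s.applyDuration += time.Since(start)
}

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	stats := &walReplayStats{}
	for i := 0; i < 3; i++ {
		stats.onEntry(func() { time.Sleep(2 * time.Millisecond) }) // stand-in for applying a log entry
	}

	log.Info(
		"Replay-Wal",
		zap.Duration("apply-cost", stats.applyDuration),
		zap.Int("read-count", stats.readCount),
		zap.Int("apply-count", stats.applyCount),
	)
}
```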