Skip to content

Commit

Permalink
small code refactor (#20686)
Browse files Browse the repository at this point in the history
more detailed log

Approved by: @triump2020
  • Loading branch information
XuPeng-SH authored Dec 10, 2024
1 parent 6f2799d commit 31d5996
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 41 deletions.
17 changes: 5 additions & 12 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,26 +613,19 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara
tbl.enableLogFilterExpr.Store(true)
}

if tbl.enableLogFilterExpr.Load() {
if ok, _ := objectio.RangesLogInjected(tbl.db.databaseName, tbl.tableDef.Name); ok ||
err != nil ||
tbl.enableLogFilterExpr.Load() ||
cost > 5*time.Second {
logutil.Info(
"TXN-FILTER-RANGE-LOG",
zap.String("name", tbl.tableDef.Name),
zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)),
zap.Int("ranges-len", blocks.Len()),
zap.Uint64("tbl-id", tbl.tableId),
zap.String("txn", tbl.db.op.Txn().DebugString()),
)
}

if ok, _ := objectio.RangesLogInjected(tbl.db.databaseName, tbl.tableDef.Name); ok {
logutil.Info(
"INJECT-TRACE-RANGES",
zap.String("name", tbl.tableDef.Name),
zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)),
zap.Uint64("tbl-id", tbl.tableId),
zap.String("txn", tbl.db.op.Txn().DebugString()),
zap.String("blocks", blocks.String()),
zap.String("ps", fmt.Sprintf("%p", part)),
zap.Duration("cost", cost),
zap.Error(err),
)
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/vm/engine/engine_util/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,18 @@ func (r *reader) Read(
if isEnd {
return
}
blkStr := "nil"
if blkInfo != nil {
blkStr = blkInfo.String()
}
if logLevel == 0 {
logutil.Info(
"LOGREADER-INJECTED-1",
"DEBUG-SLOW-TXN-READER",
zap.String("name", r.name),
zap.String("ts", r.ts.DebugString()),
zap.Int("data-len", outBatch.RowCount()),
zap.Duration("duration", time.Since(start)),
zap.String("blk", blkStr),
zap.Error(err),
)
} else {
Expand All @@ -444,8 +451,11 @@ func (r *reader) Read(
logutil.Info(
"LOGREADER-INJECTED-1",
zap.String("name", r.name),
zap.String("ts", r.ts.DebugString()),
zap.Duration("duration", time.Since(start)),
zap.Error(err),
zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)),
zap.String("blk", blkStr),
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type DB struct {
LogtailMgr *logtail.Manager
Wal wal.Driver

GCJobs *tasks.CancelableJobs
CronJobs *tasks.CancelableJobs

BGScanner wb.IHeartbeater
BGCheckpointRunner checkpoint.Runner
Expand Down Expand Up @@ -273,7 +273,7 @@ func (db *DB) Close() error {
}
db.Closed.Store(ErrClosed)
db.Controller.Stop()
db.GCJobs.Reset()
db.CronJobs.Reset()
db.BGScanner.Stop()
db.BGCheckpointRunner.Stop()
db.Runtime.Scheduler.Stop()
Expand Down
43 changes: 17 additions & 26 deletions pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ func Open(
db.DiskCleaner = gc2.NewDiskCleaner(cleaner)
db.DiskCleaner.Start()

db.GCJobs = tasks.NewCancelableJobs()
db.CronJobs = tasks.NewCancelableJobs()

db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-Transfer-Table",
opts.CheckpointCfg.TransferInterval,
func(ctx context.Context) {
Expand All @@ -293,15 +293,15 @@ func Open(
},
1,
)
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-Disk",
opts.GCCfg.ScanGCInterval,
func(ctx context.Context) {
db.DiskCleaner.GC(ctx)
},
1,
)
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-Checkpoint",
opts.CheckpointCfg.GCCheckpointInterval,
func(ctx context.Context) {
Expand All @@ -321,7 +321,7 @@ func Open(
},
1,
)
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-Catalog-Cache",
opts.CatalogCfg.GCInterval,
func(ctx context.Context) {
Expand All @@ -336,7 +336,7 @@ func Open(
},
1,
)
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-Logtail",
opts.CheckpointCfg.GCCheckpointInterval,
func(ctx context.Context) {
Expand All @@ -349,7 +349,7 @@ func Open(
},
1,
)
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"GC-LockMerge",
options.DefaultLockMergePruneInterval,
func(ctx context.Context) {
Expand All @@ -358,8 +358,17 @@ func Open(
1,
)

db.CronJobs.AddJob(
"REPORT-MPOOL-STATS",
time.Second*10,
func(ctx context.Context) {
mpoolAllocatorSubTask()
},
1,
)

if opts.CheckpointCfg.MetadataCheckInterval != 0 {
db.GCJobs.AddJob(
db.CronJobs.AddJob(
"META-CHECK",
opts.CheckpointCfg.MetadataCheckInterval,
func(ctx context.Context) {
Expand All @@ -372,8 +381,6 @@ func Open(
db.Controller = NewController(db)
db.Controller.Start()

go TaeMetricsTask(ctx)

// For debug or test
//fmt.Println(db.Catalog.SimplePPString(common.PPL3))
return
Expand All @@ -391,22 +398,6 @@ func Open(
// db.Catalog.RecurLoop(p)
// }

// TaeMetricsTask periodically reports mpool allocator statistics (every 10s)
// until ctx is canceled. It blocks, so callers should run it in its own
// goroutine; cancellation of ctx is the only way it returns.
func TaeMetricsTask(ctx context.Context) {
	logutil.Info("tae metrics task started")
	defer logutil.Info("tae metrics task exit")

	// Stop the ticker on exit so its underlying resources are released
	// promptly instead of waiting for GC.
	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			mpoolAllocatorSubTask()
		}
	}
}

func mpoolAllocatorSubTask() {
v2.MemTAEDefaultAllocatorGauge.Set(float64(common.DefaultAllocator.CurrNB()))
v2.MemTAEDefaultHighWaterMarkGauge.Set(float64(common.DefaultAllocator.Stats().HighWaterMark.Load()))
Expand Down

0 comments on commit 31d5996

Please sign in to comment.