From 31d5996e6a38c032b0c0f4b51ad9012ff75a85b1 Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Tue, 10 Dec 2024 17:36:13 +0800 Subject: [PATCH] small code refactor (#20686) more detailed log Approved by: @triump2020 --- pkg/vm/engine/disttae/txn_table.go | 17 ++++-------- pkg/vm/engine/engine_util/reader.go | 12 +++++++- pkg/vm/engine/tae/db/db.go | 4 +-- pkg/vm/engine/tae/db/open.go | 43 ++++++++++++----------------- 4 files changed, 35 insertions(+), 41 deletions(-) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index a6ffb6312a0b5..498689afbe0ad 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -613,26 +613,19 @@ func (tbl *txnTable) doRanges(ctx context.Context, rangesParam engine.RangesPara tbl.enableLogFilterExpr.Store(true) } - if tbl.enableLogFilterExpr.Load() { + if ok, _ := objectio.RangesLogInjected(tbl.db.databaseName, tbl.tableDef.Name); ok || + err != nil || + tbl.enableLogFilterExpr.Load() || + cost > 5*time.Second { logutil.Info( "TXN-FILTER-RANGE-LOG", zap.String("name", tbl.tableDef.Name), zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)), - zap.Int("ranges-len", blocks.Len()), - zap.Uint64("tbl-id", tbl.tableId), - zap.String("txn", tbl.db.op.Txn().DebugString()), - ) - } - - if ok, _ := objectio.RangesLogInjected(tbl.db.databaseName, tbl.tableDef.Name); ok { - logutil.Info( - "INJECT-TRACE-RANGES", - zap.String("name", tbl.tableDef.Name), - zap.String("exprs", plan2.FormatExprs(rangesParam.BlockFilters)), zap.Uint64("tbl-id", tbl.tableId), zap.String("txn", tbl.db.op.Txn().DebugString()), zap.String("blocks", blocks.String()), zap.String("ps", fmt.Sprintf("%p", part)), + zap.Duration("cost", cost), zap.Error(err), ) } diff --git a/pkg/vm/engine/engine_util/reader.go b/pkg/vm/engine/engine_util/reader.go index af72dead65284..8ce9476d6792f 100644 --- a/pkg/vm/engine/engine_util/reader.go +++ b/pkg/vm/engine/engine_util/reader.go @@ -429,11 +429,18 @@ func (r *reader) Read( if isEnd { return } + blkStr := "nil" + if blkInfo != nil { + blkStr = blkInfo.String() + } if logLevel == 0 { logutil.Info( - "LOGREADER-INJECTED-1", + "DEBUG-SLOW-TXN-READER", zap.String("name", r.name), + zap.String("ts", r.ts.DebugString()), zap.Int("data-len", outBatch.RowCount()), + zap.Duration("duration", time.Since(start)), + zap.String("blk", blkStr), zap.Error(err), ) } else { @@ -444,8 +451,11 @@ func (r *reader) Read( logutil.Info( "LOGREADER-INJECTED-1", zap.String("name", r.name), + zap.String("ts", r.ts.DebugString()), + zap.Duration("duration", time.Since(start)), zap.Error(err), zap.String("data", common.MoBatchToString(outBatch, maxLogCnt)), + zap.String("blk", blkStr), ) } } diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 2626486bd97dc..26207ea09b355 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -77,7 +77,7 @@ type DB struct { LogtailMgr *logtail.Manager Wal wal.Driver - GCJobs *tasks.CancelableJobs + CronJobs *tasks.CancelableJobs BGScanner wb.IHeartbeater BGCheckpointRunner checkpoint.Runner @@ -273,7 +273,7 @@ func (db *DB) Close() error { } db.Closed.Store(ErrClosed) db.Controller.Stop() - db.GCJobs.Reset() + db.CronJobs.Reset() db.BGScanner.Stop() db.BGCheckpointRunner.Stop() db.Runtime.Scheduler.Stop() diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index c3aead017b9e8..df0ace26c563b 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -281,9 +281,9 @@ func Open( db.DiskCleaner = gc2.NewDiskCleaner(cleaner) db.DiskCleaner.Start() - db.GCJobs = tasks.NewCancelableJobs() + db.CronJobs = tasks.NewCancelableJobs() - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-Transfer-Table", opts.CheckpointCfg.TransferInterval, func(ctx context.Context) { @@ -293,7 +293,7 @@ func Open( }, 1, ) - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-Disk", opts.GCCfg.ScanGCInterval, func(ctx context.Context) { @@ -301,7 +301,7 @@ func Open( }, 1, ) - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-Checkpoint", opts.CheckpointCfg.GCCheckpointInterval, func(ctx context.Context) { @@ -321,7 +321,7 @@ func Open( }, 1, ) - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-Catalog-Cache", opts.CatalogCfg.GCInterval, func(ctx context.Context) { @@ -336,7 +336,7 @@ func Open( }, 1, ) - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-Logtail", opts.CheckpointCfg.GCCheckpointInterval, func(ctx context.Context) { @@ -349,7 +349,7 @@ func Open( }, 1, ) - db.GCJobs.AddJob( + db.CronJobs.AddJob( "GC-LockMerge", options.DefaultLockMergePruneInterval, func(ctx context.Context) { @@ -358,8 +358,17 @@ func Open( 1, ) + db.CronJobs.AddJob( + "REPORT-MPOOL-STATS", + time.Second*10, + func(ctx context.Context) { + mpoolAllocatorSubTask() + }, + 1, + ) + if opts.CheckpointCfg.MetadataCheckInterval != 0 { - db.GCJobs.AddJob( + db.CronJobs.AddJob( "META-CHECK", opts.CheckpointCfg.MetadataCheckInterval, func(ctx context.Context) { @@ -372,8 +381,6 @@ func Open( db.Controller = NewController(db) db.Controller.Start() - go TaeMetricsTask(ctx) - // For debug or test //fmt.Println(db.Catalog.SimplePPString(common.PPL3)) return @@ -391,22 +398,6 @@ func Open( // db.Catalog.RecurLoop(p) // } -func TaeMetricsTask(ctx context.Context) { - logutil.Info("tae metrics task started") - defer logutil.Info("tae metrics task exit") - - timer := time.NewTicker(time.Second * 10) - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - mpoolAllocatorSubTask() - } - } - -} - func mpoolAllocatorSubTask() { v2.MemTAEDefaultAllocatorGauge.Set(float64(common.DefaultAllocator.CurrNB())) v2.MemTAEDefaultHighWaterMarkGauge.Set(float64(common.DefaultAllocator.Stats().HighWaterMark.Load()))