From 28aa4cb7f382ce1e277d9e1f4ea875a67c75e084 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Wed, 11 Dec 2024 10:51:33 +0800 Subject: [PATCH 1/9] add temp code for stats --- pkg/perfcounter/context.go | 6 ++++++ pkg/util/trace/impl/motrace/statistic/stats_array.go | 8 ++++++++ pkg/vm/engine/disttae/stats.go | 12 ++++++++++++ 3 files changed, 26 insertions(+) diff --git a/pkg/perfcounter/context.go b/pkg/perfcounter/context.go index cccb5e0e49318..6f7887c7c4109 100644 --- a/pkg/perfcounter/context.go +++ b/pkg/perfcounter/context.go @@ -134,3 +134,9 @@ type ExecPipelineMarkKey struct{} func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Context { return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) } + +type StatsInBuildPlan struct{} + +func AttachStatsInBuildPlan(ctx context.Context, counter *CounterSet) context.Context { + return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) +} diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 2cfd8e2f3d7c6..727780eb1293d 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -296,6 +296,7 @@ type StatsInfo struct { BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns + BUildPlanStatsIODuration int64 `json:"BuildPlanStatsIODuration"` // unit: ns BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } @@ -514,6 +515,13 @@ func (stats *StatsInfo) AddBuildPlanStatsConsumption(d time.Duration) { atomic.AddInt64(&stats.PlanStage.BuildPlanStatsDuration, int64(d)) } +func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.BUildPlanStatsIODuration, int64(d)) +} + func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { if stats == nil { return diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index bba9ec494928c..a5ef541d65cfa 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -31,9 +31,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/query" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/queryservice/client" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) @@ -279,6 +281,16 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) return info } + if ok = ctx.Value(perfcounter.BuildPlanMarkKey{}); ok { + + } + + stats := statistic.StatsInfoFromContext(ctx) + start := time.Now() + defer func() { + stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + }() + // Get stats info from remote node. if gs.KeyRouter != nil { client := gs.engine.qc From ad7cdf882037aa240e43547450dac477f768b47d Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Wed, 11 Dec 2024 19:38:30 +0800 Subject: [PATCH 2/9] Build Plan for CalcStats to calculate the waiting time for Stats statistics --- pkg/frontend/compiler_context.go | 2 ++ pkg/frontend/mysql_cmd_executor.go | 7 +++++-- pkg/perfcounter/context.go | 19 ++++++++++++++++--- pkg/sql/compile/analyze_module.go | 10 ++++++---- pkg/sql/compile/sql_executor_context.go | 8 +++++++- pkg/sql/models/show_phyplan.go | 10 ++++++---- pkg/sql/plan/order_binder.go | 2 +- .../impl/motrace/statistic/stats_array.go | 4 ++-- pkg/vm/engine/disttae/stats.go | 12 ++++-------- 9 files changed, 49 insertions(+), 25 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 622884fe4e365..edeb9d16ed74c 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -909,6 +909,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps return cached, err } newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return cached, err @@ -928,6 +929,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps } else { crs := new(perfcounter.CounterSet) newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) statsInfo, err = table.Stats(newCtx, true) if err != nil { diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 15525b9fd65b4..2c98eb343565b 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -3732,12 +3732,14 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt int64(statsInfo.PlanStage.PlanDuration) + int64(statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + - statsInfo.PrepareRunStage.CompilePreRunOnceDuration - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) if totalTime < 0 { if !h.isInternalSubStmt { - ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d) = %d]", + ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d) = %d]", uuid.UUID(h.stmt.StatementID).String(), h.stmt.StatementType, statsInfo.ParseStage.ParseDuration, @@ -3746,6 +3748,7 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt operatorTimeConsumed, statsInfo.PrepareRunStage.ScopePrepareDuration+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption, totalTime, diff --git a/pkg/perfcounter/context.go b/pkg/perfcounter/context.go index 6f7887c7c4109..15d8c1e864d58 100644 --- a/pkg/perfcounter/context.go +++ b/pkg/perfcounter/context.go @@ -135,8 +135,21 @@ func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Con return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) } -type StatsInBuildPlan struct{} +// ------------------------------------------------------------------------------------------------ +type CalcTableStatsKey struct{} -func AttachStatsInBuildPlan(ctx context.Context, counter *CounterSet) context.Context { - return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) +func AttachCalcTableStatsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableStatsKey{}, true) +} + +type CalcTableSizeKey struct{} + +func AttachCalcTableSizeKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableSizeKey{}, true) +} + +type CalcTableRowsKey struct{} + +func AttachCalcTableRowsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableRowsKey{}, true) } diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index e4696a3e0fa1f..ee3cd72566868 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -511,25 +511,26 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option.Analyze { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -538,8 +539,9 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) - + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index bca4c3b1765d2..293e3ab7b7e07 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -117,6 +118,10 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er } func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { + stats := statistic.StatsInfoFromContext(c.GetContext()) + start := time.Now() + defer stats.AddBuildPlanStatsConsumption(time.Since(start)) + dbName := obj.GetSchemaName() tableName := obj.GetObjName() @@ -147,7 +152,6 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* } } var statsInfo *pb.StatsInfo - stats := statistic.StatsInfoFromContext(ctx) // This is a partition table. if partitionInfo != nil { crs := new(perfcounter.CounterSet) @@ -158,6 +162,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* return nil, err } newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return nil, err @@ -176,6 +181,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* } else { crs := new(perfcounter.CounterSet) newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) statsInfo, err = table.Stats(newCtx, true) if err != nil { diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 9384f79191a24..067e58735802e 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -88,25 +88,26 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option == AnalyzeOption { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -115,8 +116,9 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) - + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/plan/order_binder.go b/pkg/sql/plan/order_binder.go index 7c7e75455640b..18a6cb3ab1e14 100644 --- a/pkg/sql/plan/order_binder.go +++ b/pkg/sql/plan/order_binder.go @@ -38,7 +38,7 @@ func (b *OrderBinder) BindExpr(astExpr tree.Expr) (*plan.Expr, error) { for _, selectField := range b.ctx.projectByAst { if selectField.aliasName != "" { continue - } + }// deptno if projectField, ok1 := selectField.ast.(*tree.UnresolvedName); ok1 && projectField.ColName() == colRef.ColName() { return nil, moerr.NewInvalidInputf(b.GetContext(), "Column '%s' in order clause is ambiguous", colRef.ColName()) } diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 727780eb1293d..92859aa713687 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -296,7 +296,7 @@ type StatsInfo struct { BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns - BUildPlanStatsIODuration int64 `json:"BuildPlanStatsIODuration"` // unit: ns + BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } @@ -519,7 +519,7 @@ func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { if stats == nil { return } - atomic.AddInt64(&stats.PlanStage.BUildPlanStatsIODuration, int64(d)) + atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d)) } func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index a5ef541d65cfa..641d52c1703fc 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -281,16 +281,12 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) return info } - if ok = ctx.Value(perfcounter.BuildPlanMarkKey{}); ok { - + if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok { + stats := statistic.StatsInfoFromContext(ctx) + start := time.Now() + defer stats.AddBuildPlanStatsIOConsumption(time.Since(start)) } - stats := statistic.StatsInfoFromContext(ctx) - start := time.Now() - defer func() { - stats.AddBuildPlanStatsIOConsumption(time.Since(start)) - }() - // Get stats info from remote node. if gs.KeyRouter != nil { client := gs.engine.qc From 5953c49f5e789505ba47a91b4570dbee563df819 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 16 Dec 2024 11:55:02 +0800 Subject: [PATCH 3/9] update code --- pkg/vm/engine/disttae/stats.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 641d52c1703fc..4b11a6f86cb42 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -284,7 +284,9 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok { stats := statistic.StatsInfoFromContext(ctx) start := time.Now() - defer stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + defer func() { + stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + }() } // Get stats info from remote node. From 3f25f5ef82986f5682e1cf0504c330666d8eec09 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 17 Dec 2024 09:39:16 +0800 Subject: [PATCH 4/9] delete unused code --- pkg/sql/plan/order_binder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/plan/order_binder.go b/pkg/sql/plan/order_binder.go index 18a6cb3ab1e14..7c7e75455640b 100644 --- a/pkg/sql/plan/order_binder.go +++ b/pkg/sql/plan/order_binder.go @@ -38,7 +38,7 @@ func (b *OrderBinder) BindExpr(astExpr tree.Expr) (*plan.Expr, error) { for _, selectField := range b.ctx.projectByAst { if selectField.aliasName != "" { continue - }// deptno + } if projectField, ok1 := selectField.ast.(*tree.UnresolvedName); ok1 && projectField.ColName() == colRef.ColName() { return nil, moerr.NewInvalidInputf(b.GetContext(), "Column '%s' in order clause is ambiguous", colRef.ColName()) } From dce20902526841b8ad53fab431bc998247a0c6f6 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 24 Dec 2024 10:04:01 +0800 Subject: [PATCH 5/9] Statistics on Stats S3 resource consumption2 --- pkg/frontend/compiler_context.go | 47 +++++++++++++------------ pkg/frontend/mysql_cmd_executor.go | 7 ++-- pkg/sql/compile/analyze_module.go | 9 +++++ pkg/sql/compile/sql_executor_context.go | 46 ++++++++++++------------ pkg/sql/models/show_phyplan.go | 9 +++++ 5 files changed, 72 insertions(+), 46 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index edeb9d16ed74c..558e3ece23866 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -901,15 +901,17 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { - crs := new(perfcounter.CounterSet) + //crs := new(perfcounter.CounterSet) statsInfo = plan2.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := tcc.getRelation(dbName, partitionTable, sub, snapshot) if err != nil { return cached, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return cached, err @@ -917,33 +919,34 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps statsInfo.Merge(parStats) } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + //crs := new(perfcounter.CounterSet) + //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return cached, err } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } if statsInfo != nil { diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 2c98eb343565b..922f07bc1e785 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -1989,10 +1989,13 @@ func buildPlan(reqCtx context.Context, ses FeSession, ctx plan2.CompilerContext, v2.TxnStatementBuildPlanDurationHistogram.Observe(cost.Seconds()) }() - stats := statistic.StatsInfoFromContext(reqCtx) + // NOTE: The context used by buildPlan comes from the CompilerContext object + planContext := ctx.GetContext() + stats := statistic.StatsInfoFromContext(planContext) stats.PlanStart() crs := new(perfcounter.CounterSet) - reqCtx = perfcounter.AttachBuildPlanMarkKey(reqCtx, crs) + planContext = perfcounter.AttachBuildPlanMarkKey(planContext, crs) + ctx.SetContext(planContext) defer func() { stats.AddBuildPlanS3Request(statistic.S3Request{ List: crs.FileService.S3.List.Load(), diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index ee3cd72566868..2f40f7014e431 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -542,6 +542,15 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) + //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index 293e3ab7b7e07..909cf4ac61de6 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -154,15 +154,16 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { - crs := new(perfcounter.CounterSet) + //crs := new(perfcounter.CounterSet) statsInfo = plan.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := c.getRelation(dbName, partitionTable, snapshot) if err != nil { return nil, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return nil, err @@ -170,32 +171,33 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* statsInfo.Merge(parStats) } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + //crs := new(perfcounter.CounterSet) + //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return nil, err } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } return statsInfo, nil } diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 067e58735802e..503a7cf5be732 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -119,6 +119,15 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) + //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) From 57de3d56fd7bd636763c9afc38088c651b382e1e Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 24 Dec 2024 10:31:18 +0800 Subject: [PATCH 6/9] Statistics on Stats S3 resource consumption --- pkg/pb/statsinfo/statsinfo.go | 6 ++ pkg/sql/compile/sql_executor_context.go | 4 +- pkg/vm/engine/disttae/stats.go | 77 +++++++++++++++++-------- pkg/vm/engine/disttae/stats_test.go | 11 ++-- 4 files changed, 68 insertions(+), 30 deletions(-) diff --git a/pkg/pb/statsinfo/statsinfo.go b/pkg/pb/statsinfo/statsinfo.go index 1967584ed4517..4b77b48dbb5ec 100644 --- a/pkg/pb/statsinfo/statsinfo.go +++ b/pkg/pb/statsinfo/statsinfo.go @@ -15,11 +15,17 @@ package statsinfo import ( + "context" "math" "golang.org/x/exp/constraints" ) +type StatsInfoKeyV2 struct { + Ctx context.Context + Key StatsInfoKey +} + func (sc *StatsInfo) NeedUpdate(currentApproxObjNum int64) bool { if sc.ApproxObjectNumber == 0 || sc.AccurateObjectNumber == 0 { return true diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index 909cf4ac61de6..3afa9a2aeb361 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -120,7 +120,9 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { stats := statistic.StatsInfoFromContext(c.GetContext()) start := time.Now() - defer stats.AddBuildPlanStatsConsumption(time.Since(start)) + defer func() { + stats.AddBuildPlanStatsConsumption(time.Since(start)) + }() dbName := obj.GetSchemaName() tableName := obj.GetObjName() diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 4b11a6f86cb42..b9936a162f704 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -144,7 +144,7 @@ func WithUpdateWorkerFactor(f int) GlobalStatsOption { } // WithStatsUpdater set the update function to update stats info. -func WithStatsUpdater(f func(pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { +func WithStatsUpdater(f func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { return func(s *GlobalStats) { s.statsUpdater = f } @@ -176,7 +176,7 @@ type GlobalStats struct { // TODO(volgariver6): add metrics of the chan length. tailC chan *logtail.TableLogtail - updateC chan pb.StatsInfoKey + updateC chan pb.StatsInfoKeyV2 updatingMu struct { sync.Mutex @@ -216,7 +216,7 @@ type GlobalStats struct { // statsUpdate is the function which updates the stats info. // If it is nil, set it to doUpdate. - statsUpdater func(pb.StatsInfoKey, *pb.StatsInfo) bool + statsUpdater func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool // for test only currently. approxObjectNumUpdater func() int64 } @@ -228,7 +228,7 @@ func NewGlobalStats( ctx: ctx, engine: e, tailC: make(chan *logtail.TableLogtail, 10000), - updateC: make(chan pb.StatsInfoKey, 3000), + updateC: make(chan pb.StatsInfoKeyV2, 3000), logtailUpdate: newLogtailUpdate(), tableLogtailCounter: make(map[pb.StatsInfoKey]int64), KeyRouter: keyRouter, @@ -270,12 +270,22 @@ func (gs *GlobalStats) checkTriggerCond(key pb.StatsInfoKey, entryNum int64) boo } func (gs *GlobalStats) PrefetchTableMeta(ctx context.Context, key pb.StatsInfoKey) bool { - return gs.triggerUpdate(key, false) + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + return gs.triggerUpdate(wrapkey, false) } func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo { gs.mu.Lock() defer gs.mu.Unlock() + + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + info, ok := gs.mu.statsInfoMap[key] if ok && info != nil { return info @@ -324,7 +334,7 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) // If the trigger condition is not satisfied, the stats will not be updated // for long time. So we trigger the update here to get the stats info as soon // as possible. - gs.triggerUpdate(key, true) + gs.triggerUpdate(wrapkey, true) }() info, ok = gs.mu.statsInfoMap[key] @@ -385,7 +395,7 @@ func (gs *GlobalStats) consumeWorker(ctx context.Context) { return case tail := <-gs.tailC: - gs.consumeLogtail(tail) + gs.consumeLogtail(ctx, tail) } } } @@ -406,7 +416,7 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { } } -func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { +func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyV2, force bool) bool { if force { gs.updateC <- key v2.StatsTriggerForcedCounter.Add(1) @@ -422,15 +432,21 @@ func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { } } -func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { +func (gs *GlobalStats) consumeLogtail(ctx context.Context, tail *logtail.TableLogtail) { key := pb.StatsInfoKey{ AccId: tail.Table.AccId, DatabaseID: tail.Table.DbId, TableID: tail.Table.TbId, } + + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + if len(tail.CkpLocation) > 0 { if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } else if tail.Table != nil { var triggered bool @@ -438,7 +454,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if logtailreplay.IsMetaEntry(cmd.TableName) { triggered = true if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } break } @@ -451,7 +467,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if !triggered && gs.checkTriggerCond(key, gs.tableLogtailCounter[key]) { gs.tableLogtailCounter[key] = 0 if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } } @@ -610,38 +626,51 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) { }) } -func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) { - if !gs.shouldUpdate(key) { +func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKeyV2) { + statser := statistic.StatsInfoFromContext(key.Ctx) + crs := new(perfcounter.CounterSet) + + if !gs.shouldUpdate(key.Key) { return } // wait until the table's logtail has been updated. - gs.waitLogtailUpdated(key.TableID) + gs.waitLogtailUpdated(key.Key.TableID) // updated is used to mark that the stats info is updated. var updated bool stats := plan2.NewStatsInfo() + + newCtx := perfcounter.AttachS3RequestKey(key.Ctx, crs) if gs.statsUpdater != nil { - updated = gs.statsUpdater(key, stats) - } + updated = gs.statsUpdater(newCtx, key.Key, stats) + } + statser.AddBuildPlanStatsS3Request(statistic.S3Request{ + List: crs.FileService.S3.List.Load(), + Head: crs.FileService.S3.Head.Load(), + Put: crs.FileService.S3.Put.Load(), + Get: crs.FileService.S3.Get.Load(), + Delete: crs.FileService.S3.Delete.Load(), + DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + }) gs.mu.Lock() defer gs.mu.Unlock() if updated { - gs.mu.statsInfoMap[key] = stats - gs.broadcastStats(key) - } else if _, ok := gs.mu.statsInfoMap[key]; !ok { - gs.mu.statsInfoMap[key] = nil + gs.mu.statsInfoMap[key.Key] = stats + gs.broadcastStats(key.Key) + } else if _, ok := gs.mu.statsInfoMap[key.Key]; !ok { + gs.mu.statsInfoMap[key.Key] = nil } // Notify all the waiters to read the new stats info. gs.mu.cond.Broadcast() - gs.doneUpdate(key, updated) + gs.doneUpdate(key.Key, updated) } -func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { +func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { table := gs.engine.GetLatestCatalogCache().GetTableById(key.AccId, key.DatabaseID, key.TableID) // table or its definition is nil, means that the table is created but not committed yet. if table == nil || table.TableDef == nil { @@ -666,7 +695,7 @@ func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { approxObjectNum, stats, ) - if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil { + if err := UpdateStats(ctx, req, gs.concurrentExecutor); err != nil { logutil.Errorf("failed to init stats info for table %v, err: %v", key, err) return false } diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index 48d89fad6b2cc..354e0c12e6724 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -23,6 +23,8 @@ import ( "time" "github.com/lni/goutils/leaktest" + "github.com/stretchr/testify/assert" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -32,7 +34,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" - "github.com/stretchr/testify/assert" ) func TestGetStats(t *testing.T) { @@ -41,7 +42,7 @@ func TestGetStats(t *testing.T) { defer cancel() gs := NewGlobalStats(ctx, nil, nil, WithUpdateWorkerFactor(4), - WithStatsUpdater(func(key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { + WithStatsUpdater(func(_ context.Context, key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { info.BlockNumber = 20 return true }), @@ -161,7 +162,7 @@ func TestUpdateStats(t *testing.T) { TableID: 1001, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -180,7 +181,7 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -199,7 +200,7 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.True(t, updated) }, WithApproxObjectNumUpdater(func() int64 { return 10 From b5cd997db0fca726d026f3e02ed6e9d37b39c5ae Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 24 Dec 2024 12:06:41 +0800 Subject: [PATCH 7/9] optimized code --- pkg/frontend/compiler_context.go | 26 ------------------- pkg/pb/statsinfo/statsinfo.go | 5 +++- pkg/sql/compile/sql_executor_context.go | 25 +----------------- pkg/vm/engine/disttae/stats.go | 34 ++++++++++++------------- 4 files changed, 22 insertions(+), 68 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 558e3ece23866..77e13b6e51a29 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -901,15 +901,12 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { - //crs := new(perfcounter.CounterSet) statsInfo = plan2.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := tcc.getRelation(dbName, partitionTable, sub, snapshot) if err != nil { return cached, err } - //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) @@ -918,35 +915,12 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps } statsInfo.Merge(parStats) } - - //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - // List: crs.FileService.S3.List.Load(), - // Head: crs.FileService.S3.Head.Load(), - // Put: crs.FileService.S3.Put.Load(), - // Get: crs.FileService.S3.Get.Load(), - // Delete: crs.FileService.S3.Delete.Load(), - // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - //}) - } else { - //crs := new(perfcounter.CounterSet) - //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) - newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return cached, err } - - //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - // List: crs.FileService.S3.List.Load(), - // Head: crs.FileService.S3.Head.Load(), - // Put: crs.FileService.S3.Put.Load(), - // Get: crs.FileService.S3.Get.Load(), - // Delete: crs.FileService.S3.Delete.Load(), - // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - //}) } if statsInfo != nil { diff --git a/pkg/pb/statsinfo/statsinfo.go b/pkg/pb/statsinfo/statsinfo.go index 4b77b48dbb5ec..f336c50fb8d07 100644 --- a/pkg/pb/statsinfo/statsinfo.go +++ b/pkg/pb/statsinfo/statsinfo.go @@ -21,7 +21,10 @@ import ( "golang.org/x/exp/constraints" ) -type StatsInfoKeyV2 struct { +// StatsInfoKeyWithContext associates a statistics key with a context. +// This struct is used to tie a statistics key (Key) with the context (Ctx) during an operation, +// allowing context-related actions and management while handling statistics information. +type StatsInfoKeyWithContext struct { Ctx context.Context Key StatsInfoKey } diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index 3afa9a2aeb361..c7f89d15d3de0 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -156,15 +156,13 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { - //crs := new(perfcounter.CounterSet) statsInfo = plan.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := c.getRelation(dbName, partitionTable, snapshot) if err != nil { return nil, err } - //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { @@ -173,33 +171,12 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* statsInfo.Merge(parStats) } - //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - // List: crs.FileService.S3.List.Load(), - // Head: crs.FileService.S3.Head.Load(), - // Put: crs.FileService.S3.Put.Load(), - // Get: crs.FileService.S3.Get.Load(), - // Delete: crs.FileService.S3.Delete.Load(), - // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - //}) } else { - //crs := new(perfcounter.CounterSet) - //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) - newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return nil, err } - - //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - // List: crs.FileService.S3.List.Load(), - // Head: crs.FileService.S3.Head.Load(), - // Put: crs.FileService.S3.Put.Load(), - // Get: crs.FileService.S3.Get.Load(), - // Delete: crs.FileService.S3.Delete.Load(), - // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - //}) } return statsInfo, nil } diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index b9936a162f704..d26164e783cb7 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -176,7 +176,7 @@ type GlobalStats struct { // TODO(volgariver6): add metrics of the chan length. tailC chan *logtail.TableLogtail - updateC chan pb.StatsInfoKeyV2 + updateC chan pb.StatsInfoKeyWithContext updatingMu struct { sync.Mutex @@ -228,7 +228,7 @@ func NewGlobalStats( ctx: ctx, engine: e, tailC: make(chan *logtail.TableLogtail, 10000), - updateC: make(chan pb.StatsInfoKeyV2, 3000), + updateC: make(chan pb.StatsInfoKeyWithContext, 3000), logtailUpdate: newLogtailUpdate(), tableLogtailCounter: make(map[pb.StatsInfoKey]int64), KeyRouter: keyRouter, @@ -270,7 +270,7 @@ func (gs *GlobalStats) checkTriggerCond(key pb.StatsInfoKey, entryNum int64) boo } func (gs *GlobalStats) PrefetchTableMeta(ctx context.Context, key pb.StatsInfoKey) bool { - wrapkey := pb.StatsInfoKeyV2{ + wrapkey := pb.StatsInfoKeyWithContext{ Ctx: ctx, Key: key, } @@ -281,7 +281,7 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) gs.mu.Lock() defer gs.mu.Unlock() - wrapkey := pb.StatsInfoKeyV2{ + wrapkey := pb.StatsInfoKeyWithContext{ Ctx: ctx, Key: key, } @@ -416,7 +416,7 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { } } -func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyV2, force bool) bool { +func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyWithContext, force bool) bool { if force { gs.updateC <- key v2.StatsTriggerForcedCounter.Add(1) @@ -439,7 +439,7 @@ func (gs *GlobalStats) consumeLogtail(ctx context.Context, tail *logtail.TableLo TableID: tail.Table.TbId, } - wrapkey := pb.StatsInfoKeyV2{ + wrapkey := pb.StatsInfoKeyWithContext{ Ctx: ctx, Key: key, } @@ -626,25 +626,25 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) { }) } -func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKeyV2) { - statser := statistic.StatsInfoFromContext(key.Ctx) +func (gs *GlobalStats) updateTableStats(warpKey pb.StatsInfoKeyWithContext) { + statser := statistic.StatsInfoFromContext(warpKey.Ctx) crs := new(perfcounter.CounterSet) - if !gs.shouldUpdate(key.Key) { + if !gs.shouldUpdate(warpKey.Key) { return } // wait until the table's logtail has been updated. - gs.waitLogtailUpdated(key.Key.TableID) + gs.waitLogtailUpdated(warpKey.Key.TableID) // updated is used to mark that the stats info is updated. var updated bool stats := plan2.NewStatsInfo() - newCtx := perfcounter.AttachS3RequestKey(key.Ctx, crs) + newCtx := perfcounter.AttachS3RequestKey(warpKey.Ctx, crs) if gs.statsUpdater != nil { - updated = gs.statsUpdater(newCtx, key.Key, stats) + updated = gs.statsUpdater(newCtx, warpKey.Key, stats) } statser.AddBuildPlanStatsS3Request(statistic.S3Request{ List: crs.FileService.S3.List.Load(), @@ -658,16 +658,16 @@ func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKeyV2) { gs.mu.Lock() defer gs.mu.Unlock() if updated { - gs.mu.statsInfoMap[key.Key] = stats - gs.broadcastStats(key.Key) - } else if _, ok := gs.mu.statsInfoMap[key.Key]; !ok { - gs.mu.statsInfoMap[key.Key] = nil + gs.mu.statsInfoMap[warpKey.Key] = stats + gs.broadcastStats(warpKey.Key) + } else if _, ok := gs.mu.statsInfoMap[warpKey.Key]; !ok { + gs.mu.statsInfoMap[warpKey.Key] = nil } // Notify all the waiters to read the new stats info. gs.mu.cond.Broadcast() - gs.doneUpdate(key.Key, updated) + gs.doneUpdate(warpKey.Key, updated) } func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { From b5105ab9f92474b1cddfcc1a0a057fd48c9bf60f Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 24 Dec 2024 14:15:24 +0800 Subject: [PATCH 8/9] update ut test case --- pkg/sql/plan/function/func_mo_explain_phy_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/sql/plan/function/func_mo_explain_phy_test.go b/pkg/sql/plan/function/func_mo_explain_phy_test.go index 11dea2fb56550..c836505f95688 100644 --- a/pkg/sql/plan/function/func_mo_explain_phy_test.go +++ b/pkg/sql/plan/function/func_mo_explain_phy_test.go @@ -921,7 +921,7 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 CPU Usage: - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-IOAccess(214917833)-IOMerge(0) + - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) Physical Plan Deployment: LOCAL SCOPES: Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) @@ -978,11 +978,14 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 CPU Usage: - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-IOAccess(214917833)-IOMerge(0) + - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) Query Build Plan Stage: - CPU Time: 649910ns - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 + - Build Plan Duration: 649910ns - Call Stats Duration: 3457758ns + - Call Stats IO Consumption: 0ns + - Call Stats S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 Query Compile Stage: - CPU Time: 299370ns - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 From ce8642e737ee6c3f085e01e58d2c4989edadb2db Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 24 Dec 2024 17:35:07 +0800 Subject: [PATCH 9/9] Supplement the execution time statistics of statsInCache() function --- pkg/frontend/compiler_context.go | 10 +++++++-- pkg/sql/compile/analyze_module.go | 1 + pkg/sql/models/show_phyplan.go | 1 + .../plan/function/func_mo_explain_phy_test.go | 1 + .../impl/motrace/statistic/stats_array.go | 22 +++++++++++++------ 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 77e13b6e51a29..c0a05f0452ead 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -842,11 +842,11 @@ func (tcc *TxnCompilerContext) GetPrimaryKeyDef(dbName string, tableName string, } func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snapshot) (*pb.StatsInfo, error) { - stats := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) + statser := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) start := time.Now() defer func() { v2.TxnStatementStatsDurationHistogram.Observe(time.Since(start).Seconds()) - stats.AddBuildPlanStatsConsumption(time.Since(start)) + statser.AddBuildPlanStatsConsumption(time.Since(start)) }() dbName := obj.GetSchemaName() @@ -937,6 +937,12 @@ func (tcc *TxnCompilerContext) UpdateStatsInCache(tid uint64, s *pb.StatsInfo) { // statsInCache get the *pb.StatsInfo from session cache. If the info is nil, just return nil and false, // else, check if the info needs to be updated. func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, table engine.Relation, snapshot *plan2.Snapshot) (*pb.StatsInfo, bool) { + statser := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) + start := time.Now() + defer func() { + statser.AddStatsStatsInCacheDuration(time.Since(start)) + }() + s := tcc.GetStatsCache().GetStatsInfo(table.GetTableID(ctx), true) if s == nil { return nil, false diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index 2f40f7014e431..721714902871a 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -541,6 +541,7 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S )) buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsInCacheDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanStatsS3.List, diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 503a7cf5be732..4ae57b193ff28 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -118,6 +118,7 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio )) buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsInCacheDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanStatsS3.List, diff --git a/pkg/sql/plan/function/func_mo_explain_phy_test.go b/pkg/sql/plan/function/func_mo_explain_phy_test.go index c836505f95688..9dc1b06d053a7 100644 --- a/pkg/sql/plan/function/func_mo_explain_phy_test.go +++ b/pkg/sql/plan/function/func_mo_explain_phy_test.go @@ -984,6 +984,7 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 - Build Plan Duration: 649910ns - Call Stats Duration: 3457758ns + - Call StatsInCache Duration: 0ns - Call Stats IO Consumption: 0ns - Call Stats S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 Query Compile Stage: diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 92859aa713687..39ad628a5c747 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -290,14 +290,15 @@ type StatsInfo struct { // Planning Phase Statistics PlanStage struct { - PlanDuration time.Duration `json:"PlanDuration"` - PlanStartTime time.Time `json:"PlanStartTime"` - BuildPlanS3Request S3Request `json:"BuildPlanS3Request"` - BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` + PlanDuration time.Duration `json:"PlanDuration"` + PlanStartTime time.Time `json:"PlanStartTime"` + BuildPlanS3Request S3Request `json:"BuildPlanS3Request"` + BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. - BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns - BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns - BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns + BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` + BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns + BuildPlanStatsInCacheDuration int64 `json:"BuildPlanStatsInCacheDuration"` // unit: ns + BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } // Compile phase statistics @@ -522,6 +523,13 @@ func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d)) } +func (stats *StatsInfo) AddStatsStatsInCacheDuration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.BuildPlanStatsInCacheDuration, int64(d)) +} + func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { if stats == nil { return