From 20981f09664548f01df53c9f9a943c095664870b Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Wed, 11 Dec 2024 10:51:33 +0800 Subject: [PATCH 1/9] add temp code for stats --- pkg/perfcounter/context.go | 6 ++++++ pkg/util/trace/impl/motrace/statistic/stats_array.go | 8 ++++++++ pkg/vm/engine/disttae/stats.go | 12 ++++++++++++ 3 files changed, 26 insertions(+) diff --git a/pkg/perfcounter/context.go b/pkg/perfcounter/context.go index cccb5e0e49318..6f7887c7c4109 100644 --- a/pkg/perfcounter/context.go +++ b/pkg/perfcounter/context.go @@ -134,3 +134,9 @@ type ExecPipelineMarkKey struct{} func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Context { return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) } + +type StatsInBuildPlan struct{} + +func AttachStatsInBuildPlan(ctx context.Context, counter *CounterSet) context.Context { + return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) +} diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 2cfd8e2f3d7c6..727780eb1293d 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -296,6 +296,7 @@ type StatsInfo struct { BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. 
BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns + BUildPlanStatsIODuration int64 `json:"BuildPlanStatsIODuration"` // unit: ns BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } @@ -514,6 +515,13 @@ func (stats *StatsInfo) AddBuildPlanStatsConsumption(d time.Duration) { atomic.AddInt64(&stats.PlanStage.BuildPlanStatsDuration, int64(d)) } +func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.BUildPlanStatsIODuration, int64(d)) +} + func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { if stats == nil { return diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 6f3d84de5356a..a0c80995efa74 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -31,9 +31,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/query" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" + "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/queryservice/client" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" ) @@ -252,6 +254,16 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) return info } + if ok = ctx.Value(perfcounter.BuildPlanMarkKey{}); ok { + + } + + stats := statistic.StatsInfoFromContext(ctx) + start := time.Now() + defer func() { + stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + }() + // Get stats info from remote node. 
if gs.KeyRouter != nil { client := gs.engine.qc From 0371518a8c83005f92cd7f14f814180bbfd711d9 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Wed, 11 Dec 2024 19:38:30 +0800 Subject: [PATCH 2/9] Build Plan for CalcStats to calculate the waiting time for Stats statistics --- pkg/frontend/compiler_context.go | 2 ++ pkg/frontend/mysql_cmd_executor.go | 12 ++++++++---- pkg/perfcounter/context.go | 19 ++++++++++++++++--- pkg/sql/compile/analyze_module.go | 10 ++++++---- pkg/sql/compile/sql_executor_context.go | 8 +++++++- pkg/sql/models/show_phyplan.go | 10 ++++++---- pkg/sql/plan/order_binder.go | 2 +- .../impl/motrace/statistic/stats_array.go | 4 ++-- pkg/vm/engine/disttae/stats.go | 12 ++++-------- 9 files changed, 52 insertions(+), 27 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index c0ef1a35beac7..9d2508716615c 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -867,6 +867,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps return cached, err } newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return cached, err @@ -886,6 +887,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps } else { crs := new(perfcounter.CounterSet) newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) statsInfo, err = table.Stats(newCtx, true) if err != nil { diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 82abb71fd6d68..2080b0201e165 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -34,6 +34,9 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/google/uuid" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + 
"github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -71,8 +74,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/route" "github.com/matrixorigin/matrixone/pkg/vm/process" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) func createDropDatabaseErrorInfo() string { @@ -3722,12 +3723,14 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt int64(statsInfo.PlanStage.PlanDuration) + int64(statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + - statsInfo.PrepareRunStage.CompilePreRunOnceDuration - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) if totalTime < 0 { if !h.isInternalSubStmt { - ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d) = %d]", + ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d) = %d]", uuid.UUID(h.stmt.StatementID).String(), h.stmt.StatementType, statsInfo.ParseStage.ParseDuration, @@ -3736,6 +3739,7 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt operatorTimeConsumed, statsInfo.PrepareRunStage.ScopePrepareDuration+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, 
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption, totalTime, diff --git a/pkg/perfcounter/context.go b/pkg/perfcounter/context.go index 6f7887c7c4109..15d8c1e864d58 100644 --- a/pkg/perfcounter/context.go +++ b/pkg/perfcounter/context.go @@ -135,8 +135,21 @@ func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Con return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) } -type StatsInBuildPlan struct{} +// ------------------------------------------------------------------------------------------------ +type CalcTableStatsKey struct{} -func AttachStatsInBuildPlan(ctx context.Context, counter *CounterSet) context.Context { - return context.WithValue(ctx, ExecPipelineMarkKey{}, counter) +func AttachCalcTableStatsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableStatsKey{}, true) +} + +type CalcTableSizeKey struct{} + +func AttachCalcTableSizeKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableSizeKey{}, true) +} + +type CalcTableRowsKey struct{} + +func AttachCalcTableRowsKey(ctx context.Context) context.Context { + return context.WithValue(ctx, CalcTableRowsKey{}, true) } diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index f9c8eabd0be4d..7046c02448c96 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -523,25 +523,26 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + 
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option.Analyze { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -550,8 +551,9 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) - + 
buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index 03cdb597ead88..e9aca22ed78ac 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -117,6 +118,10 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er } func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { + stats := statistic.StatsInfoFromContext(c.GetContext()) + start := time.Now() + defer stats.AddBuildPlanStatsConsumption(time.Since(start)) + dbName := obj.GetSchemaName() tableName := obj.GetObjName() @@ -147,7 +152,6 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* } } var statsInfo *pb.StatsInfo - stats := statistic.StatsInfoFromContext(ctx) // This is a partition table. 
if partitionInfo != nil { crs := new(perfcounter.CounterSet) @@ -158,6 +162,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* return nil, err } newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return nil, err @@ -176,6 +181,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* } else { crs := new(perfcounter.CounterSet) newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) statsInfo, err = table.Stats(newCtx, true) if err != nil { diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 9384f79191a24..067e58735802e 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -88,25 +88,26 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) + statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration - - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - + statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption - (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption) buffer.WriteString("\tCPU Usage: \n") buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal)) - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n", + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n", 
statsInfo.ParseStage.ParseDuration, statsInfo.PlanStage.PlanDuration, statsInfo.CompileStage.CompileDuration, gblStats.OperatorTimeConsumed, gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration, statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock, + statsInfo.PlanStage.BuildPlanStatsIOConsumption, statsInfo.IOAccessTimeConsumption, statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)) //------------------------------------------------------------------------------------------------------- if option == AnalyzeOption { buffer.WriteString("\tQuery Build Plan Stage:\n") - buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration)) + buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)) buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", statsInfo.PlanStage.BuildPlanS3Request.List, statsInfo.PlanStage.BuildPlanS3Request.Head, @@ -115,8 +116,9 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio statsInfo.PlanStage.BuildPlanS3Request.Delete, statsInfo.PlanStage.BuildPlanS3Request.DeleteMul, )) + buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) - + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/plan/order_binder.go b/pkg/sql/plan/order_binder.go index 7c7e75455640b..18a6cb3ab1e14 100644 --- 
a/pkg/sql/plan/order_binder.go +++ b/pkg/sql/plan/order_binder.go @@ -38,7 +38,7 @@ func (b *OrderBinder) BindExpr(astExpr tree.Expr) (*plan.Expr, error) { for _, selectField := range b.ctx.projectByAst { if selectField.aliasName != "" { continue - } + }// deptno if projectField, ok1 := selectField.ast.(*tree.UnresolvedName); ok1 && projectField.ColName() == colRef.ColName() { return nil, moerr.NewInvalidInputf(b.GetContext(), "Column '%s' in order clause is ambiguous", colRef.ColName()) } diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 727780eb1293d..92859aa713687 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -296,7 +296,7 @@ type StatsInfo struct { BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"` // The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference. BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns - BUildPlanStatsIODuration int64 `json:"BuildPlanStatsIODuration"` // unit: ns + BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns } @@ -519,7 +519,7 @@ func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { if stats == nil { return } - atomic.AddInt64(&stats.PlanStage.BUildPlanStatsIODuration, int64(d)) + atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d)) } func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index a0c80995efa74..b3a9639d5ae7b 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -254,16 +254,12 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) return info } - if ok = 
ctx.Value(perfcounter.BuildPlanMarkKey{}); ok { - + if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok { + stats := statistic.StatsInfoFromContext(ctx) + start := time.Now() + defer stats.AddBuildPlanStatsIOConsumption(time.Since(start)) } - stats := statistic.StatsInfoFromContext(ctx) - start := time.Now() - defer func() { - stats.AddBuildPlanStatsIOConsumption(time.Since(start)) - }() - // Get stats info from remote node. if gs.KeyRouter != nil { client := gs.engine.qc From 5dafaf57311a3633af9b08b35115d4720a227d5b Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 16 Dec 2024 11:55:02 +0800 Subject: [PATCH 3/9] update code --- pkg/vm/engine/disttae/stats.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 641d52c1703fc..4b11a6f86cb42 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -284,7 +284,9 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok { stats := statistic.StatsInfoFromContext(ctx) start := time.Now() - defer stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + defer func() { + stats.AddBuildPlanStatsIOConsumption(time.Since(start)) + }() } // Get stats info from remote node. 
From e74c91794c20eb93b95b8a6e8d8aedc43533e4dc Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 16 Dec 2024 16:26:22 +0800 Subject: [PATCH 4/9] add debug code, which will be deleted in the future --- pkg/frontend/compiler_context.go | 18 ++++++- pkg/sql/compile/analyze_module.go | 7 +++ pkg/sql/compile/sql_executor_context.go | 5 +- .../impl/motrace/statistic/stats_array.go | 50 +++++++++++++++++++ 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 9d2508716615c..794d3d7d60c4c 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -803,10 +803,12 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps stats := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) start := time.Now() defer func() { - v2.TxnStatementStatsDurationHistogram.Observe(time.Since(start).Seconds()) stats.AddBuildPlanStatsConsumption(time.Since(start)) + v2.TxnStatementStatsDurationHistogram.Observe(time.Since(start).Seconds()) }() + //------------------------------------------------------------------------------------------------------------------ + start1 := time.Now() dbName := obj.GetSchemaName() tableName := obj.GetObjName() checkSub := true @@ -826,17 +828,26 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps DbName: dbName, } } + stats.AddStatsCalcPhase1Duration(time.Since(start1)) + //------------------------------------------------------------------------------------------------------------------ + start2 := time.Now() ctx, table, err := tcc.getRelation(dbName, tableName, sub, snapshot) if err != nil { return nil, err } + stats.AddStatsCalcPhase2Duration(time.Since(start2)) + //------------------------------------------------------------------------------------------------------------------ + + start3 := time.Now() cached, needUpdate := tcc.statsInCache(ctx, dbName, table, snapshot) + 
stats.AddStatsCalcPhase3Duration(time.Since(start3)) if cached == nil { return nil, nil } if !needUpdate { return cached, nil } + start4 := time.Now() tableDefs, err := table.TableDefs(ctx) if err != nil { return nil, err @@ -855,7 +866,9 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps break } } + stats.AddStatsCalcPhase4Duration(time.Since(start4)) + start5 := time.Now() var statsInfo *pb.StatsInfo // This is a partition table. if partitionInfo != nil { @@ -903,9 +916,12 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps DeleteMul: crs.FileService.S3.DeleteMulti.Load(), }) } + stats.AddStatsCalcPhase5Duration(time.Since(start5)) if statsInfo != nil { + start6 := time.Now() tcc.UpdateStatsInCache(table.GetTableID(ctx), statsInfo) + stats.AddStatsCalcPhase6Duration(time.Since(start6)) return statsInfo, nil } return cached, nil diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index ee3cd72566868..65dba741a6c41 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -542,6 +542,13 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase1 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase1Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase2 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase2Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase3 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase3Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase4 Duration: 
%dns \n", statsInfo.PlanStage.StatsCalcPhase4Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase5 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase5Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase6 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase6Duration)) + //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index e9aca22ed78ac..005856abe7dc6 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -20,7 +20,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -119,8 +118,8 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { stats := statistic.StatsInfoFromContext(c.GetContext()) - start := time.Now() - defer stats.AddBuildPlanStatsConsumption(time.Since(start)) + //start := time.Now() + //defer stats.AddBuildPlanStatsConsumption(time.Since(start)) dbName := obj.GetSchemaName() tableName := obj.GetObjName() diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 92859aa713687..5fd9f3aad2480 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -298,6 +298,14 @@ type StatsInfo struct { BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // 
unit: ns + + StatsCalcPhase1Duration int64 `json:"StatsCalcPhase1Duration"` // unit: ns + StatsCalcPhase2Duration int64 `json:"StatsCalcPhase2Duration"` // unit: ns + StatsCalcPhase3Duration int64 `json:"StatsCalcPhase3Duration"` // unit: ns + StatsCalcPhase4Duration int64 `json:"StatsCalcPhase4Duration"` // unit: ns + + StatsCalcPhase5Duration int64 `json:"StatsCalcPhase5Duration"` // unit: ns + StatsCalcPhase6Duration int64 `json:"StatsCalcPhase6Duration"` // unit: ns } // Compile phase statistics @@ -522,6 +530,48 @@ func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) { atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d)) } +func (stats *StatsInfo) AddStatsCalcPhase1Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase1Duration, int64(d)) +} + +func (stats *StatsInfo) AddStatsCalcPhase2Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase2Duration, int64(d)) +} + +func (stats *StatsInfo) AddStatsCalcPhase3Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase3Duration, int64(d)) +} + +func (stats *StatsInfo) AddStatsCalcPhase4Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase4Duration, int64(d)) +} + +func (stats *StatsInfo) AddStatsCalcPhase5Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase5Duration, int64(d)) +} + +func (stats *StatsInfo) AddStatsCalcPhase6Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.StatsCalcPhase6Duration, int64(d)) +} + func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) { if stats == nil { return From 089f74bdce49ebe410f6b09494c1e0eaeabbd24a Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 16 Dec 2024 22:00:40 +0800 
Subject: [PATCH 5/9] add code for debug --- pkg/sql/models/show_phyplan.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index 067e58735802e..bda24ac74b336 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -119,6 +119,13 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration))) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration)) buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase1 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase1Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase2 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase2Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase3 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase3Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase4 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase4Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase5 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase5Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase6 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase6Duration)) + //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) From 4b0689bd4ade6add37c7ed2c196a427cb37833ae Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Tue, 17 Dec 2024 09:39:16 +0800 Subject: [PATCH 6/9] delete unused code --- pkg/sql/plan/order_binder.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/pkg/sql/plan/order_binder.go b/pkg/sql/plan/order_binder.go index 18a6cb3ab1e14..7c7e75455640b 100644 --- a/pkg/sql/plan/order_binder.go +++ b/pkg/sql/plan/order_binder.go @@ -38,7 +38,7 @@ func (b *OrderBinder) BindExpr(astExpr tree.Expr) (*plan.Expr, error) { for _, selectField := range b.ctx.projectByAst { if selectField.aliasName != "" { continue - }// deptno + } if projectField, ok1 := selectField.ast.(*tree.UnresolvedName); ok1 && projectField.ColName() == colRef.ColName() { return nil, moerr.NewInvalidInputf(b.GetContext(), "Column '%s' in order clause is ambiguous", colRef.ColName()) } From 5217ca3b341943bd9da190006ce77e2975c9e773 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Wed, 18 Dec 2024 16:12:07 +0800 Subject: [PATCH 7/9] add statsInCache stats analyze --- pkg/frontend/compiler_context.go | 7 +++++ pkg/sql/compile/analyze_module.go | 3 +++ .../impl/motrace/statistic/stats_array.go | 27 ++++++++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index 82ef8ab327268..d8d726bb2d352 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -976,11 +976,15 @@ func (tcc *TxnCompilerContext) UpdateStatsInCache(tid uint64, s *pb.StatsInfo) { // statsInCache get the *pb.StatsInfo from session cache. If the info is nil, just return nil and false, // else, check if the info needs to be updated. 
func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, table engine.Relation, snapshot *plan2.Snapshot) (*pb.StatsInfo, bool) { + start1 := time.Now() + stats := statistic.StatsInfoFromContext(tcc.execCtx.reqCtx) s := tcc.GetStatsCache().GetStatsInfo(table.GetTableID(ctx), true) if s == nil { return nil, false } + stats.AddLogic1InPhase3Duration(time.Since(start1)) + start2 := time.Now() var partitionInfo *plan2.PartitionByDef engineDefs, err := table.TableDefs(ctx) if err != nil { @@ -998,6 +1002,7 @@ func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, } } } + stats.AddLogic2InPhase3Duration(time.Since(start2)) second := time.Now().Unix() var diff int64 = 3 @@ -1010,6 +1015,7 @@ func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, } s.TimeSecond = second + start3 := time.Now() approxNumObjects := 0 if partitionInfo != nil { for _, PartitionTableName := range partitionInfo.PartitionTableNames { @@ -1022,6 +1028,7 @@ func (tcc *TxnCompilerContext) statsInCache(ctx context.Context, dbName string, } else { approxNumObjects = table.ApproxObjectsNum(ctx) } + stats.AddLogic3InPhase3Duration(time.Since(start3)) if approxNumObjects == 0 { return nil, false } diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index 65dba741a6c41..07d0dac56dd57 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -548,6 +548,9 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase4 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase4Duration)) buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase5 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase5Duration)) buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase6 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase6Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Logic1 In Phase3 Duration: %dns \n", 
statsInfo.PlanStage.Logic1InPhase3Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Logic2 In Phase3 Duration: %dns \n", statsInfo.PlanStage.Logic2InPhase3Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Logic3 In Phase3 Duration: %dns \n", statsInfo.PlanStage.Logic3InPhase3Duration)) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") diff --git a/pkg/util/trace/impl/motrace/statistic/stats_array.go b/pkg/util/trace/impl/motrace/statistic/stats_array.go index 5fd9f3aad2480..3959494018518 100644 --- a/pkg/util/trace/impl/motrace/statistic/stats_array.go +++ b/pkg/util/trace/impl/motrace/statistic/stats_array.go @@ -302,8 +302,12 @@ type StatsInfo struct { StatsCalcPhase1Duration int64 `json:"StatsCalcPhase1Duration"` // unit: ns StatsCalcPhase2Duration int64 `json:"StatsCalcPhase2Duration"` // unit: ns StatsCalcPhase3Duration int64 `json:"StatsCalcPhase3Duration"` // unit: ns - StatsCalcPhase4Duration int64 `json:"StatsCalcPhase4Duration"` // unit: ns + Logic1InPhase3Duration int64 `json:"Logic1InPhase3Duration"` // unit: ns + Logic2InPhase3Duration int64 `json:"Logic2InPhase3Duration"` // unit: ns + Logic3InPhase3Duration int64 `json:"Logic3InPhase3Duration"` // unit: ns + + StatsCalcPhase4Duration int64 `json:"StatsCalcPhase4Duration"` // unit: ns StatsCalcPhase5Duration int64 `json:"StatsCalcPhase5Duration"` // unit: ns StatsCalcPhase6Duration int64 `json:"StatsCalcPhase6Duration"` // unit: ns } @@ -551,6 +555,27 @@ func (stats *StatsInfo) AddStatsCalcPhase3Duration(d time.Duration) { atomic.AddInt64(&stats.PlanStage.StatsCalcPhase3Duration, int64(d)) } +func (stats *StatsInfo) AddLogic1InPhase3Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.Logic1InPhase3Duration, int64(d)) +} + +func (stats *StatsInfo) AddLogic2InPhase3Duration(d time.Duration) { + if stats == nil { + return + } + 
atomic.AddInt64(&stats.PlanStage.Logic2InPhase3Duration, int64(d)) +} + +func (stats *StatsInfo) AddLogic3InPhase3Duration(d time.Duration) { + if stats == nil { + return + } + atomic.AddInt64(&stats.PlanStage.Logic3InPhase3Duration, int64(d)) +} + func (stats *StatsInfo) AddStatsCalcPhase4Duration(d time.Duration) { if stats == nil { return From 59c1b2fecaa4bfccf802e06cd19d1aa8f5c4e428 Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 23 Dec 2024 14:35:14 +0800 Subject: [PATCH 8/9] Statistics on Stats S3 resource consumption --- pkg/pb/statsinfo/statsinfo.go | 6 ++ pkg/sql/compile/sql_executor_context.go | 7 ++- pkg/vm/engine/disttae/stats.go | 77 +++++++++++++++++-------- pkg/vm/engine/disttae/stats_test.go | 11 ++-- 4 files changed, 70 insertions(+), 31 deletions(-) diff --git a/pkg/pb/statsinfo/statsinfo.go b/pkg/pb/statsinfo/statsinfo.go index 1967584ed4517..4b77b48dbb5ec 100644 --- a/pkg/pb/statsinfo/statsinfo.go +++ b/pkg/pb/statsinfo/statsinfo.go @@ -15,11 +15,17 @@ package statsinfo import ( + "context" "math" "golang.org/x/exp/constraints" ) +type StatsInfoKeyV2 struct { + Ctx context.Context + Key StatsInfoKey +} + func (sc *StatsInfo) NeedUpdate(currentApproxObjNum int64) bool { if sc.ApproxObjectNumber == 0 || sc.AccurateObjectNumber == 0 { return true diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index b12aef8dad78b..b7e14465c204d 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -118,8 +119,10 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) { stats := statistic.StatsInfoFromContext(c.GetContext()) - //start := time.Now() - //defer 
stats.AddBuildPlanStatsConsumption(time.Since(start)) + start := time.Now() + defer func() { + stats.AddBuildPlanStatsConsumption(time.Since(start)) + }() dbName := obj.GetSchemaName() tableName := obj.GetObjName() diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 4b11a6f86cb42..b9936a162f704 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -144,7 +144,7 @@ func WithUpdateWorkerFactor(f int) GlobalStatsOption { } // WithStatsUpdater set the update function to update stats info. -func WithStatsUpdater(f func(pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { +func WithStatsUpdater(f func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { return func(s *GlobalStats) { s.statsUpdater = f } @@ -176,7 +176,7 @@ type GlobalStats struct { // TODO(volgariver6): add metrics of the chan length. tailC chan *logtail.TableLogtail - updateC chan pb.StatsInfoKey + updateC chan pb.StatsInfoKeyV2 updatingMu struct { sync.Mutex @@ -216,7 +216,7 @@ type GlobalStats struct { // statsUpdate is the function which updates the stats info. // If it is nil, set it to doUpdate. - statsUpdater func(pb.StatsInfoKey, *pb.StatsInfo) bool + statsUpdater func(context.Context, pb.StatsInfoKey, *pb.StatsInfo) bool // for test only currently. 
approxObjectNumUpdater func() int64 } @@ -228,7 +228,7 @@ func NewGlobalStats( ctx: ctx, engine: e, tailC: make(chan *logtail.TableLogtail, 10000), - updateC: make(chan pb.StatsInfoKey, 3000), + updateC: make(chan pb.StatsInfoKeyV2, 3000), logtailUpdate: newLogtailUpdate(), tableLogtailCounter: make(map[pb.StatsInfoKey]int64), KeyRouter: keyRouter, @@ -270,12 +270,22 @@ func (gs *GlobalStats) checkTriggerCond(key pb.StatsInfoKey, entryNum int64) boo } func (gs *GlobalStats) PrefetchTableMeta(ctx context.Context, key pb.StatsInfoKey) bool { - return gs.triggerUpdate(key, false) + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + return gs.triggerUpdate(wrapkey, false) } func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo { gs.mu.Lock() defer gs.mu.Unlock() + + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + info, ok := gs.mu.statsInfoMap[key] if ok && info != nil { return info @@ -324,7 +334,7 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) // If the trigger condition is not satisfied, the stats will not be updated // for long time. So we trigger the update here to get the stats info as soon // as possible. 
- gs.triggerUpdate(key, true) + gs.triggerUpdate(wrapkey, true) }() info, ok = gs.mu.statsInfoMap[key] @@ -385,7 +395,7 @@ func (gs *GlobalStats) consumeWorker(ctx context.Context) { return case tail := <-gs.tailC: - gs.consumeLogtail(tail) + gs.consumeLogtail(ctx, tail) } } } @@ -406,7 +416,7 @@ func (gs *GlobalStats) updateWorker(ctx context.Context) { } } -func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { +func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKeyV2, force bool) bool { if force { gs.updateC <- key v2.StatsTriggerForcedCounter.Add(1) @@ -422,15 +432,21 @@ func (gs *GlobalStats) triggerUpdate(key pb.StatsInfoKey, force bool) bool { } } -func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { +func (gs *GlobalStats) consumeLogtail(ctx context.Context, tail *logtail.TableLogtail) { key := pb.StatsInfoKey{ AccId: tail.Table.AccId, DatabaseID: tail.Table.DbId, TableID: tail.Table.TbId, } + + wrapkey := pb.StatsInfoKeyV2{ + Ctx: ctx, + Key: key, + } + if len(tail.CkpLocation) > 0 { if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } else if tail.Table != nil { var triggered bool @@ -438,7 +454,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if logtailreplay.IsMetaEntry(cmd.TableName) { triggered = true if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } break } @@ -451,7 +467,7 @@ func (gs *GlobalStats) consumeLogtail(tail *logtail.TableLogtail) { if !triggered && gs.checkTriggerCond(key, gs.tableLogtailCounter[key]) { gs.tableLogtailCounter[key] = 0 if gs.shouldTrigger(key) { - gs.triggerUpdate(key, false) + gs.triggerUpdate(wrapkey, false) } } } @@ -610,38 +626,51 @@ func (gs *GlobalStats) broadcastStats(key pb.StatsInfoKey) { }) } -func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) { - if !gs.shouldUpdate(key) { +func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKeyV2) { + statser := 
statistic.StatsInfoFromContext(key.Ctx) + crs := new(perfcounter.CounterSet) + + if !gs.shouldUpdate(key.Key) { return } // wait until the table's logtail has been updated. - gs.waitLogtailUpdated(key.TableID) + gs.waitLogtailUpdated(key.Key.TableID) // updated is used to mark that the stats info is updated. var updated bool stats := plan2.NewStatsInfo() + + newCtx := perfcounter.AttachS3RequestKey(key.Ctx, crs) if gs.statsUpdater != nil { - updated = gs.statsUpdater(key, stats) - } + updated = gs.statsUpdater(newCtx, key.Key, stats) + } + statser.AddBuildPlanStatsS3Request(statistic.S3Request{ + List: crs.FileService.S3.List.Load(), + Head: crs.FileService.S3.Head.Load(), + Put: crs.FileService.S3.Put.Load(), + Get: crs.FileService.S3.Get.Load(), + Delete: crs.FileService.S3.Delete.Load(), + DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + }) gs.mu.Lock() defer gs.mu.Unlock() if updated { - gs.mu.statsInfoMap[key] = stats - gs.broadcastStats(key) - } else if _, ok := gs.mu.statsInfoMap[key]; !ok { - gs.mu.statsInfoMap[key] = nil + gs.mu.statsInfoMap[key.Key] = stats + gs.broadcastStats(key.Key) + } else if _, ok := gs.mu.statsInfoMap[key.Key]; !ok { + gs.mu.statsInfoMap[key.Key] = nil } // Notify all the waiters to read the new stats info. gs.mu.cond.Broadcast() - gs.doneUpdate(key, updated) + gs.doneUpdate(key.Key, updated) } -func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { +func (gs *GlobalStats) doUpdate(ctx context.Context, key pb.StatsInfoKey, stats *pb.StatsInfo) bool { table := gs.engine.GetLatestCatalogCache().GetTableById(key.AccId, key.DatabaseID, key.TableID) // table or its definition is nil, means that the table is created but not committed yet. 
if table == nil || table.TableDef == nil { @@ -666,7 +695,7 @@ func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { approxObjectNum, stats, ) - if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil { + if err := UpdateStats(ctx, req, gs.concurrentExecutor); err != nil { logutil.Errorf("failed to init stats info for table %v, err: %v", key, err) return false } diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index 48d89fad6b2cc..354e0c12e6724 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -23,6 +23,8 @@ import ( "time" "github.com/lni/goutils/leaktest" + "github.com/stretchr/testify/assert" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/common/mpool" @@ -32,7 +34,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" - "github.com/stretchr/testify/assert" ) func TestGetStats(t *testing.T) { @@ -41,7 +42,7 @@ func TestGetStats(t *testing.T) { defer cancel() gs := NewGlobalStats(ctx, nil, nil, WithUpdateWorkerFactor(4), - WithStatsUpdater(func(key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { + WithStatsUpdater(func(_ context.Context, key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { info.BlockNumber = 20 return true }), @@ -161,7 +162,7 @@ func TestUpdateStats(t *testing.T) { TableID: 1001, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -180,7 +181,7 @@ func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.False(t, updated) }) }) @@ -199,7 +200,7 @@ 
func TestUpdateStats(t *testing.T) { TableID: tid, } stats := plan2.NewStatsInfo() - updated := e.globalStats.doUpdate(k, stats) + updated := e.globalStats.doUpdate(ctx, k, stats) assert.True(t, updated) }, WithApproxObjectNumUpdater(func() int64 { return 10 From 42558d234341d05f190e4334ca5589506824028a Mon Sep 17 00:00:00 2001 From: qingxinhome <758355272@qq.com> Date: Mon, 23 Dec 2024 20:59:30 +0800 Subject: [PATCH 9/9] Statistics on Stats S3 resource consumption 2 --- pkg/frontend/compiler_context.go | 47 +++++++++++++------------ pkg/frontend/mysql_cmd_executor.go | 6 ++-- pkg/sql/compile/analyze_module.go | 9 +++++ pkg/sql/compile/sql_executor_context.go | 46 ++++++++++++------------ pkg/sql/models/show_phyplan.go | 8 +++++ 5 files changed, 70 insertions(+), 46 deletions(-) diff --git a/pkg/frontend/compiler_context.go b/pkg/frontend/compiler_context.go index d8d726bb2d352..569c342d5d26d 100644 --- a/pkg/frontend/compiler_context.go +++ b/pkg/frontend/compiler_context.go @@ -914,15 +914,17 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps var statsInfo *pb.StatsInfo // This is a partition table. 
if partitionInfo != nil { - crs := new(perfcounter.CounterSet) + //crs := new(perfcounter.CounterSet) statsInfo = plan2.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := tcc.getRelation(dbName, partitionTable, sub, snapshot) if err != nil { return cached, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return cached, err @@ -930,33 +932,34 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps statsInfo.Merge(parStats) } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + //crs := new(perfcounter.CounterSet) + //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return cached, err } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: 
crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } stats.AddStatsCalcPhase5Duration(time.Since(start5)) diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go index 2c98eb343565b..a48c3a4591f2c 100644 --- a/pkg/frontend/mysql_cmd_executor.go +++ b/pkg/frontend/mysql_cmd_executor.go @@ -1989,10 +1989,12 @@ func buildPlan(reqCtx context.Context, ses FeSession, ctx plan2.CompilerContext, v2.TxnStatementBuildPlanDurationHistogram.Observe(cost.Seconds()) }() - stats := statistic.StatsInfoFromContext(reqCtx) + planContext := ctx.GetContext() + stats := statistic.StatsInfoFromContext(planContext) stats.PlanStart() crs := new(perfcounter.CounterSet) - reqCtx = perfcounter.AttachBuildPlanMarkKey(reqCtx, crs) + planContext = perfcounter.AttachBuildPlanMarkKey(planContext, crs) + ctx.SetContext(planContext) defer func() { stats.AddBuildPlanS3Request(statistic.S3Request{ List: crs.FileService.S3.List.Load(), diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index 07d0dac56dd57..f6a7ec66404de 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -552,6 +552,15 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S buffer.WriteString(fmt.Sprintf("\t\t- Logic2 In Phase3 Duration: %dns \n", statsInfo.PlanStage.Logic2InPhase3Duration)) buffer.WriteString(fmt.Sprintf("\t\t- Logic3 In Phase3 Duration: %dns \n", 
statsInfo.PlanStage.Logic3InPhase3Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) + //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n") buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration)) diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go index b7e14465c204d..3afa9a2aeb361 100644 --- a/pkg/sql/compile/sql_executor_context.go +++ b/pkg/sql/compile/sql_executor_context.go @@ -156,15 +156,16 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* var statsInfo *pb.StatsInfo // This is a partition table. 
if partitionInfo != nil { - crs := new(perfcounter.CounterSet) + //crs := new(perfcounter.CounterSet) statsInfo = plan.NewStatsInfo() for _, partitionTable := range partitionInfo.PartitionTableNames { parCtx, parTable, err := c.getRelation(dbName, partitionTable, snapshot) if err != nil { return nil, err } - newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) - newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + //newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs) + //newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx) + newParCtx := perfcounter.AttachCalcTableStatsKey(parCtx) parStats, err := parTable.Stats(newParCtx, true) if err != nil { return nil, err @@ -172,32 +173,33 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (* statsInfo.Merge(parStats) } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } else { - crs := new(perfcounter.CounterSet) - newCtx := perfcounter.AttachS3RequestKey(ctx, crs) - newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + //crs := new(perfcounter.CounterSet) + //newCtx := perfcounter.AttachS3RequestKey(ctx, crs) + //newCtx = perfcounter.AttachCalcTableStatsKey(newCtx) + newCtx := perfcounter.AttachCalcTableStatsKey(ctx) statsInfo, err = table.Stats(newCtx, true) if err != nil { return nil, err } - stats.AddBuildPlanStatsS3Request(statistic.S3Request{ - List: 
crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), - }) + //stats.AddBuildPlanStatsS3Request(statistic.S3Request{ + // List: crs.FileService.S3.List.Load(), + // Head: crs.FileService.S3.Head.Load(), + // Put: crs.FileService.S3.Put.Load(), + // Get: crs.FileService.S3.Get.Load(), + // Delete: crs.FileService.S3.Delete.Load(), + // DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + //}) } return statsInfo, nil } diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go index bda24ac74b336..c3f94d9e4d1cd 100644 --- a/pkg/sql/models/show_phyplan.go +++ b/pkg/sql/models/show_phyplan.go @@ -125,6 +125,14 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase4 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase4Duration)) buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase5 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase5Duration)) buffer.WriteString(fmt.Sprintf("\t\t- Calc StatsPhase6 Duration: %dns \n", statsInfo.PlanStage.StatsCalcPhase6Duration)) + buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n", + statsInfo.PlanStage.BuildPlanStatsS3.List, + statsInfo.PlanStage.BuildPlanStatsS3.Head, + statsInfo.PlanStage.BuildPlanStatsS3.Put, + statsInfo.PlanStage.BuildPlanStatsS3.Get, + statsInfo.PlanStage.BuildPlanStatsS3.Delete, + statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul, + )) //------------------------------------------------------------------------------------------------------- buffer.WriteString("\tQuery Compile Stage:\n")