Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract io time consumed for calc stats #20724

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps
return cached, err
}
newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs)
newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx)
parStats, err := parTable.Stats(newParCtx, true)
if err != nil {
return cached, err
Expand All @@ -886,6 +887,7 @@ func (tcc *TxnCompilerContext) Stats(obj *plan2.ObjectRef, snapshot *plan2.Snaps
} else {
crs := new(perfcounter.CounterSet)
newCtx := perfcounter.AttachS3RequestKey(ctx, crs)
newCtx = perfcounter.AttachCalcTableStatsKey(newCtx)

statsInfo, err = table.Stats(newCtx, true)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -71,8 +74,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/route"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func createDropDatabaseErrorInfo() string {
Expand Down Expand Up @@ -3722,12 +3723,14 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt
int64(statsInfo.PlanStage.PlanDuration) +
int64(statsInfo.CompileStage.CompileDuration) +
statsInfo.PrepareRunStage.ScopePrepareDuration +
statsInfo.PrepareRunStage.CompilePreRunOnceDuration - statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock -
statsInfo.PrepareRunStage.CompilePreRunOnceDuration -
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock -
statsInfo.PlanStage.BuildPlanStatsIOConsumption -
(statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)

if totalTime < 0 {
if !h.isInternalSubStmt {
ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d) = %d]",
ses.Infof(ctx, "negative cpu statement_id:%s, statement_type:%s, statsInfo:[Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d) = %d]",
uuid.UUID(h.stmt.StatementID).String(),
h.stmt.StatementType,
statsInfo.ParseStage.ParseDuration,
Expand All @@ -3736,6 +3739,7 @@ func (h *marshalPlanHandler) Stats(ctx context.Context, ses FeSession) (statsByt
operatorTimeConsumed,
statsInfo.PrepareRunStage.ScopePrepareDuration+statsInfo.PrepareRunStage.CompilePreRunOnceDuration,
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock,
statsInfo.PlanStage.BuildPlanStatsIOConsumption,
statsInfo.IOAccessTimeConsumption,
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption,
totalTime,
Expand Down
19 changes: 19 additions & 0 deletions pkg/perfcounter/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,22 @@ type ExecPipelineMarkKey struct{}
func AttachExecPipelineKey(ctx context.Context, counter *CounterSet) context.Context {
return context.WithValue(ctx, ExecPipelineMarkKey{}, counter)
}

// ------------------------------------------------------------------------------------------------
type CalcTableStatsKey struct{}

func AttachCalcTableStatsKey(ctx context.Context) context.Context {
return context.WithValue(ctx, CalcTableStatsKey{}, true)
}

type CalcTableSizeKey struct{}

func AttachCalcTableSizeKey(ctx context.Context) context.Context {
return context.WithValue(ctx, CalcTableSizeKey{}, true)
}

type CalcTableRowsKey struct{}

func AttachCalcTableRowsKey(ctx context.Context) context.Context {
return context.WithValue(ctx, CalcTableRowsKey{}, true)
}
10 changes: 6 additions & 4 deletions pkg/sql/compile/analyze_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,25 +511,26 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S
cpuTimeVal := gblStats.OperatorTimeConsumed +
int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) +
statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration -
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock -
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption -
(statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)

buffer.WriteString("\tCPU Usage: \n")
buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal))
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n",
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n",
statsInfo.ParseStage.ParseDuration,
statsInfo.PlanStage.PlanDuration,
statsInfo.CompileStage.CompileDuration,
gblStats.OperatorTimeConsumed,
gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration,
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock,
statsInfo.PlanStage.BuildPlanStatsIOConsumption,
statsInfo.IOAccessTimeConsumption,
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption))

//-------------------------------------------------------------------------------------------------------
if option.Analyze {
buffer.WriteString("\tQuery Build Plan Stage:\n")
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration))
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption))
buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
statsInfo.PlanStage.BuildPlanS3Request.List,
statsInfo.PlanStage.BuildPlanS3Request.Head,
Expand All @@ -538,8 +539,9 @@ func explainResourceOverview(queryResult *util.RunResult, statsInfo *statistic.S
statsInfo.PlanStage.BuildPlanS3Request.Delete,
statsInfo.PlanStage.BuildPlanS3Request.DeleteMul,
))
buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration)))
buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration))

buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption))
//-------------------------------------------------------------------------------------------------------
buffer.WriteString("\tQuery Compile Stage:\n")
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration))
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/compile/sql_executor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -117,6 +118,10 @@ func (c *compilerContext) ResolveAccountIds(accountNames []string) ([]uint32, er
}

func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*pb.StatsInfo, error) {
stats := statistic.StatsInfoFromContext(c.GetContext())
start := time.Now()
defer stats.AddBuildPlanStatsConsumption(time.Since(start))

dbName := obj.GetSchemaName()
tableName := obj.GetObjName()

Expand Down Expand Up @@ -147,7 +152,6 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*
}
}
var statsInfo *pb.StatsInfo
stats := statistic.StatsInfoFromContext(ctx)
// This is a partition table.
if partitionInfo != nil {
crs := new(perfcounter.CounterSet)
Expand All @@ -158,6 +162,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*
return nil, err
}
newParCtx := perfcounter.AttachS3RequestKey(parCtx, crs)
newParCtx = perfcounter.AttachCalcTableStatsKey(newParCtx)
parStats, err := parTable.Stats(newParCtx, true)
if err != nil {
return nil, err
Expand All @@ -176,6 +181,7 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*
} else {
crs := new(perfcounter.CounterSet)
newCtx := perfcounter.AttachS3RequestKey(ctx, crs)
newCtx = perfcounter.AttachCalcTableStatsKey(newCtx)

statsInfo, err = table.Stats(newCtx, true)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/models/show_phyplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,26 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio
cpuTimeVal := gblStats.OperatorTimeConsumed +
int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) +
statsInfo.PrepareRunStage.ScopePrepareDuration + statsInfo.PrepareRunStage.CompilePreRunOnceDuration -
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock -
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock - statsInfo.PlanStage.BuildPlanStatsIOConsumption -
(statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)

buffer.WriteString("\tCPU Usage: \n")
buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal))
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-IOAccess(%d)-IOMerge(%d)\n",
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n",
statsInfo.ParseStage.ParseDuration,
statsInfo.PlanStage.PlanDuration,
statsInfo.CompileStage.CompileDuration,
gblStats.OperatorTimeConsumed,
gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration,
statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock,
statsInfo.PlanStage.BuildPlanStatsIOConsumption,
statsInfo.IOAccessTimeConsumption,
statsInfo.S3FSPrefetchFileIOMergerTimeConsumption))

//-------------------------------------------------------------------------------------------------------
if option == AnalyzeOption {
buffer.WriteString("\tQuery Build Plan Stage:\n")
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.PlanStage.PlanDuration))
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption))
buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
statsInfo.PlanStage.BuildPlanS3Request.List,
statsInfo.PlanStage.BuildPlanS3Request.Head,
Expand All @@ -115,8 +116,9 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio
statsInfo.PlanStage.BuildPlanS3Request.Delete,
statsInfo.PlanStage.BuildPlanS3Request.DeleteMul,
))
buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration)))
buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration))

buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption))
//-------------------------------------------------------------------------------------------------------
buffer.WriteString("\tQuery Compile Stage:\n")
buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/plan/order_binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (b *OrderBinder) BindExpr(astExpr tree.Expr) (*plan.Expr, error) {
for _, selectField := range b.ctx.projectByAst {
if selectField.aliasName != "" {
continue
}
}// deptno
if projectField, ok1 := selectField.ast.(*tree.UnresolvedName); ok1 && projectField.ColName() == colRef.ColName() {
return nil, moerr.NewInvalidInputf(b.GetContext(), "Column '%s' in order clause is ambiguous", colRef.ColName())
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/trace/impl/motrace/statistic/stats_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ type StatsInfo struct {
BuildPlanStatsS3 S3Request `json:"BuildPlanStatsS3"`
// The following attributes belong to independent statistics during the `buildPlan` stage, only for analysis reference.
BuildPlanStatsDuration int64 `json:"BuildPlanStatsDuration"` // unit: ns
BuildPlanStatsIOConsumption int64 `json:"BuildPlanStatsIOConsumption"` // unit: ns
BuildPlanResolveVarDuration int64 `json:"BuildPlanResolveVarDuration"` // unit: ns
}

Expand Down Expand Up @@ -514,6 +515,13 @@ func (stats *StatsInfo) AddBuildPlanStatsConsumption(d time.Duration) {
atomic.AddInt64(&stats.PlanStage.BuildPlanStatsDuration, int64(d))
}

func (stats *StatsInfo) AddBuildPlanStatsIOConsumption(d time.Duration) {
if stats == nil {
return
}
atomic.AddInt64(&stats.PlanStage.BuildPlanStatsIOConsumption, int64(d))
}

func (stats *StatsInfo) AddBuildPlanResolveVarConsumption(d time.Duration) {
if stats == nil {
return
Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/disttae/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/query"
pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/queryservice/client"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
)
Expand Down Expand Up @@ -252,6 +254,12 @@ func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool)
return info
}

if _, ok = ctx.Value(perfcounter.CalcTableStatsKey{}).(bool); ok {
stats := statistic.StatsInfoFromContext(ctx)
start := time.Now()
defer stats.AddBuildPlanStatsIOConsumption(time.Since(start))
}

// Get stats info from remote node.
if gs.KeyRouter != nil {
client := gs.engine.qc
Expand Down
Loading