From 74fd2a1a509d9901fb6a607332ba43783a127c6b Mon Sep 17 00:00:00 2001 From: qingxinhome <70939751+qingxinhome@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:30:48 +0800 Subject: [PATCH 1/2] Optimize analyze code 2.0 dev (#20638) Optimize and enhance the analyzer module Approved by: @m-schen, @badboynt1, @ouyuanning, @XuPeng-SH, @aunjgr, @sukki37 --- pkg/sql/compile/analyze_module.go | 12 ------------ pkg/sql/models/phy_plan.go | 10 ++++------ pkg/sql/models/phy_plan_test.go | 8 +++----- pkg/sql/models/show_phyplan_test.go | 2 -- pkg/vm/process/operator_analyzer.go | 2 ++ 5 files changed, 9 insertions(+), 25 deletions(-) diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index f9c8eabd0be4d..e4696a3e0fa1f 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -427,21 +427,9 @@ func (c *Compile) GenPhyPlan(runC *Compile) { } } - // record the number of local cn s3 requests - c.anal.phyPlan.S3IOInputCount += runC.counterSet.FileService.S3.Put.Load() - c.anal.phyPlan.S3IOInputCount += runC.counterSet.FileService.S3.List.Load() - - c.anal.phyPlan.S3IOOutputCount += runC.counterSet.FileService.S3.Head.Load() - c.anal.phyPlan.S3IOOutputCount += runC.counterSet.FileService.S3.Get.Load() - c.anal.phyPlan.S3IOOutputCount += runC.counterSet.FileService.S3.Delete.Load() - c.anal.phyPlan.S3IOOutputCount += runC.counterSet.FileService.S3.DeleteMulti.Load() - //------------------------------------------------------------------------------------------- - // record the number of remote cn s3 requests for _, remotePhy := range runC.anal.remotePhyPlans { c.anal.phyPlan.RemoteScope = append(c.anal.phyPlan.RemoteScope, remotePhy.LocalScope[0]) - c.anal.phyPlan.S3IOInputCount += remotePhy.S3IOInputCount - c.anal.phyPlan.S3IOOutputCount += remotePhy.S3IOOutputCount } } diff --git a/pkg/sql/models/phy_plan.go b/pkg/sql/models/phy_plan.go index 57b79e64e7e08..dadf3ddca89d4 100644 --- a/pkg/sql/models/phy_plan.go +++ b/pkg/sql/models/phy_plan.go @@ -21,12 +21,10 @@ import ( ) type PhyPlan struct { - Version string `json:"version"` - RetryTime int `json:"retryTime,omitempty"` - LocalScope []PhyScope `json:"scope,omitempty"` - RemoteScope []PhyScope `json:"RemoteScope,omitempty"` - S3IOInputCount int64 `json:"S3IOInputCount,omitempty"` - S3IOOutputCount int64 `json:"S3IOOutputCount,omitempty"` + Version string `json:"version"` + RetryTime int `json:"retryTime,omitempty"` + LocalScope []PhyScope `json:"scope,omitempty"` + RemoteScope []PhyScope `json:"RemoteScope,omitempty"` } type PhyScope struct { diff --git a/pkg/sql/models/phy_plan_test.go b/pkg/sql/models/phy_plan_test.go index 26e28207942ed..d02a25d30be71 100644 --- a/pkg/sql/models/phy_plan_test.go +++ b/pkg/sql/models/phy_plan_test.go @@ -209,11 +209,9 @@ func TestPhyPlanJSON(t *testing.T) { //------------------------------------------------------------------------------------------------------------------ phyPlan := &PhyPlan{ - Version: "1.0.0", - LocalScope: []PhyScope{phyScope1}, - RemoteScope: []PhyScope{phyScope1}, - S3IOInputCount: 5, - S3IOOutputCount: 0, + Version: "1.0.0", + LocalScope: []PhyScope{phyScope1}, + RemoteScope: []PhyScope{phyScope1}, } // Convert to JSON diff --git a/pkg/sql/models/show_phyplan_test.go b/pkg/sql/models/show_phyplan_test.go index 6074778458e7b..7129955b5b5ce 100644 --- a/pkg/sql/models/show_phyplan_test.go +++ b/pkg/sql/models/show_phyplan_test.go @@ -176,8 +176,6 @@ func TestExplainPhyPlan(t *testing.T) { phyPlan := NewPhyPlan() phyPlan.LocalScope = []PhyScope{phyScope1} phyPlan.RemoteScope = []PhyScope{phyScope1} - phyPlan.S3IOInputCount = 5 - phyPlan.S3IOOutputCount = 0 statsInfo := new(statistic.StatsInfo) statsInfo.ParseStage.ParseDuration = 72872 diff --git a/pkg/vm/process/operator_analyzer.go b/pkg/vm/process/operator_analyzer.go index 83c5c97d3b943..c207aac49fcd1 100644 --- a/pkg/vm/process/operator_analyzer.go +++ b/pkg/vm/process/operator_analyzer.go @@ -160,6 +160,7 @@ func (opAlyzr *operatorAnalyzer) InputBlock() { opAlyzr.opStats.InputBlocks += 1 } +// If the operator input batch is First, then the InputSize and InputRows will be counted func (opAlyzr *operatorAnalyzer) Input(bat *batch.Batch) { if opAlyzr.opStats == nil { panic("operatorAnalyzer.Input: operatorAnalyzer.opStats is nil") @@ -171,6 +172,7 @@ func (opAlyzr *operatorAnalyzer) Input(bat *batch.Batch) { } } +// If the operator input batch is Last, then the OutputSize and OutputRows will be counted func (opAlyzr *operatorAnalyzer) Output(bat *batch.Batch) { if opAlyzr.opStats == nil { panic("operatorAnalyzer.Output: operatorAnalyzer.opStats is nil") From 880d78c51e5fe9145ece0e71392432e83a9bb988 Mon Sep 17 00:00:00 2001 From: nitao Date: Wed, 11 Dec 2024 18:22:08 +0800 Subject: [PATCH 2/2] cp to 2.0 "optimize stats for runtime filters when table not flushed" (#20719) cp to 2.0 "optimize stats for runtime filters when table not flushed" Approved by: @ouyuanning, @sukki37 --- pkg/sql/plan/runtime_filter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/sql/plan/runtime_filter.go b/pkg/sql/plan/runtime_filter.go index fdfddfc0e5266..7c46884d336ae 100644 --- a/pkg/sql/plan/runtime_filter.go +++ b/pkg/sql/plan/runtime_filter.go @@ -163,7 +163,11 @@ func (builder *QueryBuilder) generateRuntimeFilters(nodeID int32) { sortOrder := GetSortOrder(tableDef, probeCol.ColPos) if node.JoinType != plan.Node_INDEX { probeNdv := getExprNdv(probeExprs[0], builder) - if probeNdv == -1 || node.Stats.HashmapStats.HashmapSize/probeNdv >= 0.1 { + if probeNdv <= 1 { + //maybe not flushed yet, set at least 100 to continue calculation + probeNdv = 100 + } + if node.Stats.HashmapStats.HashmapSize/probeNdv >= 0.1 { return } if sortOrder != 0 {