Skip to content

Commit

Permalink
statistics, executor: refactor statistics on columns (#21817)
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate authored Dec 24, 2020
1 parent d337bf4 commit 092a01a
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 55 deletions.
65 changes: 51 additions & 14 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
switch task.taskType {
case colTask:
task.colExec.job = task.job
resultCh <- analyzeColumnsPushdown(task.colExec)
for _, result := range analyzeColumnsPushdown(task.colExec) {
resultCh <- result
}
case idxTask:
task.idxExec.job = task.job
resultCh <- analyzeIndexPushdown(task.idxExec)
Expand Down Expand Up @@ -400,7 +402,7 @@ func (e *AnalyzeIndexExec) buildStats(ranges []*ranger.Range, considerNull bool)
return hist, cms, topN, nil
}

func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
var ranges []*ranger.Range
if hc := colExec.handleCols; hc != nil {
if hc.IsInt() {
Expand All @@ -413,7 +415,31 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
}
hists, cms, topNs, extStats, err := colExec.buildStats(ranges, true)
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
return []analyzeResult{{Err: err, job: colExec.job}}
}

if hasPkHist(colExec.handleCols) {
PKresult := analyzeResult{
TableID: colExec.tableID,
Hist: hists[:1],
Cms: cms[:1],
TopNs: topNs[:1],
ExtStats: nil,
job: nil,
StatsVer: statistics.Version1,
}
PKresult.Count = int64(PKresult.Hist[0].TotalRowCount())
restResult := analyzeResult{
TableID: colExec.tableID,
Hist: hists[1:],
Cms: cms[1:],
TopNs: topNs[1:],
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.analyzeVer,
}
restResult.Count = PKresult.Count
return []analyzeResult{PKresult, restResult}
}
result := analyzeResult{
TableID: colExec.tableID,
Expand All @@ -422,14 +448,13 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) analyzeResult {
TopNs: topNs,
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.ctx.GetSessionVars().AnalyzeVersion,
StatsVer: colExec.analyzeVer,
}
hist := hists[0]
result.Count = hist.NullCount
if hist.Len() > 0 {
result.Count += hist.Buckets[hist.Len()-1].Count
result.Count = int64(result.Hist[0].TotalRowCount())
if result.StatsVer == statistics.Version2 {
result.Count += int64(topNs[0].TotalCount())
}
return result
return []analyzeResult{result}
}

// AnalyzeColumnsExec represents Analyze columns push down executor.
Expand All @@ -444,6 +469,7 @@ type AnalyzeColumnsExec struct {
resultHandler *tableResultHandler
opts map[ast.AnalyzeOptionType]uint64
job *statistics.AnalyzeJob
analyzeVer int
}

func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
Expand Down Expand Up @@ -555,11 +581,14 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
topNs = append(topNs, nil)
}
for i, col := range e.colsInfo {
err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
if e.analyzeVer < 2 {
// In analyze version 2, we don't collect TopN this way. We will collect TopN from samples in `BuildColumnHistAndTopN()` below.
err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
}
topNs = append(topNs, collectors[i].TopN)
}
topNs = append(topNs, collectors[i].TopN)
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
Expand All @@ -572,7 +601,15 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
collectors[i].Samples[j].Value.SetBytes(collectors[i].Samples[j].Value.GetBytes())
}
}
hg, err := statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
var hg *statistics.Histogram
var err error
var topn *statistics.TopN
if e.analyzeVer < 2 {
hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
} else {
hg, topn, err = statistics.BuildColumnHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType)
topNs = append(topNs, topn)
}
if err != nil {
return nil, nil, nil, nil, err
}
Expand Down
11 changes: 10 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (s *testSuite1) TestExtractTopN(c *C) {
tblInfo := table.Meta()
tblStats := s.dom.StatsHandle().GetTableStats(tblInfo)
colStats := tblStats.Columns[tblInfo.Columns[1].ID]
c.Assert(len(colStats.TopN.TopN), Equals, 1)
c.Assert(len(colStats.TopN.TopN), Equals, 10)
item := colStats.TopN.TopN[0]
c.Assert(item.Count, Equals, uint64(11))
idxStats := tblStats.Indices[tblInfo.Indices[0].ID]
Expand All @@ -696,6 +696,15 @@ func (s *testSuite1) TestExtractTopN(c *C) {
c.Assert(idxItem.Count, Equals, uint64(11))
// The columns are: DBName, table name, column name, is index, value, count.
tk.MustQuery("show stats_topn").Sort().Check(testkit.Rows("test t b 0 0 11",
"test t b 0 1 1",
"test t b 0 2 1",
"test t b 0 3 1",
"test t b 0 4 1",
"test t b 0 5 1",
"test t b 0 6 1",
"test t b 0 7 1",
"test t b 0 8 1",
"test t b 0 9 1",
"test t index_b 1 0 11",
"test t index_b 1 1 1",
"test t index_b 1 2 1",
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,7 +2020,8 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
Flags: sc.PushDownFlags(),
TimeZoneOffset: offset,
},
opts: opts,
opts: opts,
analyzeVer: b.ctx.GetSessionVars().AnalyzeVersion,
}
depth := int32(opts[ast.AnalyzeOptCMSketchDepth])
width := int32(opts[ast.AnalyzeOptCMSketchWidth])
Expand Down
12 changes: 12 additions & 0 deletions statistics/analyze_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func GetAllAnalyzeJobs() []*AnalyzeJob {

// Start marks status of the analyze job as running and update the start time.
func (job *AnalyzeJob) Start() {
if job == nil {
return
}
job.Mutex.Lock()
job.State = running
now := time.Now()
Expand All @@ -102,6 +105,9 @@ func (job *AnalyzeJob) Start() {

// Update updates the row count of analyze job.
func (job *AnalyzeJob) Update(rowCount int64) {
if job == nil {
return
}
job.Mutex.Lock()
job.RowCount += rowCount
job.updateTime = time.Now()
Expand All @@ -110,6 +116,9 @@ func (job *AnalyzeJob) Update(rowCount int64) {

// Finish update the status of analyze job to finished or failed according to `meetError`.
func (job *AnalyzeJob) Finish(meetError bool) {
if job == nil {
return
}
job.Mutex.Lock()
if meetError {
job.State = failed
Expand All @@ -121,6 +130,9 @@ func (job *AnalyzeJob) Finish(meetError bool) {
}

func (job *AnalyzeJob) getUpdateTime() time.Time {
if job == nil {
return time.Time{}
}
job.Mutex.Lock()
defer job.Mutex.Unlock()
return job.updateTime
Expand Down
Loading

0 comments on commit 092a01a

Please sign in to comment.