executor: support building stats for fast analyze. #10258

Merged
merged 16 commits into from Apr 28, 2019
Changes from 13 commits
71 changes: 56 additions & 15 deletions executor/analyze.go
@@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/parser/model"
@@ -160,8 +161,8 @@ var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- analyzeResult, isCloseChanThread bool) {
defer func() {
e.wg.Done()
e.wg.Wait()
if isCloseChanThread {
e.wg.Wait()
close(resultCh)
}
if r := recover(); r != nil {
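For context on the e.wg.Wait() move: every worker signals completion with Done, but only the goroutine flagged as the channel closer now waits for the whole group before closing resultCh, so no straggling worker can send on a closed channel. A minimal self-contained sketch of that pattern (simplified names, not the TiDB types):

package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup, resultCh chan<- int, isCloser bool) {
	defer func() {
		wg.Done()
		if isCloser {
			wg.Wait()       // every worker has called Done; no sender remains
			close(resultCh) // safe: the close can no longer race with a send
		}
	}()
	resultCh <- id * id // stand-in for an analyzeResult
}

func main() {
	const n = 4
	var wg sync.WaitGroup
	wg.Add(n)
	resultCh := make(chan int, n) // buffered so sends never block the closer
	for i := 0; i < n; i++ {
		go worker(i, &wg, resultCh, i == n-1) // one designated closer
	}
	for r := range resultCh { // exits once the closer closes resultCh
		fmt.Println(r)
	}
}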
@@ -190,6 +191,8 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
task.job.Start()
resultCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
task.fastExec.job = task.job
task.job.Start()
for _, result := range analyzeFastExec(task.fastExec) {
resultCh <- result
}
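The job is attached and started before the results stream out, so the success path here and the error return in analyzeFastExec (next hunk) report against the same job. A compact sketch of that error-tagging idiom, with hypothetical stand-in types:

// Hypothetical types; the point is that the error result still carries the
// job, so whatever drains resultCh can finish or fail the right job.
type analyzeJob struct{ info string }

type analyzeResult struct {
	err error
	job *analyzeJob
}

func runFast(job *analyzeJob, build func() error) []analyzeResult {
	if err := build(); err != nil {
		return []analyzeResult{{err: err, job: job}} // error path keeps the job
	}
	return []analyzeResult{{job: job}}
}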
@@ -507,7 +510,7 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
hists, cms, err := exec.buildStats()
if err != nil {
return []analyzeResult{{Err: err}}
return []analyzeResult{{Err: err, job: exec.job}}
}
var results []analyzeResult
hasPKInfo := 0
@@ -522,6 +525,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Cms: []*statistics.CMSketch{cms[i]},
IsIndex: 1,
Count: hists[i].NullCount,
job: exec.job,
}
if hists[i].Len() > 0 {
idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
@@ -535,6 +539,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: hists[:hasPKInfo+len(exec.colsInfo)],
Cms: cms[:hasPKInfo+len(exec.colsInfo)],
Count: hist.NullCount,
job: exec.job,
}
if hist.Len() > 0 {
colResult.Count += hist.Buckets[hist.Len()-1].Count
@@ -560,6 +565,7 @@ type AnalyzeFastExec struct {
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
tblInfo *model.TableInfo
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
@@ -569,6 +575,7 @@ type AnalyzeFastExec struct {
scanTasks []*tikv.KeyLocation
collectors []*statistics.SampleCollector
randSeed int64
job *statistics.AnalyzeJob
}

func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild *bool, err *error, sampTasks *[]*AnalyzeFastTask) {
@@ -627,12 +634,12 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
}

// buildSampTask return tow variable, the first bool is whether the task meeting region error
// buildSampTask returns two variables, the first bool is whether the task meets region error
// and need to rebuild.
func (e *AnalyzeFastExec) buildSampTask() (needRebuild bool, err error) {
// Do get regions row count.
bo := tikv.NewBackoffer(context.Background(), 500)
atomic.StoreUint64(&e.rowCount, 0)
e.rowCount = 0
needRebuildForRoutine := make([]bool, e.concurrency)
errs := make([]error, e.concurrency)
sampTasksForRoutine := make([][]*AnalyzeFastTask, e.concurrency)
@@ -721,6 +728,11 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if err != nil {
return err
}
var rowID int64
rowID, err = tablecodec.DecodeRowKey(sKey)
if err != nil {
return err
}
// Update the primary key collector.
if hasPKInfo > 0 {
v, ok := values[e.pkInfo.ID]
@@ -735,7 +747,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[0].Samples[samplePos] == nil {
e.collectors[0].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[0].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[0].Samples[samplePos].RowID = rowID
e.collectors[0].Samples[samplePos].Value = v
}
// Update the columns' collectors.
@@ -747,7 +759,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[hasPKInfo+j].Samples[samplePos] == nil {
e.collectors[hasPKInfo+j].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[hasPKInfo+j].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[hasPKInfo+j].Samples[samplePos].RowID = rowID
e.collectors[hasPKInfo+j].Samples[samplePos].Value = v
}
// Update the indexes' collectors.
@@ -773,7 +785,7 @@ func (e *AnalyzeFastExec) updateCollectorSamples(sValue []byte, sKey kv.Key, sam
if e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos] == nil {
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos] = &statistics.SampleItem{}
}
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].Ordinal = int(samplePos)
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].RowID = rowID
e.collectors[len(e.colsInfo)+hasPKInfo+j].Samples[samplePos].Value = types.NewBytesDatum(bytes)
}
return nil
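The switch from Ordinal to RowID above gives each sample a stable identity taken from the key itself rather than a per-worker slot position; buildHist later sorts on it. A hedged round-trip sketch using the tablecodec package this file imports; EncodeRowKeyWithHandle is my assumption from that package, since only DecodeRowKey appears in this diff:

// Row keys encode t{tableID}_r{rowID}, so the sampled key alone is enough
// to recover a per-row identity (in-repo sketch, assumed encode helper).
key := tablecodec.EncodeRowKeyWithHandle(tableID, rowID)
got, err := tablecodec.DecodeRowKey(key)
if err != nil {
	return err
}
// got == rowID: the decoded handle is what updateCollectorSamples stores
// into each SampleItem.RowID.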
@@ -844,9 +856,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error {
}

func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *error) {
defer func() {
e.wg.Done()
}()
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
@@ -869,9 +879,6 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
if *err != nil {
return
}
if maxRowID <= minRowID {
continue
}

keys := make([]kv.Key, 0, task.SampSize)
for i := 0; i < int(task.SampSize); i++ {
@@ -897,8 +904,37 @@
}

func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollector, tp *types.FieldType) (*statistics.Histogram, error) {
// TODO: build histogram and cmsketch here for one collector.
return nil, nil
// build collector properties.
collector.Samples = collector.Samples[:e.sampCursor]
sort.Slice(collector.Samples, func(i, j int) bool { return collector.Samples[i].RowID < collector.Samples[j].RowID })
collector.CalcTotalSize()
data := make([][]byte, 0, len(collector.Samples))
for i, sample := range collector.Samples {
sample.Ordinal = i
if sample.Value.IsNull() {
collector.NullCount++
continue
}
bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, sample.Value)
if err != nil {
return nil, err
}
data = append(data, bytes)
}
stats := domain.GetDomain(e.ctx).StatsHandle()
rowCount := int64(e.rowCount)
if stats.Lease > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
}
// build CMSketch
var ndv, scaleRatio uint64
collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
Contributor: Make 20 a constant variable?

Contributor Author: Yeah, we keep it as a constant for now and will modify it in the future.

// build Histogram
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.maxNumBuckets), ID, collector, tp, rowCount, int64(ndv), int64(scaleRatio))
if err != nil {
return nil, err
}
return hist, nil
}
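Read on its own, the sample pass at the top of buildHist does four things: truncate to the fill cursor, sort by RowID so the assigned ordinals follow table order, route NULLs to the null count, and encode the rest for the CMSketch. A self-contained sketch with simplified types:

import "sort"

type sample struct {
	rowID  int64
	isNull bool
	val    []byte
	ord    int
}

// prepare mirrors buildHist's sample pass with simplified types.
func prepare(samples []sample, cursor int) (data [][]byte, nullCount int64) {
	samples = samples[:cursor] // keep only slots the samplers filled
	sort.Slice(samples, func(i, j int) bool { return samples[i].rowID < samples[j].rowID })
	for i := range samples {
		samples[i].ord = i // ordinals now follow physical row order
		if samples[i].isNull {
			nullCount++ // NULLs feed the null count, not the buckets
			continue
		}
		data = append(data, samples[i].val) // encoded values feed the sketch
	}
	return data, nullCount
}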

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, error) {
@@ -974,6 +1010,8 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st
return nil, nil, errors.Errorf(errMsg, maxBuildTimes)
}

defer e.job.Update(int64(e.rowCount))

// If total row count of the table is smaller than 2*MaxSampleSize, we
// translate all the sample tasks to scan tasks.
if e.rowCount < uint64(MaxSampleSize)*2 {
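The 2×MaxSampleSize cutoff marks where sampling stops paying off: below it a full scan reads barely more rows than the sampler would, and yields exact statistics. The decision in isolation (MaxSampleSize is the executor package variable the test below pins to 1000):

// Illustration of the threshold only; not the full buildStats flow.
func shouldScanAll(rowCount uint64, maxSampleSize int64) bool {
	return rowCount < uint64(maxSampleSize)*2
}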
@@ -1009,6 +1047,7 @@ type AnalyzeTestFastExec struct {
IdxsInfo []*model.IndexInfo
Concurrency int
Collectors []*statistics.SampleCollector
TblInfo *model.TableInfo
}

// TestFastSample only tests the fast sample in unit test.
@@ -1020,6 +1059,8 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {
e.concurrency = e.Concurrency
e.physicalTableID = e.PhysicalTableID
e.wg = &sync.WaitGroup{}
e.job = &statistics.AnalyzeJob{}
e.tblInfo = e.TblInfo
_, _, err := e.buildStats()
e.Collectors = e.collectors
return err
85 changes: 84 additions & 1 deletion executor/analyze_test.go
@@ -183,6 +183,7 @@ func (s *testSuite1) TestAnalyzeFastSample(c *C) {
IdxsInfo: indicesInfo,
Concurrency: 1,
PhysicalTableID: tbl.(table.PhysicalTable).GetPhysicalID(),
TblInfo: tblInfo,
}
err = mockExec.TestFastSample()
c.Assert(err, IsNil)
@@ -197,5 +198,87 @@
vals[i] = append(vals[i], s)
}
}
c.Assert(fmt.Sprintln(vals), Equals, "[[0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54] [0 34 35 57 4 24 6 25 58 9 10 11 12 30 14 52 29 17 44 54]]\n")
c.Assert(fmt.Sprintln(vals), Equals, "[[0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58] [0 4 6 9 10 11 12 14 17 24 25 29 30 34 35 44 52 54 57 58]]\n")
}
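The golden string changes because buildHist now sorts samples by RowID before assigning ordinals; with an integer primary key the row ID is the inserted value, so the collected samples print in ascending order. In miniature:

// Same sample set as before, now ordered by row ID before printing.
ids := []int64{34, 35, 57, 4, 24, 6, 0}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
// ids: [0 4 6 24 34 35 57], matching the sorted expectation above.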

func (s *testSuite1) TestFastAnalyze(c *C) {
cluster := mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(cluster)
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(cluster),
)
c.Assert(err, IsNil)
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
executor.MaxSampleSize = 1000
executor.RandSeed = 123

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, index index_b(b))")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
for i := 0; i < 3000; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i))
}
tblInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID

// construct 5 regions split by {600, 1200, 1800, 2400}
splitKeys := generateTableSplitKeyForInt(tid, []int{600, 1200, 1800, 2400})
manipulateCluster(cluster, splitKeys)

tk.MustExec("analyze table t with 5 buckets")

is := executor.GetInfoSchema(tk.Se.(sessionctx.Context))
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := table.Meta()
tbl := dom.StatsHandle().GetTableStats(tableInfo)
sTbl := fmt.Sprintln(tbl)
matched := false
if sTbl == "Table:37 Count:3001\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" ||
sTbl == "Table:37 Count:3001\n"+
"column:2 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"column:1 ndv:3000 totColSize:0\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n"+
"index:1 ndv:3000\n"+
"num: 603 lower_bound: 6 upper_bound: 612 repeats: 1\n"+
"num: 603 lower_bound: 621 upper_bound: 1205 repeats: 1\n"+
"num: 603 lower_bound: 1207 upper_bound: 1830 repeats: 1\n"+
"num: 603 lower_bound: 1831 upper_bound: 2387 repeats: 1\n"+
"num: 588 lower_bound: 2390 upper_bound: 2997 repeats: 1\n" {
matched = true
}
c.Assert(matched, Equals, true)
}
4 changes: 4 additions & 0 deletions executor/builder.go
@@ -1435,9 +1435,11 @@ func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercor
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "fast analyze columns"},
})
}
}
@@ -1464,9 +1466,11 @@ func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore
physicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
tblInfo: task.TblInfo,
concurrency: concurrency,
wg: &sync.WaitGroup{},
},
job: &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: "fast analyze index " + task.IndexInfo.Name.O},
})
}
}
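Pieced together across this diff, the job wiring follows one lifecycle: the builder creates the AnalyzeJob, the worker attaches and starts it, buildStats records progress on exit, and every result carries it back. An in-repo sketch assembled only from lines shown in these hunks:

// Lifecycle of one fast-analyze job, assembled from this diff.
job := &statistics.AnalyzeJob{
	DBName:    task.DBName,
	TableName: task.TableName,
	JobInfo:   "fast analyze columns",
}
task.fastExec.job = job // builder hands the job to the executor
job.Start()             // worker marks it running before building stats
// inside buildStats: defer e.job.Update(int64(e.rowCount))
// and every analyzeResult{..., job: exec.job} reports to the same job.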
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
@@ -427,20 +427,20 @@ type analyzeInfo struct {
PartitionName string
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
}

// AnalyzeColumnsTask is used for analyze columns.
type AnalyzeColumnsTask struct {
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
TblInfo *model.TableInfo
analyzeInfo
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
IndexInfo *model.IndexInfo
TblInfo *model.TableInfo
analyzeInfo
}

6 changes: 4 additions & 2 deletions planner/core/planbuilder.go
@@ -812,6 +812,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
IndexInfo: idx,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
@@ -822,6 +823,7 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
PKInfo: pkInfo,
ColsInfo: colInfo,
analyzeInfo: info,
TblInfo: tbl.TableInfo,
})
}
}
@@ -843,7 +845,7 @@ func (b *PlanBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) (Plan, error)
}
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
return p, nil
@@ -860,7 +862,7 @@ func (b *PlanBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) (Plan, erro
if idx.State == model.StatePublic {
for i, id := range physicalIDs {
info := analyzeInfo{DBName: as.TableNames[0].Schema.O, TableName: as.TableNames[0].Name.O, PartitionName: names[i], PhysicalTableID: id}
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{IndexInfo: idx, analyzeInfo: info, TblInfo: tblInfo})
}
}
}