planner, executor: support fast analyze in planner and executor's builder. (#10040)
lzmhhh123 authored and zz-jason committed Apr 10, 2019
1 parent f778a0c commit 5b469e0
Showing 5 changed files with 189 additions and 10 deletions.
executor/analyze.go (80 additions, 0 deletions)
@@ -17,6 +17,7 @@ import (
	"context"
	"runtime"
	"strconv"
	"sync"

	"github.com/pingcap/errors"
	"github.com/pingcap/parser/model"
@@ -27,6 +28,8 @@ import (
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/logutil"
@@ -108,12 +111,14 @@ type taskType int

const (
	colTask taskType = iota
	idxTask
	fastTask
)

// Exactly one of the exec fields below is set, matching taskType.
type analyzeTask struct {
	taskType taskType
	idxExec  *AnalyzeIndexExec
	colExec  *AnalyzeColumnsExec
	fastExec *AnalyzeFastExec
}

var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
@@ -137,6 +142,10 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
			resultCh <- analyzeColumnsPushdown(task.colExec)
		case idxTask:
			resultCh <- analyzeIndexPushdown(task.idxExec)
		case fastTask:
			// A fast task fans out into several results: one per analyzed
			// index plus a combined result for the primary key and columns.
			for _, result := range analyzeFastExec(task.fastExec) {
				resultCh <- result
			}
		}
	}
}
@@ -435,3 +444,74 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
	}
	return hists, cms, nil
}

func analyzeFastExec(exec *AnalyzeFastExec) []statistics.AnalyzeResult {
	hists, cms, err := exec.buildStats()
	if err != nil {
		return []statistics.AnalyzeResult{{Err: err}}
	}
	var results []statistics.AnalyzeResult
	hasIdxInfo := len(exec.idxsInfo)
	hasPKInfo := 0
	if exec.pkInfo != nil {
		hasPKInfo = 1
	}
	if hasIdxInfo > 0 {
		for i := hasPKInfo + len(exec.colsInfo); i < len(hists); i++ {
			idxResult := statistics.AnalyzeResult{
				PhysicalTableID: exec.PhysicalTableID,
				Hist:            []*statistics.Histogram{hists[i]},
				Cms:             []*statistics.CMSketch{cms[i]},
				IsIndex:         1,
				Count:           hists[i].NullCount,
			}
			if hists[i].Len() > 0 {
				idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
			}
			results = append(results, idxResult)
		}
	}
	hist := hists[0]
	colResult := statistics.AnalyzeResult{
		PhysicalTableID: exec.PhysicalTableID,
		Hist:            hists[:hasPKInfo+len(exec.colsInfo)],
		Cms:             cms[:hasPKInfo+len(exec.colsInfo)],
		Count:           hist.NullCount,
	}
	if hist.Len() > 0 {
		colResult.Count += hist.Buckets[hist.Len()-1].Count
	}
	results = append(results, colResult)
	return results
}
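
buildStats returns the histograms in a fixed layout: an optional primary-key histogram first, then one histogram per column, then one per index. The loop above peels the index histograms off the tail of the slice (with IsIndex: 1), while the combined column result keeps the head. Both branches compute a result's Count the same way; a hypothetical helper (not part of this commit) makes the rule explicit:

// rowCount mirrors the counting logic above: total rows = NULL count plus
// the cumulative count stored in the histogram's last bucket. Illustrative
// helper only; the commit inlines this computation.
func rowCount(h *statistics.Histogram) int64 {
	count := h.NullCount
	if h.Len() > 0 {
		count += h.Buckets[h.Len()-1].Count
	}
	return count
}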

// AnalyzeFastTask is the task for building stats.
type AnalyzeFastTask struct {
	Location  *tikv.KeyLocation
	SampSize  uint64
	LRowCount uint64
	RRowCount uint64
}

// AnalyzeFastExec represents the fast analyze executor.
type AnalyzeFastExec struct {
	ctx             sessionctx.Context
	PhysicalTableID int64
	pkInfo          *model.ColumnInfo
	colsInfo        []*model.ColumnInfo
	idxsInfo        []*model.IndexInfo
	concurrency     int
	maxNumBuckets   uint64
	table           table.Table
	cache           *tikv.RegionCache
	wg              *sync.WaitGroup
	sampLocs        chan *tikv.KeyLocation
	sampLocRowCount uint64
	tasks           chan *AnalyzeFastTask
	scanTasks       []*tikv.KeyLocation
}

func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*statistics.CMSketch, err error) {
	// TODO: do fast analyze.
	return nil, nil, nil
}
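
buildStats is left as a TODO in this commit, but the struct fields above already suggest the intended shape: regions to sample flow through the sampLocs channel, a pool of concurrency workers turns them into AnalyzeFastTasks, and wg synchronizes shutdown. A minimal sketch of that producer/consumer pattern, under the assumption (none of this is in the commit) that a worker only wraps each region into a task:

// sketchSampleRegions is a hypothetical illustration of the concurrency
// pattern implied by the AnalyzeFastExec fields; the committed buildStats
// is still a stub.
func sketchSampleRegions(regions []*tikv.KeyLocation, concurrency int) []*AnalyzeFastTask {
	sampLocs := make(chan *tikv.KeyLocation, len(regions))
	taskCh := make(chan *AnalyzeFastTask, len(regions))
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for loc := range sampLocs {
				// A real worker would also estimate the region's row count
				// and sample size before queueing the task.
				taskCh <- &AnalyzeFastTask{Location: loc}
			}
		}()
	}
	for _, region := range regions {
		sampLocs <- region
	}
	close(sampLocs)
	wg.Wait()
	close(taskCh)
	tasks := make([]*AnalyzeFastTask, 0, len(regions))
	for task := range taskCh {
		tasks = append(tasks, task)
	}
	return tasks
}
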
executor/builder.go (76 additions, 8 deletions)
@@ -1390,25 +1390,93 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
	return e
}

func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercore.AnalyzeColumnsTask, maxNumBuckets uint64) {
	// Merge the columns into an existing fast-analyze task for the same
	// physical table, if any. With fast analyze enabled, every entry in
	// e.tasks is a fastTask, so fastExec is always non-nil here.
	findTask := false
	for _, eTask := range e.tasks {
		if eTask.fastExec.PhysicalTableID == task.PhysicalTableID {
			eTask.fastExec.colsInfo = append(eTask.fastExec.colsInfo, task.ColsInfo...)
			findTask = true
			break
		}
	}
	if !findTask {
		var concurrency int
		concurrency, b.err = getBuildStatsConcurrency(e.ctx)
		if b.err != nil {
			return
		}
		e.tasks = append(e.tasks, &analyzeTask{
			taskType: fastTask,
			fastExec: &AnalyzeFastExec{
				ctx:             b.ctx,
				PhysicalTableID: task.PhysicalTableID,
				colsInfo:        task.ColsInfo,
				pkInfo:          task.PKInfo,
				maxNumBuckets:   maxNumBuckets,
				table:           task.Table,
				concurrency:     concurrency,
			},
		})
	}
}

func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore.AnalyzeIndexTask, maxNumBuckets uint64) {
	// Likewise, append the index to an existing fast-analyze task for the
	// same physical table instead of spawning a second one.
	findTask := false
	for _, eTask := range e.tasks {
		if eTask.fastExec.PhysicalTableID == task.PhysicalTableID {
			eTask.fastExec.idxsInfo = append(eTask.fastExec.idxsInfo, task.IndexInfo)
			findTask = true
			break
		}
	}
	if !findTask {
		var concurrency int
		concurrency, b.err = getBuildStatsConcurrency(e.ctx)
		if b.err != nil {
			return
		}
		e.tasks = append(e.tasks, &analyzeTask{
			taskType: fastTask,
			fastExec: &AnalyzeFastExec{
				ctx:             b.ctx,
				PhysicalTableID: task.PhysicalTableID,
				idxsInfo:        []*model.IndexInfo{task.IndexInfo},
				maxNumBuckets:   maxNumBuckets,
				table:           task.Table,
				concurrency:     concurrency,
			},
		})
	}
}
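
Both fast builders key their merge on PhysicalTableID, so analyzing a partitioned table still yields one fast task per partition, while column and index analysis of the same physical table share a single AnalyzeFastExec. An illustration of the resulting invariant, with hypothetical table IDs (not from the commit):

// After building fast tasks where physical table 1 is analyzed for both
// columns and an index, and table 2 for columns only, e.tasks holds:
//
//	fastExec{PhysicalTableID: 1, colsInfo: [...], idxsInfo: [...]}
//	fastExec{PhysicalTableID: 2, colsInfo: [...]}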

func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
	e := &AnalyzeExec{
		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
		tasks:        make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
	}
	enableFastAnalyze := b.ctx.GetSessionVars().EnableFastAnalyze
	for _, task := range v.ColTasks {
		if enableFastAnalyze {
			b.buildAnalyzeFastColumn(e, task, v.MaxNumBuckets)
		} else {
			e.tasks = append(e.tasks, &analyzeTask{
				taskType: colTask,
				colExec:  b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
			})
		}
		if b.err != nil {
			return nil
		}
	}
	for _, task := range v.IdxTasks {
		if enableFastAnalyze {
			b.buildAnalyzeFastIndex(e, task, v.MaxNumBuckets)
		} else {
			e.tasks = append(e.tasks, &analyzeTask{
				taskType: idxTask,
				idxExec:  b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
			})
		}
		if b.err != nil {
			return nil
		}
planner/core/common_plans.go (2 additions, 0 deletions)
@@ -396,13 +396,15 @@ type AnalyzeColumnsTask struct {
	PhysicalTableID int64
	PKInfo          *model.ColumnInfo
	ColsInfo        []*model.ColumnInfo
	Table           table.Table
}

// AnalyzeIndexTask is used for analyzing an index.
type AnalyzeIndexTask struct {
	// PhysicalTableID is the id for a partition or a table.
	PhysicalTableID int64
	IndexInfo       *model.IndexInfo
	Table           table.Table
}

// Analyze represents an analyze plan
planner/core/planbuilder.go (20 additions, 2 deletions)
@@ -31,6 +31,7 @@ import (
	"github.com/pingcap/tidb/planner/property"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/types"
	driver "github.com/pingcap/tidb/types/parser_driver"
@@ -771,14 +772,27 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
		if err != nil {
			return nil, err
		}
		table, ok := b.is.TableByID(tbl.TableInfo.ID)
		if !ok {
			return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tbl.TableInfo.Name.O)
		}
		for _, idx := range idxInfo {
			for _, id := range physicalIDs {
				p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
					PhysicalTableID: id,
					IndexInfo:       idx,
					Table:           table,
				})
			}
		}
		if len(colInfo) > 0 || pkInfo != nil {
			for _, id := range physicalIDs {
				p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{
					PhysicalTableID: id,
					PKInfo:          pkInfo,
					ColsInfo:        colInfo,
					Table:           table,
				})
			}
		}
	}
@@ -827,6 +841,10 @@ const (
)

func (b *PlanBuilder) buildAnalyze(as *ast.AnalyzeTableStmt) (Plan, error) {
	// If fast analyze is enabled, the storage must be tikv.Storage.
	if _, isTikvStorage := b.ctx.GetStore().(tikv.Storage); !isTikvStorage && b.ctx.GetSessionVars().EnableFastAnalyze {
		return nil, errors.Errorf("fast analyze is only supported on tikv storage")
	}
	for _, tbl := range as.TableNames {
		user := b.ctx.GetSessionVars().User
		var insertErr, selectErr error
store/mockstore/mocktikv/rpc.go (11 additions, 0 deletions)
@@ -23,6 +23,7 @@ import (
	"github.com/golang/protobuf/proto"
	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/coprocessor"
	"github.com/pingcap/kvproto/pkg/debugpb"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
@@ -817,6 +818,16 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
			return resp, nil
		}
		resp.SplitRegion = handler.handleSplitRegion(r)
	// DebugGetRegionProperties is for fast analyze in mock tikv.
	case tikvrpc.CmdDebugGetRegionProperties:
		r := req.DebugGetRegionProperties
		region, _ := c.Cluster.GetRegionByID(r.RegionId)
		scanResp := handler.handleKvScan(&kvrpcpb.ScanRequest{StartKey: region.StartKey, EndKey: region.EndKey})
		resp.DebugGetRegionProperties = &debugpb.GetRegionPropertiesResponse{
			Props: []*debugpb.Property{{
				Name: "num_rows",
				// Format the count as a decimal string; string(int) would
				// yield a rune. (strconv is assumed imported in this file.)
				Value: strconv.Itoa(len(scanResp.Pairs)),
			}}}
	default:
		return nil, errors.Errorf("unsupport this request type %v", req.Type)
	}
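
The mock derives num_rows by scanning every pair in the region, which is exact but linear in region size, and the property value travels as a string. That is why the handler must format the count in decimal: in Go, string(3) converts a code point and produces "\x03", not "3". A hypothetical consumer on the fast-analyze side (not part of this commit) would parse it back along these lines:

// regionRowCount is an illustrative helper that reads the row count back
// out of the mock's response properties; assumes strconv, errors, and
// debugpb are imported.
func regionRowCount(props []*debugpb.Property) (uint64, error) {
	for _, p := range props {
		if p.Name == "num_rows" {
			return strconv.ParseUint(p.Value, 10, 64)
		}
	}
	return 0, errors.New("num_rows property not found")
}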