Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan/statistics: concurrent build columns #2713

Merged
merged 14 commits into from
Mar 22, 2017
Merged
79 changes: 56 additions & 23 deletions plan/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"sort"
"strings"
"sync/atomic"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
Expand Down Expand Up @@ -394,14 +395,7 @@ func (t *Table) build4SortedColumn(sc *variable.StatementContext, offset int, re
Repeats: make([]int64, 1, bucketCount),
}
var valuesPerBucket, lastNumber, bucketIdx int64 = 1, 0, 0
knowCount := true
if t.Count < 0 {
t.Count = 0
knowCount = false
}
if knowCount {
valuesPerBucket = t.Count/bucketCount + 1
}
count := int64(0)
for {
row, err := records.Next()
if err != nil {
Expand All @@ -424,9 +418,7 @@ func (t *Table) build4SortedColumn(sc *variable.StatementContext, offset int, re
if err != nil {
return errors.Trace(err)
}
if !knowCount {
t.Count++
}
count++
if cmp == 0 {
// The new item has the same value as current bucket value, to ensure that
// a same value only stored in a single bucket, we do not increase bucketIdx even if it exceeds
Expand All @@ -441,10 +433,10 @@ func (t *Table) build4SortedColumn(sc *variable.StatementContext, offset int, re
col.NDV++
} else {
// All buckets are full, we should merge buckets.
if !knowCount && bucketIdx+1 == bucketCount {
if bucketIdx+1 == bucketCount {
col.mergeBuckets(bucketIdx)
valuesPerBucket *= 2
bucketIdx = (bucketIdx + 1) / 2
bucketIdx = bucketIdx / 2
if bucketIdx == 0 {
lastNumber = 0
} else {
Expand All @@ -466,6 +458,7 @@ func (t *Table) build4SortedColumn(sc *variable.StatementContext, offset int, re
col.NDV++
}
}
atomic.StoreInt64(&t.Count, count)
if isPK {
t.Columns[offset] = col
} else {
Expand Down Expand Up @@ -510,6 +503,48 @@ type Builder struct {
PkOffset int // PkOffset is the offset of primary key of integer type in the table.
}

// buildMultiColumns builds the statistics for every entry of offsets, one after
// another, and reports the outcome on done exactly once: the first error
// encountered (after which the remaining offsets are not built), or nil when
// all builds succeed.
//
// offsets is a sub-slice of a larger offset list; baseOffset is the position of
// offsets[0] within that list, so baseOffset+i selects the matching entry of
// b.IdxRecords (when isSorted is true) or b.ColumnSamples (when it is false).
func (b *Builder) buildMultiColumns(t *Table, offsets []int, baseOffset int, isSorted bool, done chan error) {
	// buildOne builds a single column/index from the input stored at pos.
	buildOne := func(pos, offset int) error {
		if isSorted {
			return t.build4SortedColumn(b.Sc, offset, b.IdxRecords[pos], b.NumBuckets, false)
		}
		return t.buildColumn(b.Sc, offset, b.ColumnSamples[pos], b.NumBuckets)
	}

	for i := range offsets {
		if err := buildOne(i+baseOffset, offsets[i]); err != nil {
			done <- err
			return
		}
	}
	done <- nil
}

// buildColumnConcurrency is the divisor splitAndConcurrentBuild uses to size
// its offset groups, which caps the number of build goroutines it starts per
// call at this value.
const buildColumnConcurrency = 8

func (b *Builder) splitAndConcurrentBuild(t *Table, offsets []int, isSorted bool) error {
offsetCnt := len(offsets)
groupSize := (offsetCnt + buildColumnConcurrency - 1) / buildColumnConcurrency
splittedOffsets := make([][]int, 0, buildColumnConcurrency)
for i := 0; i < offsetCnt; i += groupSize {
end := i + groupSize
if end > offsetCnt {
end = offsetCnt
}
splittedOffsets = append(splittedOffsets, offsets[i:end])
}
doneCh := make(chan error, len(splittedOffsets))
for i, offsets := range splittedOffsets {
go b.buildMultiColumns(t, offsets, i*groupSize, isSorted, doneCh)
}
for range splittedOffsets {
err := <-doneCh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an error happens, this function returns and leaves doneCh alone.
The worker goroutines are still waiting to write to doneCh and would block forever, which leaks goroutines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tiancaiamao The capacity of the channel is len(splittedOffsets), so sends to it will never block. PTAL

if err != nil {
return errors.Trace(err)
}
}
return nil
}

// NewTable creates a table statistics.
func (b *Builder) NewTable() (*Table, error) {
if b.Count == 0 {
Expand All @@ -522,23 +557,21 @@ func (b *Builder) NewTable() (*Table, error) {
Columns: make([]*Column, len(b.TblInfo.Columns)),
Indices: make([]*Column, len(b.TblInfo.Indices)),
}
for i, offset := range b.ColOffsets {
err := t.buildColumn(b.Sc, offset, b.ColumnSamples[i], b.NumBuckets)
if err != nil {
return nil, errors.Trace(err)
}
err := b.splitAndConcurrentBuild(t, b.ColOffsets, false)
if err != nil {
return nil, errors.Trace(err)
}
if b.PkOffset != -1 {
err := t.build4SortedColumn(b.Sc, b.PkOffset, b.PkRecords, b.NumBuckets, true)
if err != nil {
return nil, errors.Trace(err)
}
}
for i, offset := range b.IdxOffsets {
err := t.build4SortedColumn(b.Sc, offset, b.IdxRecords[i], b.NumBuckets, false)
if err != nil {
return nil, errors.Trace(err)
}
err = b.splitAndConcurrentBuild(t, b.IdxOffsets, true)
if err != nil {
return nil, errors.Trace(err)
}
for _, offset := range b.IdxOffsets {
if len(b.TblInfo.Indices[offset].Columns) == 1 {
for j, col := range b.TblInfo.Columns {
if col.Name.L == b.TblInfo.Indices[offset].Columns[0].Name.L && t.Columns[j] == nil {
Expand Down
8 changes: 4 additions & 4 deletions plan/statistics/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,21 @@ func (s *testStatisticsSuite) TestTable(c *C) {
c.Check(count, Equals, int64(1))
count, err = col.LessRowCount(sc, types.NewIntDatum(20000))
c.Check(err, IsNil)
c.Check(count, Equals, int64(19980))
c.Check(count, Equals, int64(19984))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the test result changed?

Copy link
Contributor Author

@alivxxx alivxxx Mar 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there was a bug in calculating the bucketIdx after merging buckets.

count, err = col.BetweenRowCount(sc, types.NewIntDatum(30000), types.NewIntDatum(35000))
c.Check(err, IsNil)
c.Check(count, Equals, int64(4696))
c.Check(count, Equals, int64(4618))

col = t.Columns[2]
count, err = col.EqualRowCount(sc, types.NewIntDatum(10000))
c.Check(err, IsNil)
c.Check(count, Equals, int64(1))
count, err = col.LessRowCount(sc, types.NewIntDatum(20000))
c.Check(err, IsNil)
c.Check(count, Equals, int64(20136))
c.Check(count, Equals, int64(20224))
count, err = col.BetweenRowCount(sc, types.NewIntDatum(30000), types.NewIntDatum(35000))
c.Check(err, IsNil)
c.Check(count, Equals, int64(5083))
c.Check(count, Equals, int64(5120))

str := t.String()
c.Check(len(str), Greater, 0)
Expand Down