
Commit

Merge branch 'master' into extract-agg-prune
zz-jason authored Sep 27, 2018
2 parents 7d5dd00 + eb0c663 commit 554edf8
Showing 65 changed files with 848 additions and 223 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ profile.coverprofile
explain_test
cmd/explaintest/explain-test.out
_tools/
*.fail.go
2 changes: 2 additions & 0 deletions ddl/ddl_api.go
@@ -1008,6 +1008,8 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
tbInfo.Charset = op.StrValue
case ast.TableOptionCollate:
tbInfo.Collate = op.StrValue
case ast.TableOptionCompression:
tbInfo.Compression = op.StrValue
case ast.TableOptionShardRowID:
if hasAutoIncrementColumn(tbInfo) && op.UintValue != 0 {
return errUnsupportedShardRowIDBits
2 changes: 1 addition & 1 deletion ddl/reorg.go
@@ -282,7 +282,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.PhysicalTable) (maxR
}
defer terror.Call(result.Close)

- chk := chunk.NewChunkWithCapacity(getColumnsTypes(columns), 1)
+ chk := chunk.New(getColumnsTypes(columns), 1, 1)
err = result.Next(ctx, chk)
if err != nil {
return maxRowID, false, errors.Trace(err)
8 changes: 4 additions & 4 deletions distsql/distsql_test.go
@@ -70,7 +70,7 @@ func (s *testSuite) TestSelectNormal(c *C) {
response.Fetch(context.TODO())

// Test Next.
- chk := chunk.NewChunkWithCapacity(colTypes, 32)
+ chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
@@ -122,7 +122,7 @@ func (s *testSuite) TestSelectStreaming(c *C) {
response.Fetch(context.TODO())

// Test Next.
- chk := chunk.NewChunkWithCapacity(colTypes, 32)
+ chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
@@ -259,7 +259,7 @@ func BenchmarkReadRowsData(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
- chk := chunk.NewChunkWithCapacity(colTypes, numRows)
+ chk := chunk.New(colTypes, numRows, numRows)

buffer := populateBuffer()

@@ -277,7 +277,7 @@ func BenchmarkDecodeToChunk(b *testing.B) {
for i := 0; i < numCols; i++ {
colTypes[i] = &types.FieldType{Tp: mysql.TypeLonglong}
}
- chk := chunk.NewChunkWithCapacity(colTypes, numRows)
+ chk := chunk.New(colTypes, numRows, numRows)

for rowOrdinal := 0; rowOrdinal < numRows; rowOrdinal++ {
for colOrdinal := 0; colOrdinal < numCols; colOrdinal++ {
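Note on the chunk API change above (also in ddl/reorg.go): chunk.NewChunkWithCapacity(fieldTypes, cap) is replaced by chunk.New(fieldTypes, initCap, maxChunkSize), which takes both an initial row capacity and a maximum chunk size. A minimal sketch of the new call; the import paths follow the 2018-era layout used elsewhere in this commit, and the column type and sizes are illustrative assumptions:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

func main() {
	// One bigint column, mirroring the benchmark setup above.
	colTypes := []*types.FieldType{{Tp: mysql.TypeLonglong}}

	// chunk.New(fieldTypes, initCap, maxChunkSize): start with room for 32
	// rows, allow growth up to 1024 rows when the chunk is reused.
	chk := chunk.New(colTypes, 32, 1024)
	fmt.Println(chk.NumRows()) // 0
}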
28 changes: 24 additions & 4 deletions domain/domain.go
@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -330,14 +331,25 @@ func (do *Domain) Reload() error {
return nil
}

- // LogTopNSlowQuery keeps topN recent slow queries in domain.
- func (do *Domain) LogTopNSlowQuery(query *SlowQueryInfo) {
+ // LogSlowQuery keeps topN recent slow queries in domain.
+ func (do *Domain) LogSlowQuery(query *SlowQueryInfo) {
select {
case do.slowQuery.ch <- query:
default:
}
}

// ShowSlowQuery returns the slow queries.
func (do *Domain) ShowSlowQuery(showSlow *ast.ShowSlow) []*SlowQueryInfo {
msg := &showSlowMessage{
request: showSlow,
}
msg.Add(1)
do.slowQuery.msgCh <- msg
msg.Wait()
return msg.result
}

func (do *Domain) topNSlowQueryLoop() {
defer recoverInDomain("topNSlowQueryLoop", false)
defer do.wg.Done()
@@ -346,12 +358,20 @@ func (do *Domain) topNSlowQueryLoop() {
for {
select {
case now := <-ticker.C:
- do.slowQuery.Refresh(now)
+ do.slowQuery.RemoveExpired(now)
case info, ok := <-do.slowQuery.ch:
if !ok {
return
}
do.slowQuery.Append(info)
case msg := <-do.slowQuery.msgCh:
req := msg.request
if req.Tp == ast.ShowSlowTop {
msg.result = do.slowQuery.QueryTop(int(req.Count), req.Kind)
} else if req.Tp == ast.ShowSlowRecent {
msg.result = do.slowQuery.QueryRecent(int(req.Count))
}
msg.Done()
}
}
}
@@ -499,7 +519,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
- slowQuery: newTopNSlowQueries(30, time.Hour*24*7),
+ slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
}
}

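The ShowSlowQuery/topNSlowQueryLoop pair above uses a synchronous request/response handshake over a channel: the caller embeds a sync.WaitGroup in the message, sends it on msgCh, and blocks in Wait until the loop goroutine fills in the result and calls Done. A stripped-down sketch of that pattern; the names message and msgCh here are placeholders, not code from this commit:

package main

import (
	"fmt"
	"sync"
)

// message mimics showSlowMessage: a request that carries its own WaitGroup
// so the sender can block until a worker fills in the result.
type message struct {
	result string
	sync.WaitGroup
}

func main() {
	msgCh := make(chan *message, 10)

	// Worker goroutine, playing the role of topNSlowQueryLoop.
	go func() {
		for msg := range msgCh {
			msg.result = "slow query list"
			msg.Done()
		}
	}()

	msg := &message{}
	msg.Add(1)
	msgCh <- msg
	msg.Wait() // blocks until the worker calls Done
	fmt.Println(msg.result)
}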
27 changes: 27 additions & 0 deletions domain/domain_test.go
@@ -100,6 +100,33 @@ func (*testSuite) TestT(c *C) {
succ = dom.SchemaValidator.Check(ts, schemaVer, nil)
c.Assert(succ, Equals, ResultSucc)

// For slow query.
dom.LogSlowQuery(&SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})
dom.LogSlowQuery(&SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
dom.LogSlowQuery(&SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
// Collecting slow queries is asynchronous, wait a while to ensure it's done.
time.Sleep(5 * time.Millisecond)

res := dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 2, Kind: ast.ShowSlowKindInternal})
c.Assert(res, HasLen, 1)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowTop, Count: 4, Kind: ast.ShowSlowKindAll})
c.Assert(res, HasLen, 3)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[2], Equals, SlowQueryInfo{SQL: "aaa", Duration: time.Second, Internal: true})

res = dom.ShowSlowQuery(&ast.ShowSlow{Tp: ast.ShowSlowRecent, Count: 2})
c.Assert(res, HasLen, 2)
c.Assert(*res[0], Equals, SlowQueryInfo{SQL: "ccc", Duration: 2 * time.Second})
c.Assert(*res[1], Equals, SlowQueryInfo{SQL: "bbb", Duration: 3 * time.Second})

err = store.Close()
c.Assert(err, IsNil)
}
107 changes: 98 additions & 9 deletions domain/topn_slow_query.go
@@ -15,8 +15,11 @@ package domain

import (
"container/heap"
"sort"
"sync"
"time"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/util/execdetails"
)

@@ -40,11 +43,11 @@ func (h *slowQueryHeap) Pop() interface{} {
return x
}

- func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) {
+ func (h *slowQueryHeap) RemoveExpired(now time.Time, period time.Duration) {
// Remove outdated slow query element.
idx := 0
for i := 0; i < len(h.data); i++ {
- outdateTime := h.data[i].Start.Add(recent)
+ outdateTime := h.data[i].Start.Add(period)
if outdateTime.After(now) {
h.data[idx] = h.data[i]
idx++
@@ -59,28 +62,79 @@ func (h *slowQueryHeap) Refresh(now time.Time, recent time.Duration) {
heap.Init(h)
}

func (h *slowQueryHeap) Query(count int) []*SlowQueryInfo {
// The sorted array still maintains the heap property.
sort.Sort(h)

// The result should be in descending order.
return takeLastN(h.data, count)
}

type slowQueryQueue struct {
data []*SlowQueryInfo
size int
}

func (q *slowQueryQueue) Enqueue(info *SlowQueryInfo) {
if len(q.data) < q.size {
q.data = append(q.data, info)
return
}

q.data = append(q.data, info)[1:]
return
}

func (q *slowQueryQueue) Query(count int) []*SlowQueryInfo {
// Queue is empty.
if len(q.data) == 0 {
return nil
}
return takeLastN(q.data, count)
}

func takeLastN(data []*SlowQueryInfo, count int) []*SlowQueryInfo {
if count > len(data) {
count = len(data)
}
ret := make([]*SlowQueryInfo, 0, count)
for i := len(data) - 1; i >= 0 && len(ret) < count; i-- {
ret = append(ret, data[i])
}
return ret
}

// topNSlowQueries maintains two heaps to store recent slow queries: one for user's and one for internal.
- // N = 30, recent = 7 days by default.
+ // N = 30, period = 7 days by default.
+ // It also maintains a recent queue, in a FIFO manner.
type topNSlowQueries struct {
+ recent slowQueryQueue
user slowQueryHeap
internal slowQueryHeap
topN int
- recent time.Duration
+ period time.Duration
ch chan *SlowQueryInfo
+ msgCh chan *showSlowMessage
}

- func newTopNSlowQueries(topN int, recent time.Duration) *topNSlowQueries {
+ func newTopNSlowQueries(topN int, period time.Duration, queueSize int) *topNSlowQueries {
ret := &topNSlowQueries{
topN: topN,
- recent: recent,
+ period: period,
ch: make(chan *SlowQueryInfo, 1000),
+ msgCh: make(chan *showSlowMessage, 10),
}
ret.user.data = make([]*SlowQueryInfo, 0, topN)
ret.internal.data = make([]*SlowQueryInfo, 0, topN)
+ ret.recent.size = queueSize
+ ret.recent.data = make([]*SlowQueryInfo, 0, queueSize)
return ret
}

func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
// Put into the recent queue.
q.recent.Enqueue(info)

var h *slowQueryHeap
if info.Internal {
h = &q.internal
@@ -101,9 +155,44 @@ func (q *topNSlowQueries) Append(info *SlowQueryInfo) {
}
}

- func (q *topNSlowQueries) Refresh(now time.Time) {
- q.user.Refresh(now, q.recent)
- q.internal.Refresh(now, q.recent)
+ func (q *topNSlowQueries) RemoveExpired(now time.Time) {
+ q.user.RemoveExpired(now, q.period)
+ q.internal.RemoveExpired(now, q.period)
}

type showSlowMessage struct {
request *ast.ShowSlow
result []*SlowQueryInfo
sync.WaitGroup
}

type queryType int

const (
queryTypeTop queryType = iota
queryTypeRecent
)

func (q *topNSlowQueries) QueryRecent(count int) []*SlowQueryInfo {
return q.recent.Query(count)
}

func (q *topNSlowQueries) QueryTop(count int, kind ast.ShowSlowKind) []*SlowQueryInfo {
var ret []*SlowQueryInfo
switch kind {
case ast.ShowSlowKindDefault:
ret = q.user.Query(count)
case ast.ShowSlowKindInternal:
ret = q.internal.Query(count)
case ast.ShowSlowKindAll:
tmp := make([]*SlowQueryInfo, 0, len(q.user.data)+len(q.internal.data))
tmp = append(tmp, q.user.data...)
tmp = append(tmp, q.internal.data...)
tmp1 := slowQueryHeap{tmp}
sort.Sort(&tmp1)
ret = takeLastN(tmp, count)
}
return ret
}

func (q *topNSlowQueries) Close() {
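The Enqueue method above keeps the recent queue at a fixed size with the append(q.data, info)[1:] idiom: once the queue is full, appending the new element and re-slicing from index 1 drops the oldest one. A self-contained illustration of that slicing behaviour; the strings and the size of 3 are arbitrary examples, not values from this commit:

package main

import "fmt"

func main() {
	const size = 3
	q := make([]string, 0, size)
	for _, s := range []string{"aaa", "bbb", "ccc", "ddd", "eee"} {
		if len(q) < size {
			// Still filling up: plain append.
			q = append(q, s)
			continue
		}
		// Full: append the newcomer, then drop the oldest element.
		q = append(q, s)[1:]
	}
	fmt.Println(q) // [ccc ddd eee]
}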
43 changes: 38 additions & 5 deletions domain/topn_slow_query_test.go
@@ -24,7 +24,7 @@ var _ = Suite(&testTopNSlowQuerySuite{})
type testTopNSlowQuerySuite struct{}

func (t *testTopNSlowQuerySuite) TestPush(c *C) {
- slowQuery := newTopNSlowQueries(10, 0)
+ slowQuery := newTopNSlowQueries(10, 0, 10)
// Insert data into the heap.
slowQuery.Append(&SlowQueryInfo{Duration: 300 * time.Millisecond})
slowQuery.Append(&SlowQueryInfo{Duration: 400 * time.Millisecond})
@@ -69,9 +69,9 @@ func (t *testTopNSlowQuerySuite) TestPush(c *C) {
c.Assert(slowQuery.user.data[0].Duration, Equals, 1300*time.Millisecond)
}

- func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
+ func (t *testTopNSlowQuerySuite) TestRemoveExpired(c *C) {
now := time.Now()
- slowQuery := newTopNSlowQueries(6, 3*time.Second)
+ slowQuery := newTopNSlowQueries(6, 3*time.Second, 10)

slowQuery.Append(&SlowQueryInfo{Start: now, Duration: 6})
slowQuery.Append(&SlowQueryInfo{Start: now.Add(1 * time.Second), Duration: 5})
@@ -80,7 +80,7 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
slowQuery.Append(&SlowQueryInfo{Start: now.Add(4 * time.Second), Duration: 2})
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

- slowQuery.Refresh(now.Add(5 * time.Second))
+ slowQuery.RemoveExpired(now.Add(5 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 2)
c.Assert(slowQuery.user.data[0].Duration, Equals, 2*time.Nanosecond)

@@ -91,7 +91,7 @@ func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
c.Assert(len(slowQuery.user.data), Equals, 6)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)

- slowQuery.Refresh(now.Add(6 * time.Second))
+ slowQuery.RemoveExpired(now.Add(6 * time.Second))
c.Assert(len(slowQuery.user.data), Equals, 4)
c.Assert(slowQuery.user.data[0].Duration, Equals, 0*time.Nanosecond)
}
@@ -108,3 +108,36 @@ func checkHeap(q *slowQueryHeap, c *C) {
}
}
}

func (t *testTopNSlowQuerySuite) TestQueue(c *C) {
q := newTopNSlowQueries(10, time.Minute, 5)
q.Append(&SlowQueryInfo{SQL: "aaa"})
q.Append(&SlowQueryInfo{SQL: "bbb"})
q.Append(&SlowQueryInfo{SQL: "ccc"})

query := q.recent.Query(1)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
query = q.recent.Query(2)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ccc"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "bbb"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "aaa"})

q.Append(&SlowQueryInfo{SQL: "ddd"})
q.Append(&SlowQueryInfo{SQL: "eee"})
q.Append(&SlowQueryInfo{SQL: "fff"})
q.Append(&SlowQueryInfo{SQL: "ggg"})

query = q.recent.Query(3)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
query = q.recent.Query(6)
c.Assert(*query[0], Equals, SlowQueryInfo{SQL: "ggg"})
c.Assert(*query[1], Equals, SlowQueryInfo{SQL: "fff"})
c.Assert(*query[2], Equals, SlowQueryInfo{SQL: "eee"})
c.Assert(*query[3], Equals, SlowQueryInfo{SQL: "ddd"})
c.Assert(*query[4], Equals, SlowQueryInfo{SQL: "ccc"})
}