Skip to content

Commit

Permalink
stats: refine the behavior when updating stats delta (pingcap#5647)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and Haibin Xie committed Feb 5, 2018
1 parent 2f8b1d2 commit 5b3402d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
5 changes: 4 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,10 @@ func (do *Domain) updateStatsWorker(ctx context.Context, lease time.Duration) {
}
}
case <-deltaUpdateTicker.C:
statsHandle.DumpStatsDeltaToKV()
err := statsHandle.DumpStatsDeltaToKV()
if err != nil {
log.Error("[stats] dump stats delta fail: ", errors.ErrorStack(err))
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions plan/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (s *testAnalyzeSuite) TestOutdatedAnalyze(c *C) {
testKit.MustExec(fmt.Sprintf("insert into t values (%d,%d)", i, i))
}
h := dom.StatsHandle()
h.HandleDDLEvent(<-h.DDLEventCh())
h.DumpStatsDeltaToKV()
testKit.MustExec("analyze table t")
testKit.MustExec("insert into t select * from t")
Expand Down
32 changes: 18 additions & 14 deletions statistics/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,43 +102,47 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map. Then we dumps every table that held in map to KV.
func (h *Handle) DumpStatsDeltaToKV() {
func (h *Handle) DumpStatsDeltaToKV() error {
h.listHead.Lock()
for collector := h.listHead.next; collector != nil; collector = collector.next {
collector.tryToRemoveFromList()
h.globalMap.merge(collector)
}
h.listHead.Unlock()
for id, item := range h.globalMap {
err := h.dumpTableStatDeltaToKV(id, item)
if err == nil {
updated, err := h.dumpTableStatDeltaToKV(id, item)
if err != nil {
return errors.Trace(err)
}
if updated {
delete(h.globalMap, id)
} else {
log.Warnf("Error happens when updating stats table, the error message is %s.", err.Error())
}
}
return nil
}

// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version.
func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) error {
func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) (bool, error) {
if delta.Count == 0 {
return nil
return true, nil
}
_, err := h.ctx.(sqlexec.SQLExecutor).Execute("begin")
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
op := "+"
var sql string
if delta.Delta < 0 {
op = "-"
delta.Delta = -delta.Delta
sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.ctx.Txn().StartTS(), -delta.Delta, delta.Count, id, -delta.Delta)
} else {
sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), delta.Delta, delta.Count, id)
}
_, err = h.ctx.(sqlexec.SQLExecutor).Execute(fmt.Sprintf("update mysql.stats_meta set version = %d, count = count %s %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), op, delta.Delta, delta.Count, id))
_, err = h.ctx.(sqlexec.SQLExecutor).Execute(sql)
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}
updated := h.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0
_, err = h.ctx.(sqlexec.SQLExecutor).Execute("commit")
return errors.Trace(err)
return updated, errors.Trace(err)
}

const (
Expand Down
33 changes: 33 additions & 0 deletions statistics/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package statistics_test

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -305,3 +307,34 @@ func (s *testStatsUpdateSuite) TestAutoUpdate(c *C) {
c.Assert(hg.NDV, Equals, int64(1))
c.Assert(len(hg.Buckets), Equals, 1)
}

func (s *testStatsUpdateSuite) TestOutOfOrderUpdate(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
testKit.MustExec("create table t (a int, b int)")
testKit.MustExec("insert into t values (1,2)")

do := s.do
is := do.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := tbl.Meta()
h := do.StatsHandle()
h.HandleDDLEvent(<-h.DDLEventCh())

// Simulate the case that another tidb has inserted some value, but delta info has not been dumped to kv yet.
testKit.MustExec("insert into t values (2,2),(4,5)")
c.Assert(h.DumpStatsDeltaToKV(), IsNil)
testKit.MustExec(fmt.Sprintf("update mysql.stats_meta set count = 1 where table_id = %d", tableInfo.ID))

testKit.MustExec("delete from t")
c.Assert(h.DumpStatsDeltaToKV(), IsNil)
testKit.MustQuery("select count from mysql.stats_meta").Check(testkit.Rows("1"))

// Now another tidb has updated the delta info.
testKit.MustExec(fmt.Sprintf("update mysql.stats_meta set count = 3 where table_id = %d", tableInfo.ID))

c.Assert(h.DumpStatsDeltaToKV(), IsNil)
testKit.MustQuery("select count from mysql.stats_meta").Check(testkit.Rows("0"))
}

0 comments on commit 5b3402d

Please sign in to comment.