diff --git a/domain/domain.go b/domain/domain.go index 00ee2b0a89ff8..a2be79fac63f1 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -656,7 +656,10 @@ func (do *Domain) updateStatsWorker(ctx context.Context, owner owner.Manager) { } } case <-deltaUpdateTicker.C: - statsHandle.DumpStatsDeltaToKV() + err := statsHandle.DumpStatsDeltaToKV() + if err != nil { + log.Error("[stats] dump stats delta fail: ", errors.ErrorStack(err)) + } case <-loadHistogramTicker.C: err := statsHandle.LoadNeededHistograms() if err != nil { diff --git a/plan/cbo_test.go b/plan/cbo_test.go index 5c9ff9c7ab2d8..9fa0d1f0f46c5 100644 --- a/plan/cbo_test.go +++ b/plan/cbo_test.go @@ -79,6 +79,7 @@ func (s *testAnalyzeSuite) TestEstimation(c *C) { testKit.MustExec("insert into t select * from t") testKit.MustExec("insert into t select * from t") h := dom.StatsHandle() + h.HandleDDLEvent(<-h.DDLEventCh()) h.DumpStatsDeltaToKV() testKit.MustExec("analyze table t") for i := 1; i <= 8; i++ { @@ -377,6 +378,7 @@ func (s *testAnalyzeSuite) TestOutdatedAnalyze(c *C) { testKit.MustExec(fmt.Sprintf("insert into t values (%d,%d)", i, i)) } h := dom.StatsHandle() + h.HandleDDLEvent(<-h.DDLEventCh()) h.DumpStatsDeltaToKV() testKit.MustExec("analyze table t") testKit.MustExec("insert into t select * from t") diff --git a/statistics/update.go b/statistics/update.go index 3c80bb5ca3a8a..b5775c37901f5 100644 --- a/statistics/update.go +++ b/statistics/update.go @@ -132,7 +132,7 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { } // DumpStatsDeltaToKV sweeps the whole list and updates the global map. Then we dumps every table that held in map to KV. -func (h *Handle) DumpStatsDeltaToKV() { +func (h *Handle) DumpStatsDeltaToKV() error { h.listHead.Lock() for collector := h.listHead.next; collector != nil; collector = collector.next { collector.tryToRemoveFromList() @@ -140,36 +140,40 @@ func (h *Handle) DumpStatsDeltaToKV() { } h.listHead.Unlock() for id, item := range h.globalMap { - err := h.dumpTableStatDeltaToKV(id, item) - if err == nil { + updated, err := h.dumpTableStatDeltaToKV(id, item) + if err != nil { + return errors.Trace(err) + } + if updated { delete(h.globalMap, id) - } else { - log.Warnf("Error happens when updating stats table, the error message is %s.", err.Error()) } } + return nil } // dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version. -func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) error { +func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) (bool, error) { if delta.Count == 0 { - return nil + return true, nil } goCtx := goctx.TODO() _, err := h.ctx.(sqlexec.SQLExecutor).Execute(goCtx, "begin") if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } - op := "+" + var sql string if delta.Delta < 0 { - op = "-" - delta.Delta = -delta.Delta + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.ctx.Txn().StartTS(), -delta.Delta, delta.Count, id, -delta.Delta) + } else { + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), delta.Delta, delta.Count, id) } - _, err = h.ctx.(sqlexec.SQLExecutor).Execute(goCtx, fmt.Sprintf("update mysql.stats_meta set version = %d, count = count %s %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), op, delta.Delta, delta.Count, id)) + _, err = h.ctx.(sqlexec.SQLExecutor).Execute(goCtx, sql) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } + updated := h.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 _, err = h.ctx.(sqlexec.SQLExecutor).Execute(goCtx, "commit") - return errors.Trace(err) + return updated, errors.Trace(err) } // QueryFeedback is used to represent the query feedback info. It contains the expected scan row count and diff --git a/statistics/update_test.go b/statistics/update_test.go index 474ce2f910689..f00ea08a9daa6 100644 --- a/statistics/update_test.go +++ b/statistics/update_test.go @@ -14,6 +14,7 @@ package statistics_test import ( + "fmt" "time" . "github.com/pingcap/check" @@ -364,3 +365,34 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) { feedback := h.GetQueryFeedback() c.Assert(len(feedback), Equals, 0) } + +func (s *testStatsUpdateSuite) TestOutOfOrderUpdate(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + testKit.MustExec("use test") + testKit.MustExec("create table t (a int, b int)") + testKit.MustExec("insert into t values (1,2)") + + do := s.do + is := do.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tableInfo := tbl.Meta() + h := do.StatsHandle() + h.HandleDDLEvent(<-h.DDLEventCh()) + + // Simulate the case that another tidb has inserted some value, but delta info has not been dumped to kv yet. + testKit.MustExec("insert into t values (2,2),(4,5)") + c.Assert(h.DumpStatsDeltaToKV(), IsNil) + testKit.MustExec(fmt.Sprintf("update mysql.stats_meta set count = 1 where table_id = %d", tableInfo.ID)) + + testKit.MustExec("delete from t") + c.Assert(h.DumpStatsDeltaToKV(), IsNil) + testKit.MustQuery("select count from mysql.stats_meta").Check(testkit.Rows("1")) + + // Now another tidb has updated the delta info. + testKit.MustExec(fmt.Sprintf("update mysql.stats_meta set count = 3 where table_id = %d", tableInfo.ID)) + + c.Assert(h.DumpStatsDeltaToKV(), IsNil) + testKit.MustQuery("select count from mysql.stats_meta").Check(testkit.Rows("0")) +}