From 5099a1bcb738aae8058e7907f3681d376ab1b474 Mon Sep 17 00:00:00 2001 From: xuyifan <675434007@qq.com> Date: Wed, 8 Dec 2021 21:37:31 +0800 Subject: [PATCH] add mutex for globalMap and feedback --- statistics/feedback.go | 19 +++++++ statistics/handle/handle.go | 29 +++++++--- statistics/handle/update.go | 82 +++++++++++++++------------ statistics/handle/update_list_test.go | 2 +- 4 files changed, 87 insertions(+), 45 deletions(-) diff --git a/statistics/feedback.go b/statistics/feedback.go index 88824afae0e08..12e986f95bff5 100644 --- a/statistics/feedback.go +++ b/statistics/feedback.go @@ -130,6 +130,25 @@ func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool return true } +// SiftFeedbacks eliminates feedbacks which are overlapped with others. It is a tradeoff between +// feedback accuracy and its overhead. +func (m *QueryFeedbackMap) SiftFeedbacks() { + sc := &stmtctx.StatementContext{TimeZone: time.UTC} + for k, qs := range m.Feedbacks { + fbs := make([]Feedback, 0, len(qs)*2) + for _, q := range qs { + fbs = append(fbs, q.Feedback...) + } + if len(fbs) == 0 { + delete(m.Feedbacks, k) + continue + } + m.Feedbacks[k] = m.Feedbacks[k][:1] + m.Feedbacks[k][0].Feedback, _ = NonOverlappedFeedbacks(sc, fbs) + } + m.Size = len(m.Feedbacks) +} + // Merge combines 2 collections of feedbacks. func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) { for k, qs := range r.Feedbacks { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 872cdc3daf16f..15d90c6ed5204 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -99,9 +99,15 @@ type Handle struct { // listHead contains all the stats collector required by session. listHead *SessionStatsCollector // globalMap contains all the delta map from collectors when we dump them to KV. - globalMap tableDeltaMap + globalMap struct { + sync.Mutex + data tableDeltaMap + } // feedback is used to store query feedback info. - feedback *statistics.QueryFeedbackMap + feedback struct { + sync.Mutex + data *statistics.QueryFeedbackMap + } lease atomic2.Duration @@ -159,6 +165,7 @@ func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, // Clear the statsCache, only for test. func (h *Handle) Clear() { + // TODO: Here h.mu seems to protect all the fields of Handle. Is is reasonable? h.mu.Lock() h.statsCache.Lock() h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) @@ -167,13 +174,17 @@ func (h *Handle) Clear() { for len(h.ddlEventCh) > 0 { <-h.ddlEventCh } - h.feedback = statistics.NewQueryFeedbackMap() + h.feedback.Lock() + h.feedback.data = statistics.NewQueryFeedbackMap() + h.feedback.Unlock() h.mu.ctx.GetSessionVars().InitChunkSize = 1 h.mu.ctx.GetSessionVars().MaxChunkSize = 1 h.mu.ctx.GetSessionVars().EnableChunkRPC = false h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0) h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)} - h.globalMap = make(tableDeltaMap) + h.globalMap.Lock() + h.globalMap.data = make(tableDeltaMap) + h.globalMap.Unlock() h.mu.rateMap = make(errorRateDeltaMap) h.mu.Unlock() } @@ -188,8 +199,6 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (* handle := &Handle{ ddlEventCh: make(chan *util.Event, 100), listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, - globalMap: make(tableDeltaMap), - feedback: statistics.NewQueryFeedbackMap(), idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)}, pool: pool, } @@ -199,6 +208,8 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (* handle.mu.ctx = ctx handle.mu.rateMap = make(errorRateDeltaMap) handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) + handle.globalMap.data = make(tableDeltaMap) + handle.feedback.data = statistics.NewQueryFeedbackMap() err := handle.RefreshVars() if err != nil { return nil, err @@ -218,10 +229,12 @@ func (h *Handle) SetLease(lease time.Duration) { // GetQueryFeedback gets the query feedback. It is only used in test. func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap { + h.feedback.Lock() defer func() { - h.feedback = statistics.NewQueryFeedbackMap() + h.feedback.data = statistics.NewQueryFeedbackMap() + h.feedback.Unlock() }() - return h.feedback + return h.feedback.data } // DurationToTS converts duration to timestamp. diff --git a/statistics/handle/update.go b/statistics/handle/update.go index e8672794a2509..e154755b5cc8d 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -64,6 +64,12 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i m[id] = item } +func (m tableDeltaMap) merge(deltaMap tableDeltaMap) { + for id, item := range deltaMap { + m.update(id, item.Delta, item.Count, &item.ColSize) + } +} + type errorRateDelta struct { PkID int64 PkErrorRate *statistics.ErrorRate @@ -125,14 +131,12 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) { m[tableID] = item } -func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) { - for id, item := range s.mapper { - h.globalMap.update(id, item.Delta, item.Count, &item.ColSize) - } +func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, rateMap errorRateDeltaMap, feedback *statistics.QueryFeedbackMap) { + deltaMap.merge(s.mapper) s.mapper = make(tableDeltaMap) rateMap.merge(s.rateMap) s.rateMap = make(errorRateDeltaMap) - h.feedback.Merge(s.feedback) + feedback.Merge(s.feedback) s.feedback = statistics.NewQueryFeedbackMap() } @@ -375,13 +379,15 @@ const ( // sweepList will loop over the list, merge each session's local stats into handle // and remove closed session's collector. func (h *Handle) sweepList() { + deltaMap := make(tableDeltaMap) + errorRateMap := make(errorRateDeltaMap) + feedback := statistics.NewQueryFeedbackMap() prev := h.listHead prev.Lock() - errorRateMap := make(errorRateDeltaMap) for curr := prev.next; curr != nil; curr = curr.next { curr.Lock() - // Merge the session stats into handle and error rate map. - h.merge(curr, errorRateMap) + // Merge the session stats into deltaMap, errorRateMap and feedback respectively. + merge(curr, deltaMap, errorRateMap, feedback) if curr.deleted { prev.next = curr.next // Since the session is already closed, we can safely unlock it here. @@ -393,37 +399,34 @@ func (h *Handle) sweepList() { } } prev.Unlock() + h.globalMap.Lock() + h.globalMap.data.merge(deltaMap) + h.globalMap.Unlock() h.mu.Lock() h.mu.rateMap.merge(errorRateMap) h.mu.Unlock() - h.siftFeedbacks() -} - -// siftFeedbacks eliminates feedbacks which are overlapped with others. It is a tradeoff between -// feedback accuracy and its overhead. -func (h *Handle) siftFeedbacks() { - sc := &stmtctx.StatementContext{TimeZone: time.UTC} - for k, qs := range h.feedback.Feedbacks { - fbs := make([]statistics.Feedback, 0, len(qs)*2) - for _, q := range qs { - fbs = append(fbs, q.Feedback...) - } - if len(fbs) == 0 { - delete(h.feedback.Feedbacks, k) - continue - } - h.feedback.Feedbacks[k] = h.feedback.Feedbacks[k][:1] - h.feedback.Feedbacks[k][0].Feedback, _ = statistics.NonOverlappedFeedbacks(sc, fbs) - } - h.feedback.Size = len(h.feedback.Feedbacks) + h.feedback.Lock() + h.feedback.data.Merge(feedback) + h.feedback.data.SiftFeedbacks() + h.feedback.Unlock() } // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { h.sweepList() + h.globalMap.Lock() + deltaMap := h.globalMap.data + h.globalMap.data = make(tableDeltaMap) + h.globalMap.Unlock() + defer func() { + h.globalMap.Lock() + deltaMap.merge(h.globalMap.data) + h.globalMap.data = deltaMap + h.globalMap.Unlock() + }() currentTime := time.Now() - for id, item := range h.globalMap { + for id, item := range deltaMap { if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) { continue } @@ -432,17 +435,17 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error { return errors.Trace(err) } if updated { - h.globalMap.update(id, -item.Delta, -item.Count, nil) + deltaMap.update(id, -item.Delta, -item.Count, nil) } if err = h.dumpTableStatColSizeToKV(id, item); err != nil { return errors.Trace(err) } if updated { - delete(h.globalMap, id) + delete(deltaMap, id) } else { - m := h.globalMap[id] + m := deltaMap[id] m.ColSize = nil - h.globalMap[id] = m + deltaMap[id] = m } } return nil @@ -522,8 +525,12 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e // DumpStatsFeedbackToKV dumps the stats feedback to KV. func (h *Handle) DumpStatsFeedbackToKV() error { + h.feedback.Lock() + feedback := h.feedback.data + h.feedback.data = statistics.NewQueryFeedbackMap() + h.feedback.Unlock() var err error - for _, fbs := range h.feedback.Feedbacks { + for _, fbs := range feedback.Feedbacks { for _, fb := range fbs { if fb.Tp == statistics.PkType { err = h.DumpFeedbackToKV(fb) @@ -548,7 +555,6 @@ func (h *Handle) DumpStatsFeedbackToKV() error { } } } - h.feedback = statistics.NewQueryFeedbackMap() return errors.Trace(err) } @@ -581,8 +587,12 @@ func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error { // feedback locally on this tidb-server, so it could be used more timely. func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { h.sweepList() + h.feedback.Lock() + feedback := h.feedback.data + h.feedback.data = statistics.NewQueryFeedbackMap() + h.feedback.Unlock() OUTER: - for _, fbs := range h.feedback.Feedbacks { + for _, fbs := range feedback.Feedbacks { for _, fb := range fbs { h.mu.Lock() table, ok := h.getTableByPhysicalID(is, fb.PhysicalID) diff --git a/statistics/handle/update_list_test.go b/statistics/handle/update_list_test.go index 8c67c4fa0a8d4..1138bfc9161f0 100644 --- a/statistics/handle/update_list_test.go +++ b/statistics/handle/update_list_test.go @@ -25,8 +25,8 @@ func TestInsertAndDelete(t *testing.T) { t.Parallel() h := Handle{ listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)}, - feedback: statistics.NewQueryFeedbackMap(), } + h.feedback.data = statistics.NewQueryFeedbackMap() var items []*SessionStatsCollector for i := 0; i < 5; i++ { items = append(items, h.NewSessionStatsCollector())