Skip to content

Commit

Permalink
add mutex for globalMap and feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyifangreeneyes committed Dec 8, 2021
1 parent d83ee8c commit 5099a1b
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 45 deletions.
19 changes: 19 additions & 0 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,25 @@ func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool
return true
}

// SiftFeedbacks collapses the query feedbacks stored under every key into a
// single entry whose Feedback is the result of NonOverlappedFeedbacks over all
// feedbacks collected for that key. It trades feedback accuracy for lower
// overhead. Keys that hold no feedback at all are removed, and Size is reset to
// the number of keys that remain.
func (m *QueryFeedbackMap) SiftFeedbacks() {
	stmtCtx := &stmtctx.StatementContext{TimeZone: time.UTC}
	for key, queries := range m.Feedbacks {
		// Gather every feedback recorded for this key into one flat slice.
		collected := make([]Feedback, 0, len(queries)*2)
		for i := range queries {
			collected = append(collected, queries[i].Feedback...)
		}
		if len(collected) > 0 {
			// Keep only the first query entry and let it carry the sifted
			// feedbacks; the second return value is discarded.
			m.Feedbacks[key] = queries[:1]
			queries[0].Feedback, _ = NonOverlappedFeedbacks(stmtCtx, collected)
			continue
		}
		// Nothing was collected for this key, so drop it entirely.
		delete(m.Feedbacks, key)
	}
	m.Size = len(m.Feedbacks)
}

// Merge combines 2 collections of feedbacks.
func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) {
for k, qs := range r.Feedbacks {
Expand Down
29 changes: 21 additions & 8 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ type Handle struct {
// listHead contains all the stats collector required by session.
listHead *SessionStatsCollector
// globalMap contains all the delta map from collectors when we dump them to KV.
globalMap tableDeltaMap
globalMap struct {
sync.Mutex
data tableDeltaMap
}
// feedback is used to store query feedback info.
feedback *statistics.QueryFeedbackMap
feedback struct {
sync.Mutex
data *statistics.QueryFeedbackMap
}

lease atomic2.Duration

Expand Down Expand Up @@ -159,6 +165,7 @@ func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string,

// Clear the statsCache, only for test.
func (h *Handle) Clear() {
// TODO: Here h.mu seems to protect all the fields of Handle. Is it reasonable?
h.mu.Lock()
h.statsCache.Lock()
h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
Expand All @@ -167,13 +174,17 @@ func (h *Handle) Clear() {
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
h.feedback = statistics.NewQueryFeedbackMap()
h.feedback.Lock()
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().EnableChunkRPC = false
h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0)
h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}
h.globalMap = make(tableDeltaMap)
h.globalMap.Lock()
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
h.mu.rateMap = make(errorRateDeltaMap)
h.mu.Unlock()
}
Expand All @@ -188,8 +199,6 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
feedback: statistics.NewQueryFeedbackMap(),
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
pool: pool,
}
Expand All @@ -199,6 +208,8 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*
handle.mu.ctx = ctx
handle.mu.rateMap = make(errorRateDeltaMap)
handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
handle.globalMap.data = make(tableDeltaMap)
handle.feedback.data = statistics.NewQueryFeedbackMap()
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand All @@ -218,10 +229,12 @@ func (h *Handle) SetLease(lease time.Duration) {

// GetQueryFeedback gets the query feedback. It is only used in test.
func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap {
h.feedback.Lock()
defer func() {
h.feedback = statistics.NewQueryFeedbackMap()
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
}()
return h.feedback
return h.feedback.data
}

// DurationToTS converts duration to timestamp.
Expand Down
82 changes: 46 additions & 36 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i
m[id] = item
}

// merge folds every entry of deltaMap into m by calling update with that
// entry's Delta, Count and ColSize. deltaMap itself is only read here.
func (m tableDeltaMap) merge(deltaMap tableDeltaMap) {
	for tableID, delta := range deltaMap {
		m.update(tableID, delta.Delta, delta.Count, &delta.ColSize)
	}
}

type errorRateDelta struct {
PkID int64
PkErrorRate *statistics.ErrorRate
Expand Down Expand Up @@ -125,14 +131,12 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) {
m[tableID] = item
}

func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) {
for id, item := range s.mapper {
h.globalMap.update(id, item.Delta, item.Count, &item.ColSize)
}
func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, rateMap errorRateDeltaMap, feedback *statistics.QueryFeedbackMap) {
deltaMap.merge(s.mapper)
s.mapper = make(tableDeltaMap)
rateMap.merge(s.rateMap)
s.rateMap = make(errorRateDeltaMap)
h.feedback.Merge(s.feedback)
feedback.Merge(s.feedback)
s.feedback = statistics.NewQueryFeedbackMap()
}

Expand Down Expand Up @@ -375,13 +379,15 @@ const (
// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
deltaMap := make(tableDeltaMap)
errorRateMap := make(errorRateDeltaMap)
feedback := statistics.NewQueryFeedbackMap()
prev := h.listHead
prev.Lock()
errorRateMap := make(errorRateDeltaMap)
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
// Merge the session stats into handle and error rate map.
h.merge(curr, errorRateMap)
// Merge the session stats into deltaMap, errorRateMap and feedback respectively.
merge(curr, deltaMap, errorRateMap, feedback)
if curr.deleted {
prev.next = curr.next
// Since the session is already closed, we can safely unlock it here.
Expand All @@ -393,37 +399,34 @@ func (h *Handle) sweepList() {
}
}
prev.Unlock()
h.globalMap.Lock()
h.globalMap.data.merge(deltaMap)
h.globalMap.Unlock()
h.mu.Lock()
h.mu.rateMap.merge(errorRateMap)
h.mu.Unlock()
h.siftFeedbacks()
}

// siftFeedbacks eliminates feedbacks which are overlapped with others. It is a tradeoff between
// feedback accuracy and its overhead.
func (h *Handle) siftFeedbacks() {
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
for k, qs := range h.feedback.Feedbacks {
fbs := make([]statistics.Feedback, 0, len(qs)*2)
for _, q := range qs {
fbs = append(fbs, q.Feedback...)
}
if len(fbs) == 0 {
delete(h.feedback.Feedbacks, k)
continue
}
h.feedback.Feedbacks[k] = h.feedback.Feedbacks[k][:1]
h.feedback.Feedbacks[k][0].Feedback, _ = statistics.NonOverlappedFeedbacks(sc, fbs)
}
h.feedback.Size = len(h.feedback.Feedbacks)
h.feedback.Lock()
h.feedback.data.Merge(feedback)
h.feedback.data.SiftFeedbacks()
h.feedback.Unlock()
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
// If the mode is `DumpDelta`, it will only dump the delta info whose `Modify Count / Table Count` is greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.sweepList()
h.globalMap.Lock()
deltaMap := h.globalMap.data
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
defer func() {
h.globalMap.Lock()
deltaMap.merge(h.globalMap.data)
h.globalMap.data = deltaMap
h.globalMap.Unlock()
}()
currentTime := time.Now()
for id, item := range h.globalMap {
for id, item := range deltaMap {
if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
continue
}
Expand All @@ -432,17 +435,17 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
return errors.Trace(err)
}
if updated {
h.globalMap.update(id, -item.Delta, -item.Count, nil)
deltaMap.update(id, -item.Delta, -item.Count, nil)
}
if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
return errors.Trace(err)
}
if updated {
delete(h.globalMap, id)
delete(deltaMap, id)
} else {
m := h.globalMap[id]
m := deltaMap[id]
m.ColSize = nil
h.globalMap[id] = m
deltaMap[id] = m
}
}
return nil
Expand Down Expand Up @@ -522,8 +525,12 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e

// DumpStatsFeedbackToKV dumps the stats feedback to KV.
func (h *Handle) DumpStatsFeedbackToKV() error {
h.feedback.Lock()
feedback := h.feedback.data
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
var err error
for _, fbs := range h.feedback.Feedbacks {
for _, fbs := range feedback.Feedbacks {
for _, fb := range fbs {
if fb.Tp == statistics.PkType {
err = h.DumpFeedbackToKV(fb)
Expand All @@ -548,7 +555,6 @@ func (h *Handle) DumpStatsFeedbackToKV() error {
}
}
}
h.feedback = statistics.NewQueryFeedbackMap()
return errors.Trace(err)
}

Expand Down Expand Up @@ -581,8 +587,12 @@ func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error {
// feedback locally on this tidb-server, so it could be used more timely.
func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) {
h.sweepList()
h.feedback.Lock()
feedback := h.feedback.data
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
OUTER:
for _, fbs := range h.feedback.Feedbacks {
for _, fbs := range feedback.Feedbacks {
for _, fb := range fbs {
h.mu.Lock()
table, ok := h.getTableByPhysicalID(is, fb.PhysicalID)
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func TestInsertAndDelete(t *testing.T) {
t.Parallel()
h := Handle{
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)},
feedback: statistics.NewQueryFeedbackMap(),
}
h.feedback.data = statistics.NewQueryFeedbackMap()
var items []*SessionStatsCollector
for i := 0; i < 5; i++ {
items = append(items, h.NewSessionStatsCollector())
Expand Down

0 comments on commit 5099a1b

Please sign in to comment.