Skip to content

Commit

Permalink
kvflowcontrol: performance optimizations in tracker.go
Browse files Browse the repository at this point in the history
This patch makes 2 main performance optimizations in the tracker
codepath:
1. Replace the trackedList with a struct that points to a list. This
   avoids map assignments each time anything is appended to the list.
2. We use a sync.Pool for tracked items to avoid frequent allocations
   and gc for the objects.

Informs #104154.

Release note: None
  • Loading branch information
aadityasondhi committed Sep 19, 2023
1 parent 0ff99f1 commit f5bca80
Showing 1 changed file with 47 additions and 25 deletions.
72 changes: 47 additions & 25 deletions pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"sort"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
Expand All @@ -29,11 +30,7 @@ import (
// admissionpb.WorkPriority, for replication along an individual
// kvflowcontrol.Stream.
type Tracker struct {
// TODO(irfansharif,aaditya): Everytime we track something, we incur a map
// assignment (shows up in CPU profiles). We could introduce a struct that
// internally embeds this list of tracked deductions, and append there
// instead. Do this as part of #104154.
trackedM map[admissionpb.WorkPriority][]tracked
trackedM map[admissionpb.WorkPriority]*trackedList

// lowerBound tracks on a per-stream basis the log position below which
// we ignore token deductions.
Expand All @@ -44,6 +41,24 @@ type Tracker struct {
knobs *kvflowcontrol.TestingKnobs
}

var trackedPool sync.Pool = sync.Pool{
New: func() interface{} {
return new(tracked)
},
}

func newTracked(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) *tracked {
t := trackedPool.Get().(*tracked)
t.tokens = tokens
t.position = position
return t
}

func freeTracked(t *tracked) {
*t = tracked{}
trackedPool.Put(t)
}

// tracked represents tracked flow tokens; they're tracked with respect to a
// raft log position (typically where the proposed command is expected to end
// up).
Expand All @@ -52,6 +67,10 @@ type tracked struct {
position kvflowcontrolpb.RaftLogPosition
}

type trackedList struct {
items []*tracked
}

// New constructs a new Tracker with the given lower bound raft log position
// (below which we're not allowed to deduct tokens).
func New(
Expand All @@ -63,7 +82,7 @@ func New(
knobs = &kvflowcontrol.TestingKnobs{}
}
return &Tracker{
trackedM: make(map[admissionpb.WorkPriority][]tracked),
trackedM: make(map[admissionpb.WorkPriority]*trackedList),
lowerBound: lb,
knobs: knobs,
stream: stream,
Expand Down Expand Up @@ -102,8 +121,12 @@ func (dt *Tracker) Track(
}
dt.lowerBound = pos

if len(dt.trackedM[pri]) >= 1 {
last := dt.trackedM[pri][len(dt.trackedM[pri])-1]
if _, ok := dt.trackedM[pri]; !ok {
dt.trackedM[pri] = &trackedList{}
}

if len(dt.trackedM[pri].items) >= 1 {
last := dt.trackedM[pri].items[len(dt.trackedM[pri].items)-1]
if !last.position.Less(pos) {
logFn := log.Errorf
if buildutil.CrdbTestBuild {
Expand All @@ -115,13 +138,7 @@ func (dt *Tracker) Track(
}
}

// TODO(irfansharif,aaditya): The tracked instances here make up about ~0.4%
// of allocations under kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as
// part of #104154, by using a sync.Pool perhaps.
dt.trackedM[pri] = append(dt.trackedM[pri], tracked{
tokens: tokens,
position: pos,
})
dt.trackedM[pri].items = append(dt.trackedM[pri].items, newTracked(tokens, pos))
if log.V(1) {
log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s",
tokens, pri, dt.stream, pos)
Expand All @@ -144,11 +161,11 @@ func (dt *Tracker) Untrack(
var untracked int
var tokens kvflowcontrol.Tokens
for {
if untracked == len(dt.trackedM[pri]) {
if untracked == len(dt.trackedM[pri].items) {
break
}

deduction := dt.trackedM[pri][untracked]
deduction := dt.trackedM[pri].items[untracked]
if !deduction.position.LessEq(upto) {
break
}
Expand All @@ -161,17 +178,22 @@ func (dt *Tracker) Untrack(
tokens += deduction.tokens
}

trackedBefore := len(dt.trackedM[pri])
dt.trackedM[pri] = dt.trackedM[pri][untracked:]
trackedBefore := len(dt.trackedM[pri].items)

// Free up untracked items.
for i := 0; i < untracked; i++ {
freeTracked(dt.trackedM[pri].items[i])
}
dt.trackedM[pri].items = dt.trackedM[pri].items[untracked:]
if log.V(1) {
remaining := ""
if len(dt.trackedM[pri]) > 0 {
remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens)
if len(dt.trackedM[pri].items) > 0 {
remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri].items[0].tokens)
}
log.Infof(ctx, "released %s flow control tokens for %d out of %d tracked deductions for pri=%s stream=%s, up to %s; %d tracked deduction(s) remain%s",
tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri]), remaining)
tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri].items), remaining)
}
if len(dt.trackedM[pri]) == 0 {
if len(dt.trackedM[pri].items) == 0 {
delete(dt.trackedM, pri)
}

Expand All @@ -186,7 +208,7 @@ func (dt *Tracker) Untrack(
func (dt *Tracker) Iter(_ context.Context, f func(admissionpb.WorkPriority, kvflowcontrol.Tokens)) {
for pri, deductions := range dt.trackedM {
var tokens kvflowcontrol.Tokens
for _, deduction := range deductions {
for _, deduction := range deductions.items {
tokens += deduction.tokens
}
f(pri, tokens)
Expand Down Expand Up @@ -229,7 +251,7 @@ func (dt *Tracker) TestingIter(
f func(admissionpb.WorkPriority, kvflowcontrol.Tokens, kvflowcontrolpb.RaftLogPosition) bool,
) {
for pri, deductions := range dt.trackedM {
for _, deduction := range deductions {
for _, deduction := range deductions.items {
if !f(pri, deduction.tokens, deduction.position) {
return
}
Expand Down

0 comments on commit f5bca80

Please sign in to comment.