kvflowcontroller: eliminate mutex contention #106466

Closed
Changes from 1 commit
19 commits
608ae76
kvflowcontroller: eliminate mutex contention
irfansharif Jul 8, 2023
0dafffd
[wip] kvflowcontrol: annotate/fix perf regressions
irfansharif Jul 8, 2023
91fbc58
[wip] *: use compaction-instrumented pebble
bananabrick Jul 11, 2023
6edb6a4
[wip] admission: add metric for l0 compacted bytes, generated tokens
irfansharif Jul 12, 2023
d0b5661
[wip] *: use l0-compaction-prioritized pebble
irfansharif Jul 15, 2023
3b55635
[wip] roachtest: use disk snapshots for clearrange/*
irfansharif Jul 16, 2023
ee28401
[wip] *: use l0-compaction-prioritized pebble again
irfansharif Jul 16, 2023
67d86b5
[wip] *: prioritize l0 compactions v3
irfansharif Jul 16, 2023
f76a949
[wip] *: prioritize l0 compactions v4
irfansharif Jul 16, 2023
6234420
[wip] *: prioritize l0 compactions v5
irfansharif Jul 16, 2023
e7e6882
[wip] *: prioritize l0 compactions v6
irfansharif Jul 16, 2023
9bc0873
[wip] *: prioritize l0 compactions v7
irfansharif Jul 16, 2023
37fd5c6
[wip] *: prioritize l0 compactions v8
irfansharif Jul 16, 2023
07f4c3b
[wip] pebble: disable version check
irfansharif Jul 16, 2023
1085aa4
[wip] *: prioritize l0 compactions v9
irfansharif Jul 16, 2023
551c552
[wip] *: introduce L0 reduction factor in IO token calculations
irfansharif Jul 17, 2023
bd0e294
[wip] kvserver: stop bypassing follower work
irfansharif Jul 17, 2023
7ddab8f
[wip] *: go back to using only compaction-instrumented pebble
irfansharif Jul 20, 2023
b7e94ab
[wip] *: experimentally bump L0 priority in the presence of range deletes
irfansharif Jul 23, 2023
[wip] kvflowcontrol: annotate/fix perf regressions
irfansharif committed Jul 12, 2023
commit 0dafffdec38f16766cc4ca7c7c95f1df88baed80
217 changes: 127 additions & 90 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
@@ -68,7 +69,9 @@ type Controller struct {
// streams get closed permanently (tenants get deleted, nodes removed)
// or when completely inactive (no tokens deducted/returned over 30+
// minutes), clear these out.
buckets map[kvflowcontrol.Stream]*bucket
bucketsv2 sync.Map
bucketCount int
}
metrics *metrics
clock *hlc.Clock
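The gist of the locking change in this commit, as a standalone sketch (type and field names here are illustrative, not the exact ones in this package): bucket lookups move off the controller-wide c.mu onto a sync.Map, with contention pushed down to a per-bucket RWMutex.

package sketch

import "sync"

// Before: every bucket lookup serializes on the controller-wide mutex.
type controllerBefore struct {
	mu struct {
		sync.Mutex
		buckets map[string]*tokenBucket
	}
}

// After: lookups go through a sync.Map without taking c.mu; the
// controller mutex is only needed when creating buckets or when
// resizing limits across all of them.
type controllerAfter struct {
	mu struct {
		sync.Mutex
		bucketsv2   sync.Map // effectively map[string]*tokenBucket
		bucketCount int
	}
}

// Per-bucket state is guarded by its own RWMutex so token reads don't
// contend with each other.
type tokenBucket struct {
	mu struct {
		sync.RWMutex
		regular, elastic int64
	}
	signalCh chan struct{}
}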
@@ -86,38 +89,38 @@ func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock

regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV))
elasticTokens := kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV))
c.mu.limit = map[admissionpb.WorkClass]kvflowcontrol.Tokens{
c.mu.limit = tokensPerWorkClass{
regular: regularTokens,
elastic: elasticTokens,
}
c.mu.buckets = make(map[kvflowcontrol.Stream]*bucket)
c.mu.bucketsv2 = sync.Map{}
regularTokensPerStream.SetOnChange(&settings.SV, func(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

before := tokensPerWorkClass{
regular: c.mu.limit[regular],
elastic: c.mu.limit[elastic],
}
before := c.mu.limit
now := tokensPerWorkClass{
regular: kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)),
elastic: kvflowcontrol.Tokens(elasticTokensPerStream.Get(&settings.SV)),
}
adjustment := tokensPerWorkClass{
regular: now[regular] - before[regular],
elastic: now[elastic] - before[elastic],
regular: now.regular - before.regular,
elastic: now.elastic - before.elastic,
}
c.mu.limit = now
for _, b := range c.mu.buckets {
c.mu.bucketsv2.Range(func(_, value any) bool { // XXX: c.mu guards against new buckets being added, which .Range doesn't
b := value.(*bucket)
b.mu.Lock()
b.mu.tokens[regular] += adjustment[regular]
b.mu.tokens[elastic] += adjustment[elastic]
b.mu.tokensPerWorkClass.regular += adjustment.regular
b.mu.tokensPerWorkClass.elastic += adjustment.elastic
b.mu.Unlock()
c.metrics.onTokenAdjustment(adjustment)
if adjustment[regular] > 0 || adjustment[elastic] > 0 {
if adjustment.regular > 0 || adjustment.elastic > 0 {
b.signal() // signal a waiter, if any
}
}
return true
})
})
c.metrics = newMetrics(c)
registry.AddMetricStruct(c.metrics)
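As the XXX in the Range callback above notes, sync.Map.Range provides no exclusion on its own: entries stored concurrently with an iteration may or may not be visited, which is why the settings callback holds c.mu (the bucket-creation path also takes it). A small, hypothetical illustration of that Range property:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map
	m.Store("a", 1)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		m.Store("b", 2) // stored concurrently: Range may or may not visit it
	}()

	m.Range(func(k, v any) bool {
		fmt.Println(k, v) // always prints "a 1"; "b 2" only sometimes
		return true
	})
	wg.Wait()
}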
@@ -143,11 +146,9 @@ func (c *Controller) Admit(
logged := false
tstart := c.clock.PhysicalTime()
for {
c.mu.Lock()
b := c.getBucketLocked(connection.Stream())
c.mu.Unlock()
b := c.getBucket(connection.Stream()) // XXX: Fixed? Switched to a sync.Map lookup; previously showed up in both the mutex and CPU profiles.

tokens := b.tokens(class)
tokens := b.tokens(class) // XXX: Fixed? Now takes an RLock; previously showed up in the mutex profile.
if tokens > 0 ||
// In addition to letting requests through when there are tokens
// being available, we'll also let them through if we're not
@@ -243,16 +244,20 @@ func (c *Controller) Inspect(ctx context.Context) []kvflowinspectpb.Stream {
defer c.mu.Unlock()

var streams []kvflowinspectpb.Stream
for stream, b := range c.mu.buckets {
b.mu.Lock()
c.mu.bucketsv2.Range(func(key, value any) bool {
stream := key.(kvflowcontrol.Stream)
b := value.(*bucket)

b.mu.RLock()
streams = append(streams, kvflowinspectpb.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableRegularTokens: int64(b.tokensLocked(regular)),
AvailableElasticTokens: int64(b.tokensLocked(elastic)),
})
b.mu.Unlock()
}
b.mu.RUnlock()
return true
})
sort.Slice(streams, func(i, j int) bool { // for determinism
if streams[i].TenantID != streams[j].TenantID {
return streams[i].TenantID.ToUint64() < streams[j].TenantID.ToUint64()
@@ -270,53 +275,75 @@ func (c *Controller) InspectStream(
return kvflowinspectpb.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
AvailableRegularTokens: int64(tokens[regular]),
AvailableElasticTokens: int64(tokens[elastic]),
AvailableRegularTokens: int64(tokens.regular),
AvailableElasticTokens: int64(tokens.elastic),
}
}

// XXX: Fixed? go build -gcflags '-m' ./pkg/kv/kvserver/kvflowcontrol/kvflowcontroller
// XXX: Fixed. alloc_objects showed up underneath this call stack: in .adjust, .onTokenAdjustment, and .onUnaccounted.
func (c *Controller) adjustTokens(
ctx context.Context,
pri admissionpb.WorkPriority,
delta kvflowcontrol.Tokens,
stream kvflowcontrol.Stream,
) {
class := admissionpb.WorkClassFromPri(pri)

c.mu.Lock()
b := c.getBucketLocked(stream)
c.mu.Unlock()
b := c.getBucket(stream)
adjustment, unaccounted := b.adjust(ctx, class, delta, c.mu.limit)
c.metrics.onTokenAdjustment(adjustment)
c.metrics.onUnaccounted(unaccounted)
if adjustment[regular] > 0 || adjustment[elastic] > 0 {
if unaccounted.regular > 0 || unaccounted.elastic > 0 {
c.metrics.onUnaccounted(unaccounted) // XXX: Fixed. The metric map access showed up in profiles; only record when non-zero.
}
if adjustment.regular > 0 || adjustment.elastic > 0 {
b.signal() // signal a waiter, if any
}

if log.ExpensiveLogEnabled(ctx, 2) {
b.mu.Lock()
b.mu.RLock()
log.Infof(ctx, "adjusted flow tokens (pri=%s stream=%s delta=%s): regular=%s elastic=%s",
pri, stream, delta, b.tokensLocked(regular), b.tokensLocked(elastic))
b.mu.Unlock()
b.mu.RUnlock()
}
}

func (c *Controller) getBucketLocked(stream kvflowcontrol.Stream) *bucket {
b, ok := c.mu.buckets[stream]
// XXX: sync.Map is more expensive CPU-wise for reads as per BenchmarkController
// (~250ns vs. ~350ns). We could take an RLock at the start and swap it out for
// a Lock only when inserting.
b, ok := c.mu.bucketsv2.Load(stream)
if !ok {
var loaded bool
b, loaded = c.mu.bucketsv2.LoadOrStore(stream, newBucket(c.mu.limit))
if !loaded {
c.mu.bucketCount += 1
}
}
return b.(*bucket)
}

func (c *Controller) getBucket(stream kvflowcontrol.Stream) *bucket {
b, ok := c.mu.bucketsv2.Load(stream)
if !ok {
b = newBucket(c.mu.limit)
c.mu.buckets[stream] = b
c.mu.Lock()
var loaded bool
b, loaded = c.mu.bucketsv2.LoadOrStore(stream, newBucket(c.mu.limit))
if !loaded {
c.mu.bucketCount += 1
}
c.mu.Unlock()
}
return b
return b.(*bucket)
}
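The XXX above sketches an alternative to sync.Map that was considered: keep a plain map but guard it with an RWMutex, taking the read lock on the hot path and upgrading to the write lock only when the bucket is missing. A rough version of that read-mostly pattern, with hypothetical names (not what this commit implements):

package sketch

import "sync"

type tokenBucket struct{} // fields elided

func newTokenBucket() *tokenBucket { return &tokenBucket{} }

type registry struct {
	mu      sync.RWMutex
	buckets map[string]*tokenBucket
}

func (r *registry) get(stream string) *tokenBucket {
	r.mu.RLock()
	b, ok := r.buckets[stream]
	r.mu.RUnlock()
	if ok {
		return b // common case: read lock only
	}
	// Slow path: take the write lock and re-check, since another
	// goroutine may have inserted the bucket in the meantime.
	r.mu.Lock()
	defer r.mu.Unlock()
	if b, ok := r.buckets[stream]; ok {
		return b
	}
	b = newTokenBucket()
	r.buckets[stream] = b
	return b
}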

// bucket holds flow tokens for {regular,elastic} traffic over a
// kvflowcontrol.Stream. It's used to synchronize handoff between threads
// returning and waiting for flow tokens.
type bucket struct {
mu struct {
syncutil.Mutex
tokens tokensPerWorkClass
syncutil.RWMutex
tokensPerWorkClass tokensPerWorkClass
}

// Waiting requests do so by waiting on signalCh without holding mutexes.
@@ -334,25 +361,25 @@ type bucket struct {
signalCh chan struct{}
}

func newBucket(t tokensPerWorkClass) *bucket {
func newBucket(tokensPerWorkClass tokensPerWorkClass) *bucket {
b := bucket{
signalCh: make(chan struct{}, 1),
}
b.mu.tokens = map[admissionpb.WorkClass]kvflowcontrol.Tokens{
regular: t[regular],
elastic: t[elastic],
}
b.mu.tokensPerWorkClass = tokensPerWorkClass
return &b
}

func (b *bucket) tokens(wc admissionpb.WorkClass) kvflowcontrol.Tokens {
b.mu.Lock()
defer b.mu.Unlock()
b.mu.RLock()
defer b.mu.RUnlock()
return b.tokensLocked(wc)
}

func (b *bucket) tokensLocked(wc admissionpb.WorkClass) kvflowcontrol.Tokens {
return b.mu.tokens[wc]
if wc == regular {
return b.mu.tokensPerWorkClass.regular
}
return b.mu.tokensPerWorkClass.elastic
}
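signal's body falls outside this hunk. The struct comment above says waiters block on signalCh rather than on a mutex; a generic sketch of that handoff idiom with a capacity-1 channel looks like the following (illustrative only, not the code in this file):

package sketch

type notifier struct {
	ch chan struct{} // capacity 1, so pending wakeups coalesce
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// signal wakes at most one waiter and never blocks the caller; if a
// wakeup is already pending, this is a no-op.
func (n *notifier) signal() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// wait blocks until the next signal.
func (n *notifier) wait() {
	<-n.ch
}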

func (b *bucket) signal() {
@@ -374,52 +401,72 @@ func (b *bucket) adjust(
) (adjustment, unaccounted tokensPerWorkClass) {
b.mu.Lock()
defer b.mu.Unlock()

unaccounted = tokensPerWorkClass{
regular: 0,
elastic: 0,
}

before := tokensPerWorkClass{
regular: b.mu.tokens[regular],
elastic: b.mu.tokens[elastic],
}
// XXX: Appears in mutex profiles (highest). We want to increment but enforce
// a limit. What if reads always capped the value at the limit? And when
// incrementing, we get the new value back: if we're over the limit, we know we
// added +delta, so we can subtract at most delta to correct the overshoot.
//
// var c int64 = 0
// var limit int64 = rand.Int63n(10000000000)
// for i := 0; i < 50; i++ {
// go func() {
// for j := 0; j < 2000; j++ {
// delta := rand.Int63()
// v := atomic.AddInt64(&c, delta)
// if v > limit {
// overlimit := v - limit
// var adjustment int64 = overlimit
// if delta < overlimit {
// adjustment = delta
// }
// n := atomic.AddInt64(&c, -adjustment)
// fmt.Printf("%d > %d by %d, adjusted by %d to %d)\n",
// v, limit, v-limit, -adjustment, n)
// }
// }
// }()
// }

unaccounted = tokensPerWorkClass{}
before := b.mu.tokensPerWorkClass

switch class {
case elastic:
// Elastic {deductions,returns} only affect elastic flow tokens.
b.mu.tokens[class] += delta
if delta > 0 && b.mu.tokens[class] > limit[class] {
unaccounted[class] = b.mu.tokens[class] - limit[class]
b.mu.tokens[class] = limit[class] // enforce ceiling
b.mu.tokensPerWorkClass.elastic += delta
if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic {
unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic
b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling
}
case regular:
b.mu.tokens[class] += delta
if delta > 0 && b.mu.tokens[class] > limit[class] {
unaccounted[class] = b.mu.tokens[class] - limit[class]
b.mu.tokens[class] = limit[class] // enforce ceiling
b.mu.tokensPerWorkClass.regular += delta
if delta > 0 && b.mu.tokensPerWorkClass.regular > limit.regular {
unaccounted.regular = b.mu.tokensPerWorkClass.regular - limit.regular
b.mu.tokensPerWorkClass.regular = limit.regular // enforce ceiling
}

b.mu.tokens[elastic] += delta
if delta > 0 && b.mu.tokens[elastic] > limit[elastic] {
unaccounted[elastic] = b.mu.tokens[elastic] - limit[elastic]
b.mu.tokens[elastic] = limit[elastic] // enforce ceiling
b.mu.tokensPerWorkClass.elastic += delta
if delta > 0 && b.mu.tokensPerWorkClass.elastic > limit.elastic {
unaccounted.elastic = b.mu.tokensPerWorkClass.elastic - limit.elastic
b.mu.tokensPerWorkClass.elastic = limit.elastic // enforce ceiling
}
}

if buildutil.CrdbTestBuild && (unaccounted[regular] != 0 || unaccounted[elastic] != 0) {
if buildutil.CrdbTestBuild && (unaccounted.regular != 0 || unaccounted.elastic != 0) {
log.Fatalf(ctx, "unaccounted[regular]=%s unaccounted[elastic]=%s for class=%s delta=%s limit[regular]=%s limit[elastic]=%s",
unaccounted[regular], unaccounted[elastic], class, delta, limit[regular], limit[elastic])
unaccounted.regular, unaccounted.elastic, class, delta, limit.regular, limit.elastic)
}

adjustment = tokensPerWorkClass{
regular: b.mu.tokens[regular] - before[regular],
elastic: b.mu.tokens[elastic] - before[elastic],
regular: b.mu.tokensPerWorkClass.regular - before.regular,
elastic: b.mu.tokensPerWorkClass.elastic - before.elastic,
}
return adjustment, unaccounted
}
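For reference, the idea in the commented-out snippet above, as a runnable, self-contained program: add to the counter atomically, and if the result overshoots the limit, atomically subtract the overshoot, capped at the delta just added, so the counter converges back under the ceiling without a mutex. This is only an illustration of the comment's idea, not code from the commit.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
)

func main() {
	var tokens int64
	limit := int64(1 << 20)

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 2000; j++ {
				delta := rand.Int63n(4096)
				v := atomic.AddInt64(&tokens, delta)
				if v > limit {
					// We overshot by (v - limit); this goroutine only added
					// delta, so subtract at most delta to stay accountable.
					adjustment := v - limit
					if delta < adjustment {
						adjustment = delta
					}
					atomic.AddInt64(&tokens, -adjustment)
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("final tokens:", atomic.LoadInt64(&tokens), "limit:", limit)
}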

type tokensPerWorkClass map[admissionpb.WorkClass]kvflowcontrol.Tokens
type tokensPerWorkClass struct {
regular, elastic kvflowcontrol.Tokens
}
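Switching tokensPerWorkClass from a map to a two-field struct removes map hashing and allocation from every token operation and gives it value-copy semantics (before := b.mu.tokensPerWorkClass above copies the fields, whereas copying a map header would alias the same storage). A hypothetical micro-benchmark shape for comparing the two representations:

package sketch

import "testing"

type tokens = int64

type perClassMap map[int]tokens // old shape: keyed by work class
type perClassStruct struct{ regular, elastic tokens }

func BenchmarkMapAccess(b *testing.B) {
	m := perClassMap{0: 1, 1: 1}
	var sum tokens
	for i := 0; i < b.N; i++ {
		sum += m[0] + m[1] // hashing on every access
	}
	_ = sum
}

func BenchmarkStructAccess(b *testing.B) {
	s := perClassStruct{regular: 1, elastic: 1}
	var sum tokens
	for i := 0; i < b.N; i++ {
		sum += s.regular + s.elastic // plain field loads
	}
	_ = sum
}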

const (
minTokensPerStream kvflowcontrol.Tokens = 1 << 20 // 1 MiB
@@ -438,16 +485,13 @@ func validateTokenRange(b int64) error {
}

func (c *Controller) getTokensForStream(stream kvflowcontrol.Stream) tokensPerWorkClass {
ret := make(map[admissionpb.WorkClass]kvflowcontrol.Tokens)
c.mu.Lock()
b := c.getBucketLocked(stream)
c.mu.Unlock()
ret := tokensPerWorkClass{}
b := c.getBucket(stream)

b.mu.Lock()
for _, wc := range []admissionpb.WorkClass{regular, elastic} {
ret[wc] = b.tokensLocked(wc)
}
b.mu.Unlock()
b.mu.RLock()
ret.regular = b.tokensLocked(regular)
ret.elastic = b.tokensLocked(elastic)
b.mu.RUnlock()
return ret
}

@@ -486,14 +530,9 @@ func (c *Controller) TestingNonBlockingAdmit(
default:
}

c.mu.Lock()
b := c.getBucketLocked(connection.Stream())
c.mu.Unlock()

b.mu.Lock()
tokens := b.mu.tokens[class]
b.mu.Unlock()
b := c.getBucket(connection.Stream())

tokens := b.tokens(class)
if tokens <= 0 {
return false
}
@@ -527,9 +566,7 @@ func (c *Controller) TestingMetrics() interface{} {
}

func (c *Controller) testingGetBucket(stream kvflowcontrol.Stream) *bucket {
c.mu.Lock()
defer c.mu.Unlock()
return c.getBucketLocked(stream)
return c.getBucket(stream)
}

func (b *bucket) testingSignaled(connection kvflowcontrol.ConnectedStream) func() bool {