Skip to content

Commit

Permalink
rac2: de-interface token counter and use syncutil map
Browse files Browse the repository at this point in the history
Prior to this change, `TokenCounter` provided an interface implemented
by `*tokenCounter`. As there is only one implementation, de-interface
`TokenCounter`. Also, store the TokenCounter in a `syncutil.Map`, as opposed
to a native mutex protected map.

Epic: CRDB-37515
Release note: None
  • Loading branch information
kvoli committed Aug 29, 2024
1 parent 65fdb8a commit fba5ab3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 64 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ type voterStateForWaiters struct {
isLeader bool
isLeaseHolder bool
isStateReplicate bool
evalTokenCounter TokenCounter
evalTokenCounter *tokenCounter
}

type voterSet []voterStateForWaiters
Expand Down Expand Up @@ -432,7 +432,7 @@ type replicaState struct {
// is the identity that is used to deduct tokens or wait for tokens to be
// positive.
stream kvflowcontrol.Stream
evalTokenCounter TokenCounter
evalTokenCounter *tokenCounter
desc roachpb.ReplicaDescriptor
connectedState connectedState
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,11 @@ func TestRangeControllerWaitForEval(t *testing.T) {

tokenCountsString := func() string {
var b strings.Builder
streams := make([]kvflowcontrol.Stream, 0, len(ssTokenCounter.mu.evalCounters))
for stream := range ssTokenCounter.mu.evalCounters {
streams = append(streams, stream)
}
var streams []kvflowcontrol.Stream
ssTokenCounter.evalCounters.Range(func(k kvflowcontrol.Stream, v *tokenCounter) bool {
streams = append(streams, k)
return true
})
sort.Slice(streams, func(i, j int) bool {
return streams[i].StoreID < streams[j].StoreID
})
Expand Down Expand Up @@ -375,7 +376,7 @@ func TestRangeControllerWaitForEval(t *testing.T) {
}
if _, ok := zeroedTokenCounters[stream]; !ok {
zeroedTokenCounters[stream] = struct{}{}
ssTokenCounter.Eval(stream).(*tokenCounter).adjust(ctx, admissionpb.RegularWorkClass, -1)
ssTokenCounter.Eval(stream).adjust(ctx, admissionpb.RegularWorkClass, -1)
}
}
}
Expand Down Expand Up @@ -448,7 +449,7 @@ func TestRangeControllerWaitForEval(t *testing.T) {
ssTokenCounter.Eval(kvflowcontrol.Stream{
StoreID: roachpb.StoreID(store),
TenantID: roachpb.SystemTenantID,
}).(*tokenCounter).adjust(ctx,
}).adjust(ctx,
admissionpb.WorkClassFromPri(pri),
kvflowcontrol.Tokens(tokens))
}
Expand Down
38 changes: 10 additions & 28 deletions pkg/kv/kvserver/kvflowcontrol/rac2/store_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,31 @@ import (
// for a given stream.
//
// TODO(kvoli): Add stream deletion upon decommissioning a store.
// TODO(kvoli): Check mutex performance against syncutil.Map.
type StreamTokenCounterProvider struct {
settings *cluster.Settings

mu struct {
syncutil.Mutex
sendCounters, evalCounters map[kvflowcontrol.Stream]TokenCounter
}
settings *cluster.Settings
sendCounters, evalCounters syncutil.Map[kvflowcontrol.Stream, tokenCounter]
}

// NewStreamTokenCounterProvider creates a new StreamTokenCounterProvider.
func NewStreamTokenCounterProvider(settings *cluster.Settings) *StreamTokenCounterProvider {
p := StreamTokenCounterProvider{settings: settings}
p.mu.evalCounters = make(map[kvflowcontrol.Stream]TokenCounter)
p.mu.sendCounters = make(map[kvflowcontrol.Stream]TokenCounter)
return &p
return &StreamTokenCounterProvider{settings: settings}
}

// Eval returns the evaluation token counter for the given stream.
func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) TokenCounter {
p.mu.Lock()
defer p.mu.Unlock()

if t, ok := p.mu.evalCounters[stream]; ok {
func (p *StreamTokenCounterProvider) Eval(stream kvflowcontrol.Stream) *tokenCounter {
if t, ok := p.evalCounters.Load(stream); ok {
return t
}

t := newTokenCounter(p.settings)
p.mu.evalCounters[stream] = t
t, _ := p.evalCounters.LoadOrStore(stream, newTokenCounter(p.settings))
return t
}

// Send returns the send token counter for the given stream.
func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) TokenCounter {
p.mu.Lock()
defer p.mu.Unlock()

if t, ok := p.mu.sendCounters[stream]; ok {
func (p *StreamTokenCounterProvider) Send(stream kvflowcontrol.Stream) *tokenCounter {
if t, ok := p.sendCounters.Load(stream); ok {
return t
}

t := newTokenCounter(p.settings)
p.mu.sendCounters[stream] = t
t, _ := p.sendCounters.LoadOrStore(stream, newTokenCounter(p.settings))
return t
}

Expand All @@ -86,7 +68,7 @@ type SendTokenWatcher interface {
// call CancelHandle when tokens are no longer needed, or when the caller is
// done.
NotifyWhenAvailable(
TokenCounter,
*tokenCounter,
TokenGrantNotification,
) SendTokenWatcherHandleID
// CancelHandle cancels the given handle, stopping it from being notified
Expand Down
29 changes: 1 addition & 28 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,6 @@ import (
"github.com/cockroachdb/redact"
)

// TokenCounter is the interface for a token counter that can be used to deduct
// and return flow control tokens. Additionally, it can be used to wait for
// tokens to become available, and to check if tokens are available without
// blocking.
//
// TODO(kvoli): Consider de-interfacing if not necessary for testing.
type TokenCounter interface {
// TokensAvailable returns true if tokens are available. If false, it returns
// a handle that may be used for waiting for tokens to become available.
TokensAvailable(admissionpb.WorkClass) (available bool, tokenWaitingHandle TokenWaitingHandle)
// TryDeduct attempts to deduct flow tokens for the given work class. If
// there are no tokens available, 0 tokens are returned. When less than the
// requested token count is available, partial tokens are returned
// corresponding to this partial amount.
TryDeduct(
context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens) kvflowcontrol.Tokens
// Deduct deducts (without blocking) flow tokens for the given work class. If
// there are not enough available tokens, the token counter will go into debt
// (negative available count) and still issue the requested number of tokens.
Deduct(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens)
// Return returns flow tokens for the given work class.
Return(context.Context, admissionpb.WorkClass, kvflowcontrol.Tokens)
// String returns a string representation of the token counter.
String() string
}

// TokenWaitingHandle is the interface for waiting for positive tokens from a
// token counter.
type TokenWaitingHandle interface {
Expand Down Expand Up @@ -177,8 +151,7 @@ type tokenCounter struct {
}
}

var _ TokenCounter = &tokenCounter{}

// newTokenCounter creates a new TokenCounter.
func newTokenCounter(settings *cluster.Settings) *tokenCounter {
t := &tokenCounter{
settings: settings,
Expand Down

0 comments on commit fba5ab3

Please sign in to comment.