Skip to content

Commit

Permalink
kvserver: hook up rac2 inspect methods
Browse files Browse the repository at this point in the history
The previous patch introduced `Inspect` methods mapping to `Handle`,
`ConnectedStream`, `TrackedDeduction` and `Stream`. Extract the
`Inspect` methods into their own interfaces which are implemented by
rac2.

The methods are then hooked into `storesForRAC2`, which can be accessed
in a similar manner to the v1 methods.

Part of: cockroachdb#128091
Release note: None
  • Loading branch information
kvoli committed Sep 9, 2024
1 parent 4beff47 commit a0ef89d
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 12 deletions.
59 changes: 59 additions & 0 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func (sh *storesForFlowControl) Lookup(
return handle, found
}

// LookupInspect is part of the StoresForFlowControl interface.
func (sh *storesForFlowControl) LookupInspect(
rangeID roachpb.RangeID,
) (handle kvflowcontrol.InspectHandle, found bool) {
return sh.Lookup(rangeID)
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storesForFlowControl) LookupReplicationAdmissionHandle(
Expand Down Expand Up @@ -130,6 +137,13 @@ func (sh *storeForFlowControl) Lookup(
return repl.mu.replicaFlowControlIntegration.handle()
}

// LookupInspect is part of the StoresForFlowControl interface.
func (sh *storeForFlowControl) LookupInspect(
rangeID roachpb.RangeID,
) (kvflowcontrol.InspectHandle, bool) {
return sh.Lookup(rangeID)
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storeForFlowControl) LookupReplicationAdmissionHandle(
Expand Down Expand Up @@ -267,6 +281,13 @@ func (l NoopStoresFlowControlIntegration) LookupReplicationAdmissionHandle(
func (l NoopStoresFlowControlIntegration) ResetStreams(context.Context) {
}

// LookupInspect is part of the StoresForFlowControl interface.
func (l NoopStoresFlowControlIntegration) LookupInspect(
roachpb.RangeID,
) (kvflowcontrol.InspectHandle, bool) {
return nil, false
}

// Inspect is part of the StoresForFlowControl interface.
func (l NoopStoresFlowControlIntegration) Inspect() []roachpb.RangeID {
return nil
Expand All @@ -284,6 +305,7 @@ func (NoopStoresFlowControlIntegration) OnRaftTransportDisconnected(
type StoresForRACv2 interface {
admission.OnLogEntryAdmitted
PiggybackedAdmittedResponseScheduler
kvflowcontrol.InspectHandles
}

// PiggybackedAdmittedResponseScheduler routes followers piggybacked admitted
Expand Down Expand Up @@ -350,6 +372,43 @@ func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
}
}

// LookupInspect implements kvflowcontrol.InspectHandles.
func (ss *storesForRACv2) LookupInspect(
rangeID roachpb.RangeID,
) (handle kvflowcontrol.InspectHandle, found bool) {
ls := (*Stores)(ss)
if err := ls.VisitStores(func(s *Store) error {
if found {
return nil
}
if r := s.GetReplicaIfExists(rangeID); r != nil {
handle, found = r.flowControlV2.Inspect(context.Background())
}
return nil
}); err != nil {
log.Errorf(ls.AnnotateCtx(context.Background()),
"unexpected error iterating stores: %s", err)
}
return handle, found
}

// Inspect implements kvflowcontrol.InspectHandles.
func (ss *storesForRACv2) Inspect() []roachpb.RangeID {
ls := (*Stores)(ss)
var rangeIDs []roachpb.RangeID
if err := ls.VisitStores(func(s *Store) error {
s.VisitReplicas(func(r *Replica) (wantMore bool) {
rangeIDs = append(rangeIDs, r.RangeID)
return true
})
return nil
}); err != nil {
log.Errorf(ls.AnnotateCtx(context.Background()),
"unexpected error iterating stores: %s", err)
}
return rangeIDs
}

type admissionDemuxHandle struct {
v1Handle kvflowcontrol.ReplicationAdmissionHandle
r *Replica
Expand Down
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type Tokens int64
// Controller provides flow control for replication traffic in KV, held at the
// node-level.
type Controller interface {
InspectController
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority, create-time, and over the given stream. This blocks
// until there are flow tokens available or the stream disconnects, subject to
Expand All @@ -167,9 +168,6 @@ type Controller interface {
// expected to have been deducted earlier with the same priority provided
// here.
ReturnTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
// Inspect returns a snapshot of all underlying streams and their available
// {regular,elastic} tokens. It's used to power /inspectz.
Inspect(context.Context) []kvflowinspectpb.Stream
// InspectStream returns a snapshot of a specific underlying stream and its
// available {regular,elastic} tokens. It's used to power /inspectz.
InspectStream(context.Context, Stream) kvflowinspectpb.Stream
Expand All @@ -181,6 +179,12 @@ type Controller interface {
// See I2, I3a and [^7] in kvflowcontrol/doc.go.
}

type InspectController interface {
// Inspect returns a snapshot of all underlying streams and their available
// {regular,elastic} tokens. It's used to power /inspectz.
Inspect(context.Context) []kvflowinspectpb.Stream
}

// ReplicationAdmissionHandle abstracts waiting for admission across RACv1 and RACv2.
type ReplicationAdmissionHandle interface {
// Admit seeks admission to replicate data, regardless of size, for work
Expand Down Expand Up @@ -218,6 +222,7 @@ type ReplicationAdmissionHandle interface {
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
type Handle interface {
ReplicationAdmissionHandle
InspectHandle
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
Expand Down Expand Up @@ -267,30 +272,31 @@ type Handle interface {
// Admit(). It's only used when cluster settings change, settings that
// affect all work waiting for flow tokens.
ResetStreams(ctx context.Context)
// Inspect returns a serialized form of the underlying handle. It's used to
// power /inspectz.
Inspect(context.Context) kvflowinspectpb.Handle
// Close closes the handle and returns all held tokens back to the
// underlying controller. Typically used when the replica loses its lease
// and/or raft leadership, or ends up getting GC-ed (if it's being
// rebalanced, merged away, etc).
Close(context.Context)
}

type InspectHandle interface {
// Inspect returns a serialized form of the underlying handle. It's used to
// power /inspectz.
Inspect(context.Context) kvflowinspectpb.Handle
}

// Handles represent a set of flow control handles. Note that handles are
// typically held on replicas initiating replication traffic, so on a given node
// they're uniquely identified by their range ID.
type Handles interface {
InspectHandles
// Lookup the kvflowcontrol.Handle for the specific range (or rather, the
// replica of the specific range that's locally held).
Lookup(roachpb.RangeID) (Handle, bool)
// ResetStreams resets all underlying streams for all underlying
// kvflowcontrol.Handles, i.e. disconnect and reconnect each one. It
// effectively unblocks all requests waiting in Admit().
ResetStreams(ctx context.Context)
// Inspect returns the set of ranges that have an embedded
// kvflowcontrol.Handle. It's used to power /inspectz.
Inspect() []roachpb.RangeID
// TODO(irfansharif): When fixing I1 and I2 from kvflowcontrol/node.go,
// we'll want to disconnect all streams for a specific node. Expose
// something like the following to disconnect all replication streams bound
Expand All @@ -308,6 +314,15 @@ type Handles interface {
LookupReplicationAdmissionHandle(roachpb.RangeID) (ReplicationAdmissionHandle, bool)
}

type InspectHandles interface {
// LookupInspect the serialized form of a handle for the specific range (or
// rather, the replica of the specific range that's locally held).
LookupInspect(roachpb.RangeID) (InspectHandle, bool)
// Inspect returns the set of ranges that have an embedded
// kvflowcontrol.Handle. It's used to power /inspectz.
Inspect() []roachpb.RangeID
}

// HandleFactory is used to construct new Handles.
type HandleFactory interface {
NewHandle(roachpb.RangeID, roachpb.TenantID) Handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,8 @@ func (d dummyHandles) Inspect() []roachpb.RangeID {
return nil
}

func (d dummyHandles) LookupInspect(id roachpb.RangeID) (kvflowcontrol.InspectHandle, bool) {
return nil, false
}

var _ kvflowcontrol.Handles = dummyHandles{}
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
// None of the methods are called with Replica.mu held. The caller should
// typically order its mutexes before Replica.mu.
type RangeController interface {
kvflowcontrol.InspectHandle
// WaitForEval seeks admission to evaluate a request at the given priority.
// This blocks until there are positive tokens available for the request to
// be admitted for evaluation, or the context is canceled (which returns an
Expand Down Expand Up @@ -75,9 +76,6 @@ type RangeController interface {
//
// Requires replica.raftMu to be held.
CloseRaftMuLocked(ctx context.Context)
// Inspect returns a handle containing the state of the range controller.
// It's used to power /inspectz-style debugging pages.
Inspect(ctx context.Context) kvflowinspectpb.Handle
}

// TODO(pav-kv): This interface a placeholder for the interface containing raft
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ type Processor interface {
// and error will be nil.
AdmitForEval(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) (admitted bool, err error)

// Inspect returns a handle to inspect the state of the underlying range
// controller. It is used to power /inspectz-style debugging pages.
Inspect(ctx context.Context) (kvflowcontrol.InspectHandle, bool)
}

type processorImpl struct {
Expand Down Expand Up @@ -985,6 +989,17 @@ func (p *processorImpl) AdmitForEval(
return p.mu.leader.rc.WaitForEval(ctx, pri)
}

// Inspect returns a handle containing the state of the range controller. It's
// used to power /inspectz-style debugging pages.
func (p *processorImpl) Inspect(ctx context.Context) (kvflowcontrol.InspectHandle, bool) {
p.mu.leader.rcReferenceUpdateMu.RLock()
defer p.mu.leader.rcReferenceUpdateMu.RUnlock()
if p.mu.leader.rc == nil {
return nil, false
}
return p.mu.leader.rc, true
}

func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool {
for i := range prev {
if prev[i] < next[i] {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,11 @@ func TestProcessorBasic(t *testing.T) {
}
return builderStr()

case "inspect":
rc := rcFactory.rcs[len(rcFactory.rcs)-1]
rc.Inspect(ctx)
return builderStr()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -800,3 +800,7 @@ HandleRaftReady:
Replica.MuUnlock
RangeController.CloseRaftMuLocked
.....

inspect
----
RangeController.Inspect

0 comments on commit a0ef89d

Please sign in to comment.