From a0ef89d41ca509f2731b5779028bcf1ab7f7a817 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 9 Sep 2024 14:53:41 -0400 Subject: [PATCH] kvserver: hook up rac2 inspect methods The previous patch introduced `Inspect` methods mapping to `Handle`, `ConnectedStream`, `TrackedDeduction` and `Stream`. Extract the `Inspect` methods into their own interfaces which are implemented by rac2. The methods are then hooked into `storesForRAC2`, which can be accessed in a similar manner to the v1 methods. Part of: #128091 Release note: None --- pkg/kv/kvserver/flow_control_stores.go | 59 +++++++++++++++++++ .../kvserver/kvflowcontrol/kvflowcontrol.go | 33 ++++++++--- .../kvflowdispatch/kvflowdispatch_test.go | 4 ++ .../kvflowcontrol/rac2/range_controller.go | 4 +- .../kvflowcontrol/replica_rac2/processor.go | 15 +++++ .../replica_rac2/processor_test.go | 5 ++ .../replica_rac2/testdata/processor | 4 ++ 7 files changed, 112 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index 87c5081acb1c..09c759d6d82f 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -56,6 +56,13 @@ func (sh *storesForFlowControl) Lookup( return handle, found } +// LookupInspect is part of the StoresForFlowControl interface. +func (sh *storesForFlowControl) LookupInspect( + rangeID roachpb.RangeID, +) (handle kvflowcontrol.InspectHandle, found bool) { + return sh.Lookup(rangeID) +} + // LookupReplicationAdmissionHandle is part of the StoresForFlowControl // interface. func (sh *storesForFlowControl) LookupReplicationAdmissionHandle( @@ -130,6 +137,13 @@ func (sh *storeForFlowControl) Lookup( return repl.mu.replicaFlowControlIntegration.handle() } +// LookupInspect is part of the StoresForFlowControl interface. +func (sh *storeForFlowControl) LookupInspect( + rangeID roachpb.RangeID, +) (kvflowcontrol.InspectHandle, bool) { + return sh.Lookup(rangeID) +} + // LookupReplicationAdmissionHandle is part of the StoresForFlowControl // interface. func (sh *storeForFlowControl) LookupReplicationAdmissionHandle( @@ -267,6 +281,13 @@ func (l NoopStoresFlowControlIntegration) LookupReplicationAdmissionHandle( func (l NoopStoresFlowControlIntegration) ResetStreams(context.Context) { } +// LookupInspect is part of the StoresForFlowControl interface. +func (l NoopStoresFlowControlIntegration) LookupInspect( + roachpb.RangeID, +) (kvflowcontrol.InspectHandle, bool) { + return nil, false +} + // Inspect is part of the StoresForFlowControl interface. func (l NoopStoresFlowControlIntegration) Inspect() []roachpb.RangeID { return nil @@ -284,6 +305,7 @@ func (NoopStoresFlowControlIntegration) OnRaftTransportDisconnected( type StoresForRACv2 interface { admission.OnLogEntryAdmitted PiggybackedAdmittedResponseScheduler + kvflowcontrol.InspectHandles } // PiggybackedAdmittedResponseScheduler routes followers piggybacked admitted @@ -350,6 +372,43 @@ func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2( } } +// LookupInspect implements kvflowcontrol.InspectHandles. +func (ss *storesForRACv2) LookupInspect( + rangeID roachpb.RangeID, +) (handle kvflowcontrol.InspectHandle, found bool) { + ls := (*Stores)(ss) + if err := ls.VisitStores(func(s *Store) error { + if found { + return nil + } + if r := s.GetReplicaIfExists(rangeID); r != nil { + handle, found = r.flowControlV2.Inspect(context.Background()) + } + return nil + }); err != nil { + log.Errorf(ls.AnnotateCtx(context.Background()), + "unexpected error iterating stores: %s", err) + } + return handle, found +} + +// Inspect implements kvflowcontrol.InspectHandles. +func (ss *storesForRACv2) Inspect() []roachpb.RangeID { + ls := (*Stores)(ss) + var rangeIDs []roachpb.RangeID + if err := ls.VisitStores(func(s *Store) error { + s.VisitReplicas(func(r *Replica) (wantMore bool) { + rangeIDs = append(rangeIDs, r.RangeID) + return true + }) + return nil + }); err != nil { + log.Errorf(ls.AnnotateCtx(context.Background()), + "unexpected error iterating stores: %s", err) + } + return rangeIDs +} + type admissionDemuxHandle struct { v1Handle kvflowcontrol.ReplicationAdmissionHandle r *Replica diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 8e917b802c77..84c71c6853be 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -150,6 +150,7 @@ type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. type Controller interface { + InspectController // Admit seeks admission to replicate data, regardless of size, for work with // the given priority, create-time, and over the given stream. This blocks // until there are flow tokens available or the stream disconnects, subject to @@ -167,9 +168,6 @@ type Controller interface { // expected to have been deducted earlier with the same priority provided // here. ReturnTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) - // Inspect returns a snapshot of all underlying streams and their available - // {regular,elastic} tokens. It's used to power /inspectz. - Inspect(context.Context) []kvflowinspectpb.Stream // InspectStream returns a snapshot of a specific underlying stream and its // available {regular,elastic} tokens. It's used to power /inspectz. InspectStream(context.Context, Stream) kvflowinspectpb.Stream @@ -181,6 +179,12 @@ type Controller interface { // See I2, I3a and [^7] in kvflowcontrol/doc.go. } +type InspectController interface { + // Inspect returns a snapshot of all underlying streams and their available + // {regular,elastic} tokens. It's used to power /inspectz. + Inspect(context.Context) []kvflowinspectpb.Stream +} + // ReplicationAdmissionHandle abstracts waiting for admission across RACv1 and RACv2. type ReplicationAdmissionHandle interface { // Admit seeks admission to replicate data, regardless of size, for work @@ -218,6 +222,7 @@ type ReplicationAdmissionHandle interface { // kvflowcontrolpb.AdmittedRaftLogEntries for more details). type Handle interface { ReplicationAdmissionHandle + InspectHandle // DeductTokensFor deducts (without blocking) flow tokens for replicating // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it @@ -267,9 +272,6 @@ type Handle interface { // Admit(). It's only used when cluster settings change, settings that // affect all work waiting for flow tokens. ResetStreams(ctx context.Context) - // Inspect returns a serialized form of the underlying handle. It's used to - // power /inspectz. - Inspect(context.Context) kvflowinspectpb.Handle // Close closes the handle and returns all held tokens back to the // underlying controller. Typically used when the replica loses its lease // and/or raft leadership, or ends up getting GC-ed (if it's being @@ -277,10 +279,17 @@ type Handle interface { Close(context.Context) } +type InspectHandle interface { + // Inspect returns a serialized form of the underlying handle. It's used to + // power /inspectz. + Inspect(context.Context) kvflowinspectpb.Handle +} + // Handles represent a set of flow control handles. Note that handles are // typically held on replicas initiating replication traffic, so on a given node // they're uniquely identified by their range ID. type Handles interface { + InspectHandles // Lookup the kvflowcontrol.Handle for the specific range (or rather, the // replica of the specific range that's locally held). Lookup(roachpb.RangeID) (Handle, bool) @@ -288,9 +297,6 @@ type Handles interface { // kvflowcontrol.Handles, i.e. disconnect and reconnect each one. It // effectively unblocks all requests waiting in Admit(). ResetStreams(ctx context.Context) - // Inspect returns the set of ranges that have an embedded - // kvflowcontrol.Handle. It's used to power /inspectz. - Inspect() []roachpb.RangeID // TODO(irfansharif): When fixing I1 and I2 from kvflowcontrol/node.go, // we'll want to disconnect all streams for a specific node. Expose // something like the following to disconnect all replication streams bound @@ -308,6 +314,15 @@ type Handles interface { LookupReplicationAdmissionHandle(roachpb.RangeID) (ReplicationAdmissionHandle, bool) } +type InspectHandles interface { + // LookupInspect the serialized form of a handle for the specific range (or + // rather, the replica of the specific range that's locally held). + LookupInspect(roachpb.RangeID) (InspectHandle, bool) + // Inspect returns the set of ranges that have an embedded + // kvflowcontrol.Handle. It's used to power /inspectz. + Inspect() []roachpb.RangeID +} + // HandleFactory is used to construct new Handles. type HandleFactory interface { NewHandle(roachpb.RangeID, roachpb.TenantID) Handle diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 29ee17a30450..ab115ddbec06 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -238,4 +238,8 @@ func (d dummyHandles) Inspect() []roachpb.RangeID { return nil } +func (d dummyHandles) LookupInspect(id roachpb.RangeID) (kvflowcontrol.InspectHandle, bool) { + return nil, false +} + var _ kvflowcontrol.Handles = dummyHandles{} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 09a151d6ecd2..22cd43c0d781 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -39,6 +39,7 @@ import ( // None of the methods are called with Replica.mu held. The caller should // typically order its mutexes before Replica.mu. type RangeController interface { + kvflowcontrol.InspectHandle // WaitForEval seeks admission to evaluate a request at the given priority. // This blocks until there are positive tokens available for the request to // be admitted for evaluation, or the context is canceled (which returns an @@ -75,9 +76,6 @@ type RangeController interface { // // Requires replica.raftMu to be held. CloseRaftMuLocked(ctx context.Context) - // Inspect returns a handle containing the state of the range controller. - // It's used to power /inspectz-style debugging pages. - Inspect(ctx context.Context) kvflowinspectpb.Handle } // TODO(pav-kv): This interface a placeholder for the interface containing raft diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index a34c2cb41afe..567d74c1f420 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -413,6 +413,10 @@ type Processor interface { // and error will be nil. AdmitForEval( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) (admitted bool, err error) + + // Inspect returns a handle to inspect the state of the underlying range + // controller. It is used to power /inspectz-style debugging pages. + Inspect(ctx context.Context) (kvflowcontrol.InspectHandle, bool) } type processorImpl struct { @@ -985,6 +989,17 @@ func (p *processorImpl) AdmitForEval( return p.mu.leader.rc.WaitForEval(ctx, pri) } +// Inspect returns a handle containing the state of the range controller. It's +// used to power /inspectz-style debugging pages. +func (p *processorImpl) Inspect(ctx context.Context) (kvflowcontrol.InspectHandle, bool) { + p.mu.leader.rcReferenceUpdateMu.RLock() + defer p.mu.leader.rcReferenceUpdateMu.RUnlock() + if p.mu.leader.rc == nil { + return nil, false + } + return p.mu.leader.rc, true +} + func admittedIncreased(prev, next [raftpb.NumPriorities]uint64) bool { for i := range prev { if prev[i] < next[i] { diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index 5ef1f1c5a6ec..1fb986fb115b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -500,6 +500,11 @@ func TestProcessorBasic(t *testing.T) { } return builderStr() + case "inspect": + rc := rcFactory.rcs[len(rcFactory.rcs)-1] + rc.Inspect(ctx) + return builderStr() + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 3622bc89aa1b..30ed6ff3236e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -800,3 +800,7 @@ HandleRaftReady: Replica.MuUnlock RangeController.CloseRaftMuLocked ..... + +inspect +---- + RangeController.Inspect