Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130270: kvserver,inspectz: add rac2 inspectz support  r=sumeerbhola a=kvoli

The existing flow control tables are:

```
crdb_internal.kv_flow_controller
crdb_internal.kv_flow_control_handles
crdb_internal.kv_flow_token_deductions
```

Which are used in testing to assert on flow control stream state and
transitions.

Introduce another set of three tables, most of which are identical to the
exsting tables but populating the table data using rac2.

```
crdb_internal.kv_flow_controller_v2
crdb_internal.kv_flow_control_handles_v2
crdb_internal.kv_flow_token_deductions_v2
```

`crdb_internal.kv_flow_controller_v2` has two additional columns,
tracking the amount of available (regular|elastic) send tokens. The
schema is:

```
CREATE TABLE crdb_internal.kv_flow_controller_v2 (
  tenant_id                     INT NOT NULL,
  store_id                      INT NOT NULL,
  available_eval_regular_tokens INT NOT NULL,
  available_eval_elastic_tokens INT NOT NULL,
  available_send_regular_tokens INT NOT NULL,
  available_send_elastic_tokens INT NOT NULL
);
```

Note that unless rac2 is enabled, the tables are unlikely to show
anything interesting. Both tables (v1 and v2) are kept for
compatibility, with an intent to replace the v1 tables with the v2 ones
after v1 replication flow control is removed entirely from the code.

Resolves: cockroachdb#128091
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
2 people authored and msbutler committed Sep 13, 2024
2 parents a2f37eb + c0a5058 commit 1a2fa7f
Show file tree
Hide file tree
Showing 45 changed files with 1,760 additions and 510 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ crdb_internal kv_catalog_namespace table node NULL N
crdb_internal kv_catalog_zones table node NULL NULL
crdb_internal kv_dropped_relations view node NULL NULL
crdb_internal kv_flow_control_handles table node NULL NULL
crdb_internal kv_flow_control_handles_v2 table node NULL NULL
crdb_internal kv_flow_controller table node NULL NULL
crdb_internal kv_flow_controller_v2 table node NULL NULL
crdb_internal kv_flow_token_deductions table node NULL NULL
crdb_internal kv_flow_token_deductions_v2 table node NULL NULL
crdb_internal kv_inherited_role_members table node NULL NULL
crdb_internal kv_node_liveness table node NULL NULL
crdb_internal kv_node_status table node NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ table_name NOT IN (
'kv_dropped_relations',
'kv_inherited_role_members',
'kv_flow_control_handles',
'kv_flow_control_handles_v2',
'kv_flow_controller',
'kv_flow_controller_v2',
'kv_flow_token_deductions',
'kv_flow_token_deductions_v2',
'lost_descriptors_with_data',
'table_columns',
'table_row_statistics',
Expand Down
144 changes: 96 additions & 48 deletions pkg/inspectz/inspectz.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,92 +34,140 @@ const URLPrefix = "/inspectz/"
type Server struct {
log.AmbientContext

mux *http.ServeMux
handles kvflowcontrol.Handles
kvflowController kvflowcontrol.Controller
mux *http.ServeMux
handlesV1, handlesV2 kvflowcontrol.InspectHandles
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController
}

var _ inspectzpb.InspectzServer = &Server{}

// NewServer sets up an inspectz server.
func NewServer(
ambient log.AmbientContext,
handles kvflowcontrol.Handles,
kvflowController kvflowcontrol.Controller,
handlesV1, handlesV2 kvflowcontrol.InspectHandles,
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController,
) *Server {
mux := http.NewServeMux()
server := &Server{
AmbientContext: ambient,

mux: mux,
handles: handles,
kvflowController: kvflowController,
mux: mux,
handlesV1: handlesV1,
handlesV2: handlesV2,
kvflowControllerV1: kvflowControllerV1,
kvflowControllerV2: kvflowControllerV2,
}
mux.Handle("/inspectz/kvflowhandles", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := server.KVFlowHandles(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/kvflowcontroller", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := server.KVFlowController(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/v1/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandles))
mux.Handle("/inspectz/v1/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowController))
mux.Handle("/inspectz/v2/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandlesV2))
mux.Handle("/inspectz/v2/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowControllerV2))

return server
}

func (s *Server) makeKVFlowHandlesHandler(
impl func(
ctx context.Context,
request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := s.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := impl(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
}
}

func (s *Server) makeKVFlowControllerHandler(
impl func(
ctx context.Context,
request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := s.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := impl(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
}
}

// KVFlowController implements the InspectzServer interface.
func (s *Server) KVFlowController(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: s.kvflowController.Inspect(ctx),
}, nil
return kvFlowController(ctx, request, s.kvflowControllerV1)
}

// KVFlowHandles implements the InspectzServer interface.
func (s *Server) KVFlowHandles(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
return kvFlowHandles(ctx, request, s.handlesV1)
}

// KVFlowControllerV2 implements the InspectzServer interface.
func (s *Server) KVFlowControllerV2(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return kvFlowController(ctx, request, s.kvflowControllerV2)
}

// KVFlowHandlesV2 implements the InspectzServer interface.
func (s *Server) KVFlowHandlesV2(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
return kvFlowHandles(ctx, request, s.handlesV2)
}

// ServeHTTP serves various tools under the /debug endpoint.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
}

func kvFlowHandles(
ctx context.Context,
request *kvflowinspectpb.HandlesRequest,
handles kvflowcontrol.InspectHandles,
) (*kvflowinspectpb.HandlesResponse, error) {
resp := &kvflowinspectpb.HandlesResponse{}
if len(request.RangeIDs) == 0 {
request.RangeIDs = s.handles.Inspect()
request.RangeIDs = handles.Inspect()
}
for _, rangeID := range request.RangeIDs {
handle, found := s.handles.Lookup(rangeID)
handle, found := handles.LookupInspect(rangeID)
if !found {
continue // nothing to do
}
resp.Handles = append(resp.Handles, handle.Inspect(ctx))
resp.Handles = append(resp.Handles, handle)
}
return resp, nil
}

// ServeHTTP serves various tools under the /debug endpoint.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
func kvFlowController(
ctx context.Context,
request *kvflowinspectpb.ControllerRequest,
controller kvflowcontrol.InspectController,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: controller.Inspect(ctx),
}, nil
}

func respond(ctx context.Context, w http.ResponseWriter, code int, payload interface{}) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/inspectz/inspectzpb/inspectz.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ service Inspectz {
rpc KVFlowHandles(kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesResponse) {}

// KVFlowController exposes in-memory state of the node-level
// rac2.StreamTokenCounterProvider. It's housed under
// /inspectz/v2/kvflowcontroller.
rpc KVFlowControllerV2(kv.kvserver.kvflowcontrol.kvflowinspectpb.ControllerRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.ControllerResponse) {}

// KVFlowHandles exposes in-memory state of all rac2.RangeController(s). It's
// housed under /inspectz/v2/kvflowhandles.
rpc KVFlowHandlesV2(kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesResponse) {}

}

// As of 04/23, we're not invoking these RPC interfaces as RPCs. But they're
Expand Down
14 changes: 14 additions & 0 deletions pkg/inspectz/unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,17 @@ func (u Unsupported) KVFlowHandles(
) (*kvflowinspectpb.HandlesResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// KVFlowControllerV2 is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) KVFlowControllerV2(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// KVFlowHandlesV2 is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) KVFlowHandlesV2(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ go_library(
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/kv/kvserver/kvflowcontrol/node_rac2",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,24 +2357,24 @@ func (h *flowControlTestHelper) checkAllTokensReturned(
return fmt.Errorf("expected %d replication streams, got %d", expStreamCount, len(streams))
}
for _, stream := range streams {
if stream.AvailableRegularTokens != 16<<20 {
if stream.AvailableEvalRegularTokens != 16<<20 {
return fmt.Errorf("expected %s of regular flow tokens for %s, got %s",
humanize.IBytes(16<<20),
kvflowcontrol.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
},
humanize.IBytes(uint64(stream.AvailableRegularTokens)),
humanize.IBytes(uint64(stream.AvailableEvalRegularTokens)),
)
}
if stream.AvailableElasticTokens != 8<<20 {
if stream.AvailableEvalElasticTokens != 8<<20 {
return fmt.Errorf("expected %s of elastic flow tokens for %s, got %s",
humanize.IBytes(8<<20),
kvflowcontrol.Stream{
TenantID: stream.TenantID,
StoreID: stream.StoreID,
},
humanize.IBytes(uint64(stream.AvailableElasticTokens)),
humanize.IBytes(uint64(stream.AvailableEvalElasticTokens)),
)
}
}
Expand Down
68 changes: 68 additions & 0 deletions pkg/kv/kvserver/flow_control_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -56,6 +57,16 @@ func (sh *storesForFlowControl) Lookup(
return handle, found
}

// LookupInspect is part of the StoresForFlowControl interface.
func (sh *storesForFlowControl) LookupInspect(
rangeID roachpb.RangeID,
) (handle kvflowinspectpb.Handle, found bool) {
if handle, found := sh.Lookup(rangeID); found {
return handle.Inspect(context.Background()), found
}
return kvflowinspectpb.Handle{}, false
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storesForFlowControl) LookupReplicationAdmissionHandle(
Expand Down Expand Up @@ -130,6 +141,16 @@ func (sh *storeForFlowControl) Lookup(
return repl.mu.replicaFlowControlIntegration.handle()
}

// LookupInspect is part of the StoresForFlowControl interface.
func (sh *storeForFlowControl) LookupInspect(
rangeID roachpb.RangeID,
) (handle kvflowinspectpb.Handle, found bool) {
if handle, found := sh.Lookup(rangeID); found {
return handle.Inspect(context.Background()), found
}
return kvflowinspectpb.Handle{}, false
}

// LookupReplicationAdmissionHandle is part of the StoresForFlowControl
// interface.
func (sh *storeForFlowControl) LookupReplicationAdmissionHandle(
Expand Down Expand Up @@ -267,6 +288,13 @@ func (l NoopStoresFlowControlIntegration) LookupReplicationAdmissionHandle(
func (l NoopStoresFlowControlIntegration) ResetStreams(context.Context) {
}

// LookupInspect is part of the StoresForFlowControl interface.
func (l NoopStoresFlowControlIntegration) LookupInspect(
roachpb.RangeID,
) (kvflowinspectpb.Handle, bool) {
return kvflowinspectpb.Handle{}, false
}

// Inspect is part of the StoresForFlowControl interface.
func (l NoopStoresFlowControlIntegration) Inspect() []roachpb.RangeID {
return nil
Expand All @@ -284,6 +312,7 @@ func (NoopStoresFlowControlIntegration) OnRaftTransportDisconnected(
type StoresForRACv2 interface {
admission.OnLogEntryAdmitted
PiggybackedAdmittedResponseScheduler
kvflowcontrol.InspectHandles
}

// PiggybackedAdmittedResponseScheduler routes followers piggybacked admitted
Expand Down Expand Up @@ -350,6 +379,45 @@ func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
}
}

// LookupInspect implements kvflowcontrol.InspectHandles.
func (ss *storesForRACv2) LookupInspect(
rangeID roachpb.RangeID,
) (handle kvflowinspectpb.Handle, found bool) {
ls := (*Stores)(ss)
if err := ls.VisitStores(func(s *Store) error {
if found {
return nil
}
if r := s.GetReplicaIfExists(rangeID); r != nil {
r.raftMu.Lock()
defer r.raftMu.Unlock()
handle, found = r.flowControlV2.InspectRaftMuLocked(context.Background())
}
return nil
}); err != nil {
log.Errorf(ls.AnnotateCtx(context.Background()),
"unexpected error iterating stores: %s", err)
}
return handle, found
}

// Inspect implements kvflowcontrol.InspectHandles.
func (ss *storesForRACv2) Inspect() []roachpb.RangeID {
ls := (*Stores)(ss)
var rangeIDs []roachpb.RangeID
if err := ls.VisitStores(func(s *Store) error {
s.VisitReplicas(func(r *Replica) (wantMore bool) {
rangeIDs = append(rangeIDs, r.RangeID)
return true
})
return nil
}); err != nil {
log.Errorf(ls.AnnotateCtx(context.Background()),
"unexpected error iterating stores: %s", err)
}
return rangeIDs
}

type admissionDemuxHandle struct {
v1Handle kvflowcontrol.ReplicationAdmissionHandle
r *Replica
Expand Down
Loading

0 comments on commit 1a2fa7f

Please sign in to comment.