Skip to content

Commit

Permalink
server,inspectz: introduce v2 flow control inspectz service
Browse files Browse the repository at this point in the history
Rename the existing `inspectz/(kvflowhandles|kvflowcontroller)` endpoint
to `inspectz/v1/(kvflowhandles|kvflowcontroller)`, and introduce the v2
endpoint serving the same types,
`inspectz/v2/(kvflowhandles|kvflowcontroller)`.

Note that these endpoints are not used internally in any webpages or
testing and are not relied upon. Internal tables call
`KVFlowHandles(V1)` or `KVFlowController(V1)` directly to populate
themselves.

Part of: cockroachdb#128091
Release note: None
  • Loading branch information
kvoli committed Sep 9, 2024
1 parent a0ef89d commit e89bcbc
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 42 deletions.
129 changes: 87 additions & 42 deletions pkg/inspectz/inspectz.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,68 +34,86 @@ const URLPrefix = "/inspectz/"
type Server struct {
log.AmbientContext

mux *http.ServeMux
handles kvflowcontrol.Handles
kvflowController kvflowcontrol.Controller
mux *http.ServeMux
handlesV1, handlesV2 kvflowcontrol.InspectHandles
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController
}

var _ inspectzpb.InspectzServer = &Server{}

// NewServer sets up an inspectz server.
func NewServer(
ambient log.AmbientContext,
handles kvflowcontrol.Handles,
kvflowController kvflowcontrol.Controller,
handlesV1, handlesV2 kvflowcontrol.InspectHandles,
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController,
) *Server {
mux := http.NewServeMux()
server := &Server{
AmbientContext: ambient,

mux: mux,
handles: handles,
kvflowController: kvflowController,
mux: mux,
handlesV1: handlesV1,
handlesV2: handlesV2,
kvflowControllerV1: kvflowControllerV1,
kvflowControllerV2: kvflowControllerV2,
}
mux.Handle("/inspectz/kvflowhandles", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := server.KVFlowHandles(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/kvflowcontroller", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := server.KVFlowController(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/v1/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandles))
mux.Handle("/inspectz/v1/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowController))
mux.Handle("/inspectz/v2/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandlesV2))
mux.Handle("/inspectz/v2/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowControllerV2))

return server
}

func (s *Server) makeKVFlowHandlesHandler(
impl func(
ctx context.Context,
request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := s.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := impl(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
}
}

func (s *Server) makeKVFlowControllerHandler(
impl func(
ctx context.Context,
request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := s.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := impl(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
}
}

// KVFlowController implements the InspectzServer interface.
func (s *Server) KVFlowController(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: s.kvflowController.Inspect(ctx),
Streams: s.kvflowControllerV1.Inspect(ctx),
}, nil
}

Expand All @@ -105,10 +123,37 @@ func (s *Server) KVFlowHandles(
) (*kvflowinspectpb.HandlesResponse, error) {
resp := &kvflowinspectpb.HandlesResponse{}
if len(request.RangeIDs) == 0 {
request.RangeIDs = s.handles.Inspect()
request.RangeIDs = s.handlesV1.Inspect()
}
for _, rangeID := range request.RangeIDs {
handle, found := s.handlesV1.LookupInspect(rangeID)
if !found {
continue // nothing to do
}
resp.Handles = append(resp.Handles, handle.Inspect(ctx))
}
return resp, nil
}

// KVFlowControllerV2 implements the InspectzServer interface.
func (s *Server) KVFlowControllerV2(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: s.kvflowControllerV2.Inspect(ctx),
}, nil
}

// KVFlowHandlesV2 implements the InspectzServer interface.
func (s *Server) KVFlowHandlesV2(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
resp := &kvflowinspectpb.HandlesResponse{}
if len(request.RangeIDs) == 0 {
request.RangeIDs = s.handlesV2.Inspect()
}
for _, rangeID := range request.RangeIDs {
handle, found := s.handles.Lookup(rangeID)
handle, found := s.handlesV2.LookupInspect(rangeID)
if !found {
continue // nothing to do
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/inspectz/inspectzpb/inspectz.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ service Inspectz {
rpc KVFlowHandles(kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesResponse) {}

// KVFlowController exposes in-memory state of the node-level
// rac2.StreamTokenCounterProvider. It's housed under
// /inspectz/v2/kvflowcontroller.
rpc KVFlowControllerV2(kv.kvserver.kvflowcontrol.kvflowinspectpb.ControllerRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.ControllerResponse) {}

// KVFlowHandles exposes in-memory state of all rac2.RangeController(s). It's
// housed under /inspectz/v2/kvflowhandles.
rpc KVFlowHandlesV2(kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesResponse) {}

}

// As of 04/23, we're not invoking these RPC interfaces as RPCs. But they're
Expand Down
14 changes: 14 additions & 0 deletions pkg/inspectz/unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,17 @@ func (u Unsupported) KVFlowHandles(
) (*kvflowinspectpb.HandlesResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// KVFlowControllerV2 is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) KVFlowControllerV2(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// KVFlowHandlesV2 is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) KVFlowHandlesV2(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
inspectzServer := inspectz.NewServer(
cfg.BaseConfig.AmbientCtx,
node.storeCfg.KVFlowHandles,
kvserver.MakeStoresForRACv2(stores),
node.storeCfg.KVFlowController,
node.storeCfg.KVFlowStreamTokenProvider,
)
cfg.CidrLookup.Start(ctx, stopper)

Expand Down

0 comments on commit e89bcbc

Please sign in to comment.