Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132252: storeliveness,inspectz,sql: expose store liveness state r=nvanbenschoten a=miraradeva

The first commit introduces two new inspectz endpoints:

- `/inspectz/storeliveness/supportFrom`: for each store on the local server, returns the set of stores the local store is receiving support from, along with the corresponding epochs and expiration timestamps.

- `/inspectz/storeliveness/supportFor`: for each store on the local server, returns the set of stores the local store is providing support for, along with the corresponding epochs and expiration timestamps.

The second commit adds two new `crdb_internal` tables that expose the node-level view of the local stores' support for and from other stores. They are powered by the /inspectz server API.

Both new tables have the same schema:

```
CREATE TABLE crdb_internal.store_liveness_support_{for,from} (
node_id INT NOT NULL,
store_id INT NOT NULL,
support_{for,from}_node_id INT NOT NULL,
support_{for,from}_store_id INT NOT NULL,
support_epoch INT NOT NULL,
support_expiration TIMESTAMP NOT NULL
```

These tables will be empty unless both store liveness and Raft leader fortification are enabled.

133279: logictest: improve reassign_owned_by test to show names r=rafiss a=rafiss

This makes it easier to read the test instead of having to use internal IDs.

Epic: None
Release note: None

Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
3 people committed Oct 28, 2024
3 parents 7818c86 + 6f03abe + e7c7ef1 commit 9acaaa7
Show file tree
Hide file tree
Showing 31 changed files with 739 additions and 429 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ crdb_internal statement_activity view node NULL N
crdb_internal statement_statistics view node NULL NULL
crdb_internal statement_statistics_persisted view node NULL NULL
crdb_internal statement_statistics_persisted_v22_2 view node NULL NULL
crdb_internal store_liveness_support_for table node NULL NULL
crdb_internal store_liveness_support_from table node NULL NULL
crdb_internal super_regions table node NULL NULL
crdb_internal system_jobs table node NULL NULL
crdb_internal table_columns table node NULL NULL
Expand Down
12 changes: 7 additions & 5 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,23 @@ table_name NOT IN (
'predefined_comments',
'session_trace',
'session_variables',
'table_spans',
'table_spans',
'tables',
'cluster_statement_statistics',
'statement_activity',
'statement_activity',
'statement_statistics_persisted',
'statement_statistics_persisted_v22_2',
'store_liveness_support_for',
'store_liveness_support_from',
'cluster_transaction_statistics',
'statement_statistics',
'transaction_activity',
'transaction_activity',
'transaction_statistics_persisted',
'transaction_statistics_persisted_v22_2',
'transaction_statistics',
'tenant_usage_details',
'pg_catalog_table_is_implemented',
'fully_qualified_names'
'pg_catalog_table_is_implemented',
'fully_qualified_names'
)
ORDER BY name ASC`)
assert.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/inspectz/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/inspectz/inspectzpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/kv/kvserver/storeliveness/storelivenesspb",
"//pkg/roachpb",
"//pkg/util/errorutil",
"//pkg/util/log",
Expand Down
51 changes: 51 additions & 0 deletions pkg/inspectz/inspectz.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand All @@ -32,6 +34,7 @@ type Server struct {
mux *http.ServeMux
handlesV1, handlesV2 kvflowcontrol.InspectHandles
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController
storeLiveness kvserver.InspectAllStoreLiveness
}

var _ inspectzpb.InspectzServer = &Server{}
Expand All @@ -41,6 +44,7 @@ func NewServer(
ambient log.AmbientContext,
handlesV1, handlesV2 kvflowcontrol.InspectHandles,
kvflowControllerV1, kvflowControllerV2 kvflowcontrol.InspectController,
storeLiveness kvserver.InspectAllStoreLiveness,
) *Server {
mux := http.NewServeMux()
server := &Server{
Expand All @@ -51,11 +55,20 @@ func NewServer(
handlesV2: handlesV2,
kvflowControllerV1: kvflowControllerV1,
kvflowControllerV2: kvflowControllerV2,
storeLiveness: storeLiveness,
}
mux.Handle("/inspectz/v1/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandles))
mux.Handle("/inspectz/v1/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowController))
mux.Handle("/inspectz/v2/kvflowhandles", server.makeKVFlowHandlesHandler(server.KVFlowHandlesV2))
mux.Handle("/inspectz/v2/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowControllerV2))
mux.Handle(
"/inspectz/storeliveness/supportFrom",
server.makeStoreLivenessHandler(server.StoreLivenessSupportFrom),
)
mux.Handle(
"/inspectz/storeliveness/supportFor",
server.makeStoreLivenessHandler(server.StoreLivenessSupportFor),
)

return server
}
Expand Down Expand Up @@ -103,6 +116,24 @@ func (s *Server) makeKVFlowControllerHandler(
}
}

func (s *Server) makeStoreLivenessHandler(
impl func(ctx context.Context, request *slpb.InspectStoreLivenessRequest) (
*slpb.InspectStoreLivenessResponse, error,
),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := s.AnnotateCtx(context.Background())
req := &slpb.InspectStoreLivenessRequest{}
resp, err := impl(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
}
}

// KVFlowController implements the InspectzServer interface.
func (s *Server) KVFlowController(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
Expand Down Expand Up @@ -131,6 +162,26 @@ func (s *Server) KVFlowHandlesV2(
return kvFlowHandles(ctx, request, s.handlesV2)
}

// StoreLivenessSupportFrom implements the InspectzServer interface.
func (s *Server) StoreLivenessSupportFrom(
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
) (*slpb.InspectStoreLivenessResponse, error) {
resp := &slpb.InspectStoreLivenessResponse{}
support, err := s.storeLiveness.InspectAllSupportFrom()
resp.SupportStatesPerStore = support
return resp, err
}

// StoreLivenessSupportFor implements the InspectzServer interface.
func (s *Server) StoreLivenessSupportFor(
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
) (*slpb.InspectStoreLivenessResponse, error) {
resp := &slpb.InspectStoreLivenessResponse{}
support, err := s.storeLiveness.InspectAllSupportFor()
resp.SupportStatesPerStore = support
return resp, err
}

// ServeHTTP serves various tools under the /debug endpoint.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
Expand Down
2 changes: 2 additions & 0 deletions pkg/inspectz/inspectzpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb_proto",
"//pkg/kv/kvserver/storeliveness/storelivenesspb:storelivenesspb_proto",
"@go_googleapis//google/api:annotations_proto",
],
)
Expand All @@ -21,6 +22,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/kv/kvserver/storeliveness/storelivenesspb",
"@org_golang_google_genproto//googleapis/api/annotations:go_default_library",
],
)
Expand Down
13 changes: 13 additions & 0 deletions pkg/inspectz/inspectzpb/inspectz.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb";

import "google/api/annotations.proto";
import "kv/kvserver/kvflowcontrol/kvflowinspectpb/kvflowinspect.proto";
import "kv/kvserver/storeliveness/storelivenesspb/service.proto";

// Inspectz exposes in-memory state of various CRDB components.
//
Expand Down Expand Up @@ -38,6 +39,18 @@ service Inspectz {
rpc KVFlowHandlesV2(kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesRequest)
returns (kv.kvserver.kvflowcontrol.kvflowinspectpb.HandlesResponse) {}

// StoreLivenessSupportFrom exposes the in-memory state of all stores'
// storeliveness.SupportManagers' views of support provided from other stores.
// It's housed under /inspectz/storeliveness/supportFrom.
rpc StoreLivenessSupportFrom(kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessRequest)
returns (kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessResponse) {}

// StoreLivenessSupportFor exposes the in-memory state of all stores'
// storeliveness.SupportManagers' views of support provided for other stores.
// It's housed under /inspectz/storeliveness/supportFor.
rpc StoreLivenessSupportFor(kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessRequest)
returns (kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessResponse) {}

}

// As of 04/23, we're not invoking these RPC interfaces as RPCs. But they're
Expand Down
15 changes: 15 additions & 0 deletions pkg/inspectz/unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
)

Expand Down Expand Up @@ -46,3 +47,17 @@ func (u Unsupported) KVFlowHandlesV2(
) (*kvflowinspectpb.HandlesResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// StoreLivenessSupportFrom is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) StoreLivenessSupportFrom(
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
) (*slpb.InspectStoreLivenessResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}

// StoreLivenessSupportFor is part of the inspectzpb.InspectzServer interface.
func (u Unsupported) StoreLivenessSupportFor(
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
) (*slpb.InspectStoreLivenessResponse, error) {
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ go_library(
"stores.go",
"stores_base.go",
"stores_server.go",
"stores_store_liveness.go",
"testing_knobs.go",
"ts_maintenance_queue.go",
":gen-refreshraftreason-stringer", # keep
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/storeliveness/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
// Fabric is a representation of the Store Liveness fabric. It provides
// information about uninterrupted periods of "support" between stores.
type Fabric interface {
InspectFabric

// SupportFor returns the epoch of the current uninterrupted period of Store
// Liveness support from the local store (S_local) for the store (S_remote)
// corresponding to the specified id, and a boolean indicating whether S_local
Expand Down Expand Up @@ -57,3 +59,10 @@ type Fabric interface {
// to ensure any promise by the local store to provide support is still kept.
SupportFromEnabled(ctx context.Context) bool
}

// InspectFabric is an interface that exposes all in-memory support state for a
// given store. It is used to power the Store Liveness /inspectz functionality.
type InspectFabric interface {
InspectSupportFrom() slpb.SupportStatesPerStore
InspectSupportFor() slpb.SupportStatesPerStore
}
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/storeliveness/requester_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func (rsh *requesterStateHandler) getSupportFrom(id slpb.StoreIdent) (slpb.Suppo
return supportState, ok
}

// exportAllSupportFrom exports a copy of all SupportStates from the
// requesterState.supportFrom map.
func (rsh *requesterStateHandler) exportAllSupportFrom() []slpb.SupportState {
rsh.mu.RLock()
defer rsh.mu.RUnlock()
supportStates := make([]slpb.SupportState, len(rsh.requesterState.supportFrom))
for _, ss := range rsh.requesterState.supportFrom {
supportStates = append(supportStates, ss.state)
}
return supportStates
}

// addStore adds a store to the requesterState.supportFrom map, if not present.
// The function returns a boolean indicating whether the store was added.
func (rsh *requesterStateHandler) addStore(id slpb.StoreIdent) bool {
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/storeliveness/storelivenesspb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,20 @@ message SupportState {
int64 epoch = 2 [(gogoproto.casttype) = "Epoch"];
util.hlc.Timestamp expiration = 3 [(gogoproto.nullable) = false];
}

// InspectStoreLivenessRequest is used to power the Store Liveness /inspectz
// functionality. The request doesn't take any parameters.
message InspectStoreLivenessRequest {}

// InspectStoreLivenessRequest is used to power the Store Liveness /inspectz
// functionality. The response is a list of SupportStatesPerStore.
message InspectStoreLivenessResponse {
repeated SupportStatesPerStore support_states_per_store = 1 [(gogoproto.nullable) = false];
}

// SupportStatesPerStore includes all SupportStates for a given store; they
// correspond to either the support-from or support-for map of a given store.
message SupportStatesPerStore {
StoreIdent store_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "StoreID"];
repeated SupportState support_states = 2 [(gogoproto.nullable) = false];
}
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/storeliveness/support_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type SupportManager struct {
metrics *SupportManagerMetrics
}

var _ Fabric = (*SupportManager)(nil)

// NewSupportManager creates a new Store Liveness SupportManager. The main
// goroutine that processes Store Liveness messages is initialized
// separately, via Start.
Expand Down Expand Up @@ -107,6 +109,18 @@ func (sm *SupportManager) SupportFor(id slpb.StoreIdent) (slpb.Epoch, bool) {
return ss.Epoch, !ss.Expiration.IsEmpty()
}

// InspectSupportFrom implements the InspectFabric interface.
func (sm *SupportManager) InspectSupportFrom() slpb.SupportStatesPerStore {
supportStates := sm.requesterStateHandler.exportAllSupportFrom()
return slpb.SupportStatesPerStore{StoreID: sm.storeID, SupportStates: supportStates}
}

// InspectSupportFor implements the InspectFabric interface.
func (sm *SupportManager) InspectSupportFor() slpb.SupportStatesPerStore {
supportStates := sm.supporterStateHandler.exportAllSupportFor()
return slpb.SupportStatesPerStore{StoreID: sm.storeID, SupportStates: supportStates}
}

// SupportFrom implements the Fabric interface. It delegates the response to the
// SupportManager's supporterStateHandler.
func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/storeliveness/supporter_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ func (ssh *supporterStateHandler) getNumSupportFor() int {
return len(ssh.supporterState.supportFor)
}

// exportAllSupportFor exports a copy of all SupportStates from the
// supporterState.supportFor map.
func (ssh *supporterStateHandler) exportAllSupportFor() []slpb.SupportState {
ssh.mu.RLock()
defer ssh.mu.RUnlock()
supportStates := make([]slpb.SupportState, len(ssh.supporterState.supportFor))
for _, ss := range ssh.supporterState.supportFor {
supportStates = append(supportStates, ss)
}
return supportStates
}

// Functions for handling supporterState updates.

// assertMeta ensures the meta in the inProgress view does not regress any of
Expand Down
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/stores_store_liveness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvserver

import slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"

// InspectAllStoreLiveness is an interface that allows for per-store Store
// Liveness state to be combined into a per-node view. It powers the inspectz
// Store Liveness functionality.
type InspectAllStoreLiveness interface {
InspectAllSupportFrom() ([]slpb.SupportStatesPerStore, error)
InspectAllSupportFor() ([]slpb.SupportStatesPerStore, error)
}

// StoresForStoreLiveness is a wrapper around Stores that implements
// InspectAllStoreLiveness.
type StoresForStoreLiveness Stores

var _ InspectAllStoreLiveness = (*StoresForStoreLiveness)(nil)

// MakeStoresForStoreLiveness casts Stores into StoresForStoreLiveness.
func MakeStoresForStoreLiveness(stores *Stores) *StoresForStoreLiveness {
return (*StoresForStoreLiveness)(stores)
}

// InspectAllSupportFrom implements the InspectAllStoreLiveness interface. It
// iterates over all stores and aggregates their SupportFrom SupportStates.
func (sfsl *StoresForStoreLiveness) InspectAllSupportFrom() ([]slpb.SupportStatesPerStore, error) {
stores := (*Stores)(sfsl)
var sspf []slpb.SupportStatesPerStore
err := stores.VisitStores(
func(s *Store) error {
sspf = append(sspf, s.storeLiveness.InspectSupportFrom())
return nil
},
)
return sspf, err
}

// InspectAllSupportFor implements the InspectAllStoreLiveness interface. It
// iterates over all stores and aggregates their SupportFor SupportStates.
func (sfsl *StoresForStoreLiveness) InspectAllSupportFor() ([]slpb.SupportStatesPerStore, error) {
stores := (*Stores)(sfsl)
var sspf []slpb.SupportStatesPerStore
err := stores.VisitStores(
func(s *Store) error {
sspf = append(sspf, s.storeLiveness.InspectSupportFor())
return nil
},
)
return sspf, err
}
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
storesForRACv2,
node.storeCfg.KVFlowController,
node.storeCfg.KVFlowStreamTokenProvider,
kvserver.MakeStoresForStoreLiveness(stores),
)
if err = cfg.CidrLookup.Start(ctx, stopper); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 9acaaa7

Please sign in to comment.