diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 4e4eafa7525b..b52dad6a12bd 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2936,7 +2936,7 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | -| node_id | [string](#cockroach.server.serverpb.SpanStatsRequest-string) | | | [reserved](#support-status) | +| node_id | [int32](#cockroach.server.serverpb.SpanStatsRequest-int32) | | | [reserved](#support-status) | | start_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | | end_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 861d35784cd7..bef5b1229301 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1159,6 +1159,8 @@ GO_TARGETS = [ "//pkg/kv/kvclient/rangefeed:rangefeed", "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvclient/rangestats:rangestats", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor:spanstatsaccessor", + "//pkg/kv/kvclient/spanstats:spanstats", "//pkg/kv/kvclient:kvclient", "//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil", "//pkg/kv/kvnemesis:kvnemesis", @@ -2581,6 +2583,8 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data", "//pkg/kv/kvclient/rangestats:get_x_data", + "//pkg/kv/kvclient/spanstats:get_x_data", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor:get_x_data", "//pkg/kv/kvnemesis:get_x_data", "//pkg/kv/kvnemesis/kvnemesisutil:get_x_data", "//pkg/kv/kvprober:get_x_data", diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 773926dcc4f7..7db076979120 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -648,6 +648,27 @@ func (c *Connector) 
SpanConfigConformance( return report, nil } +// SpanStats implements the spanstats.Accessor Interface. +func (c *Connector) SpanStats( + ctx context.Context, startKey, + endKey roachpb.Key, nodeID roachpb.NodeID, +) (*serverpb.InternalSpanStatsResponse, error) { + var response *serverpb.InternalSpanStatsResponse + err := c.withClient(ctx, func(ctx context.Context, c *client) error { + stats, err := c.SpanStats(ctx, &serverpb.SpanStatsRequest{ + NodeID: nodeID, + StartKey: roachpb.RKey(startKey), + EndKey: roachpb.RKey(endKey), + }) + if err != nil { + return err + } + response = (*serverpb.InternalSpanStatsResponse)(stats) + return nil + }) + return response, err +} + // GetAllSystemSpanConfigsThatApply implements the spanconfig.KVAccessor // interface. func (c *Connector) GetAllSystemSpanConfigsThatApply( diff --git a/pkg/kv/kvclient/kvtenant/BUILD.bazel b/pkg/kv/kvclient/kvtenant/BUILD.bazel index addedfb971f4..e4d2acdca5d5 100644 --- a/pkg/kv/kvclient/kvtenant/BUILD.bazel +++ b/pkg/kv/kvclient/kvtenant/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/keys", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/spanstats", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index ca310ad5d765..f984d94f157b 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -91,6 +92,9 @@ type Connector interface { // applicable to secondary tenants. spanconfig.KVAccessor + // Accessor provides access to span stats. 
+ spanstats.Accessor + // Reporter provides access to conformance reports, i.e. whether ranges // backing queried keyspans conform the span configs that apply to them. spanconfig.Reporter diff --git a/pkg/kv/kvclient/spanstats/BUILD.bazel b/pkg/kv/kvclient/spanstats/BUILD.bazel new file mode 100644 index 000000000000..9406ce1587d7 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/BUILD.bazel @@ -0,0 +1,15 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "spanstats", + srcs = ["span_stats.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/server/serverpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/spanstats/span_stats.go b/pkg/kv/kvclient/spanstats/span_stats.go new file mode 100644 index 000000000000..943d289cd6bc --- /dev/null +++ b/pkg/kv/kvclient/spanstats/span_stats.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanstats + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +// Accessor mediates access to SpanStats by consumers across the KV/Tenant +// boundary. +type Accessor interface { + // SpanStats returns the span stats between startKey and endKey. + // The caller can request span stats from across the cluster by setting + // nodeId to 0. The caller can request span stats from a specific node by + // setting the value of nodeID accordingly. 
+ SpanStats( + ctx context.Context, + startKey, + endKey roachpb.Key, + nodeID roachpb.NodeID, + ) (*serverpb.InternalSpanStatsResponse, error) +} diff --git a/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel b/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel new file mode 100644 index 000000000000..2c9722affeb6 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel @@ -0,0 +1,15 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "spanstatsaccessor", + srcs = ["accessor.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats/spanstatsaccessor", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/server/serverpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go b/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go new file mode 100644 index 000000000000..b10c7565e9b5 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go @@ -0,0 +1,51 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanstatsaccessor + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +// LocalAccessor is an implementation of spanstats.Accessor that is meant +// to provide access to span stats to servers that are co-located on the KV +// node. +type LocalAccessor struct { + spanStatsServer serverpb.SpanStatsServer +} + +// New returns a new instance of LocalAccessor. 
+func New(server serverpb.SpanStatsServer) *LocalAccessor { + return &LocalAccessor{spanStatsServer: server} +} + +// SpanStats implements the spanstats.Accessor interface. +func (a *LocalAccessor) SpanStats( + ctx context.Context, startKey, + endKey roachpb.Key, nodeID roachpb.NodeID, +) (*serverpb.InternalSpanStatsResponse, error) { + res, err := a.spanStatsServer.GetSpanStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: roachpb.Span{ + Key: startKey, + EndKey: endKey, + }, + NodeID: nodeID, + }) + + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 90f59c72125a..76199567d511 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -248,7 +248,6 @@ go_test( "client_spanconfigs_test.go", "client_split_burst_test.go", "client_split_test.go", - "client_status_test.go", "client_store_test.go", "client_tenant_test.go", "client_test.go", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 06dd59d52255..69781dff23da 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -63,7 +63,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" @@ -3138,33 +3137,6 @@ func mapToHotReplicasInfo(repls []CandidateReplica) []HotReplicaInfo { return hotRepls } -// StoreKeySpanStats carries the result of a stats computation over a key range. 
-type StoreKeySpanStats struct { - ReplicaCount int - MVCC enginepb.MVCCStats - ApproximateDiskBytes uint64 -} - -// ComputeStatsForKeySpan computes the aggregated MVCCStats for all replicas on -// this store which contain any keys in the supplied range. -func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeySpanStats, error) { - var result StoreKeySpanStats - - newStoreReplicaVisitor(s).UndefinedOrder().Visit(func(repl *Replica) bool { - desc := repl.Desc() - if bytes.Compare(startKey, desc.EndKey) >= 0 || bytes.Compare(desc.StartKey, endKey) >= 0 { - return true // continue - } - result.MVCC.Add(repl.GetMVCCStats()) - result.ReplicaCount++ - return true - }) - - var err error - result.ApproximateDiskBytes, err = s.engine.ApproximateDiskBytes(startKey.AsRawKey(), endKey.AsRawKey()) - return result, err -} - // ReplicateQueueDryRun runs the given replica through the replicate queue // (using the allocator) without actually carrying out any changes, returning // all trace messages collected along the way. 
diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index abc74495e3f5..68d39736591c 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -115,6 +115,9 @@ func (a tenantAuthorizer) authorize( case "/cockroach.server.serverpb.Status/TransactionContentionEvents": return a.authTenant(tenID) + case "/cockroach.server.serverpb.Status/SpanStats": + return a.authTenant(tenID) + case "/cockroach.roachpb.Internal/GetSpanConfigs": return a.authGetSpanConfigs(tenID, req.(*roachpb.GetSpanConfigsRequest)) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 889536b47f86..60355a463ffe 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "init.go", "init_handshake.go", "initial_sql.go", + "key_visualizer_server.go", "listen_and_update_addrs.go", "load_endpoint.go", "loss_of_quorum.go", @@ -113,6 +114,8 @@ go_library( "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangestats", + "//pkg/kv/kvclient/spanstats", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor", "//pkg/kv/kvprober", "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/allocatorimpl", @@ -410,6 +413,7 @@ go_test( "server_test.go", "settings_cache_test.go", "settings_test.go", + "span_stats_test.go", "statements_test.go", "stats_test.go", "status_ext_test.go", diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 44736c039a43..7808fc755e73 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1360,7 +1360,7 @@ func (s *adminServer) statsForSpan( req := serverpb.SpanStatsRequest{ StartKey: rSpan.Key, EndKey: rSpan.EndKey, - NodeID: nodeID.String(), + NodeID: nodeID, } spanResponse, err = client.SpanStats(ctx, &req) } diff --git a/pkg/server/key_visualizer_server.go b/pkg/server/key_visualizer_server.go new file mode 100644 index 000000000000..7e8b0144cb8d --- /dev/null +++ b/pkg/server/key_visualizer_server.go @@ -0,0 +1,226 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "sort" + "strings" + "time" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvissettings" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// KeyVisualizerServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. +type KeyVisualizerServer struct { + ie *sql.InternalExecutor + settings *cluster.Settings + nodeDialer *nodedialer.Dialer + status *systemStatusServer + node *Node +} + +var _ keyvispb.KeyVisualizerServer = &KeyVisualizerServer{} + +func (s *KeyVisualizerServer) saveBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) error { + encoded, err := protoutil.Marshal(req) + + if err != nil { + return err + } + // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. 
+ _, err = s.ie.ExecEx( + ctx, + "upsert tenant boundaries", + nil, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + `UPSERT INTO system.span_stats_tenant_boundaries( + tenant_id, + boundaries + ) VALUES ($1, $2) + `, + roachpb.SystemTenantID.ToUint64(), + encoded, + ) + + return err +} + +func (s *KeyVisualizerServer) getSamplesFromFanOut( + ctx context.Context, timestamp time.Time, +) (*keyvispb.GetSamplesResponse, error) { + + samplePeriod := keyvissettings.SampleInterval.Get(&s.settings.SV) + + dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) + return keyvispb.NewKeyVisualizerClient(conn), err + } + + nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { + samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, + &keyvispb.GetSamplesRequest{ + NodeID: nodeID, + CollectedOnOrAfter: timestamp, + }) + if err != nil { + return nil, err + } + return samples, err + } + + globalSamples := make(map[int64][]keyvispb.Sample) + + responseFn := func(nodeID roachpb.NodeID, resp interface{}) { + nodeResponse := resp.(*keyvispb.GetSamplesResponse) + + // Collection is spread across each node, so samples that belong to the + // same sample period should be aggregated together. 
+ for _, sampleFragment := range nodeResponse.Samples { + tNanos := sampleFragment.SampleTime.Truncate(samplePeriod).UnixNano() + globalSamples[tNanos] = append(globalSamples[tNanos], sampleFragment) + } + } + + errorFn := func(nodeID roachpb.NodeID, err error) { + log.Errorf(ctx, "could not get key visualizer sample for node %d: %v", + nodeID, err) + } + + err := s.status.iterateNodes(ctx, + "iterating nodes for key visualizer samples", dialFn, nodeFn, + responseFn, errorFn) + if err != nil { + return nil, err + } + + var samples []keyvispb.Sample + for sampleTimeNanos, sampleFragments := range globalSamples { + if !verifySampleBoundariesEqual(sampleFragments) { + log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") + } + samples = append(samples, keyvispb.Sample{ + SampleTime: timeutil.Unix(0, sampleTimeNanos), + SpanStats: cumulativeStats(sampleFragments), + }) + } + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// verifySampleBoundariesEqual returns true if all the samples collected +// from across the cluster belonging to the same sample period have identical +// spans. +func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { + f0 := fragments[0] + sort.Slice(f0.SpanStats, func(a, b int) bool { + return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 + }) + + for i := 1; i < len(fragments); i++ { + fi := fragments[i] + + if len(f0.SpanStats) != len(fi.SpanStats) { + return false + } + + sort.Slice(fi.SpanStats, func(a, b int) bool { + return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 + }) + + for b, bucket := range f0.SpanStats { + if !bucket.Span.Equal(fi.SpanStats[b].Span) { + return false + } + } + } + + return true +} + +// unsafeBytesToString constructs a string from a byte slice. It is +// critical that the byte slice not be modified. 
+func unsafeBytesToString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) +} + +// cumulativeStats deduplicates and accumulates all of a sample's +// keyvispb.SpanStats from across the cluster. Stores collect statistics for +// the same spans, and the caller wants the cumulative statistics for those +// spans, so the Requests of fragments that share a span are summed into a +// single entry. The order of the returned slice is unspecified because it is +// produced by iterating over a map. +func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { + var stats []keyvispb.SpanStats + for _, sampleFragment := range fragments { + stats = append(stats, sampleFragment.SpanStats...) + } + + unique := make(map[string]keyvispb.SpanStats) + for _, stat := range stats { + + // NOTE(review): Key and EndKey are concatenated without a delimiter, so + // two different spans could in principle produce the same map key. + var sb strings.Builder + sb.WriteString(unsafeBytesToString(stat.Span.Key)) + sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) + spanAsString := sb.String() + + if uniqueStat, ok := unique[spanAsString]; ok { + // uniqueStat is a copy of the map value, so the updated total must be + // stored back; otherwise the increment is silently lost. + uniqueStat.Requests += stat.Requests + unique[spanAsString] = uniqueStat + } else { + unique[spanAsString] = keyvispb.SpanStats{ + Span: stat.Span, + Requests: stat.Requests, + } + } + } + + ret := make([]keyvispb.SpanStats, 0, len(unique)) + for _, stat := range unique { + ret = append(ret, stat) + } + return ret +} + +// GetSamples implements the keyvispb.KeyVisualizerServer interface. +func (s *KeyVisualizerServer) GetSamples( + ctx context.Context, req *keyvispb.GetSamplesRequest, +) (*keyvispb.GetSamplesResponse, error) { + + // A NodeID of 0 asks for samples from every node, so fan out. + if req.NodeID == 0 { + return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + } + + samples := s.node.spanStatsCollector.GetSamples( + req.CollectedOnOrAfter) + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. 
+func (s *KeyVisualizerServer) UpdateBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) (*keyvispb.UpdateBoundariesResponse, error) { + if err := s.saveBoundaries(ctx, req); err != nil { + return nil, err + } + return &keyvispb.UpdateBoundariesResponse{}, nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 8db279869848..ec42ab795f1b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -34,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats/spanstatsaccessor" "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -150,8 +152,8 @@ type Server struct { tsDB *ts.DB tsServer *ts.Server - // spanStatsServer implements `keyvispb.KeyVisualizerServer` - spanStatsServer *SpanStatsServer + // keyVisualizerServer implements `keyvispb.KeyVisualizerServer` + keyVisualizerServer *KeyVisualizerServer // The Observability Server, used by the Observability Service to subscribe to // CRDB data. @@ -170,6 +172,9 @@ type Server struct { spanConfigSubscriber spanconfig.KVSubscriber spanConfigReporter spanconfig.Reporter + // spanStatsServer services internal requests for span stats. + spanStatsServer *spanStatsServer + // pgL is the SQL listener. pgL net.Listener @@ -852,6 +857,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ambientCtx: cfg.AmbientCtx, } + // Instantiate the span stats server. There is a circular dependency + // between server.spanStatsServer and server.systemStatusServer. + spanStats := &spanStatsServer{ + fetcher: rangestats.NewFetcher(db), + distSender: distSender, + statusServer: nil, // Circular dependency. 
Set below. + node: node, + } + spanStatsLocalAccessor := spanstatsaccessor.New(spanStats) + // Instantiate the status API server. sStatus := newSystemStatusServer( cfg.AmbientCtx, @@ -873,18 +888,23 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { serverIterator, spanConfig.reporter, clock, + distSender, + spanStatsLocalAccessor, ) - spanStatsServer := &SpanStatsServer{ + // The spanStatsServer needs a reference to the status server. + spanStats.statusServer = sStatus + + keyVisualizerServer := &KeyVisualizerServer{ ie: internalExecutor, settings: st, nodeDialer: nodeDialer, status: sStatus, node: node, } - spanStatsAccessor := spanstatskvaccessor.New(spanStatsServer) + keyVisServerAccessor := spanstatskvaccessor.New(keyVisualizerServer) - // Instantiate the KV prober. + // Instantiate the KV prober kvProber := kvprober.NewProber(kvprober.Opts{ Tracer: cfg.AmbientCtx.Tracer, DB: db, @@ -953,7 +973,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nodeDescs: g, systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: spanConfig.kvAccessor, - spanStatsAccessor: spanStatsAccessor, + keyVisServerAccessor: keyVisServerAccessor, + spanStatsAccessor: spanStatsLocalAccessor, nodeDialer: nodeDialer, distSender: distSender, db: db, @@ -1127,7 +1148,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { externalStorageBuilder: externalStorageBuilder, storeGrantCoords: gcoords.Stores, kvMemoryMonitor: kvMemoryMonitor, - spanStatsServer: spanStatsServer, + keyVisualizerServer: keyVisualizerServer, + spanStatsServer: spanStats, } return lateBoundServer, err @@ -1372,7 +1394,10 @@ func (s *Server) PreStart(ctx context.Context) error { obspb.RegisterObsServer(s.grpc.Server, s.eventsServer) // Register the KeyVisualizer Server - keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.spanStatsServer) + keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.keyVisualizerServer) + + // Register the SpanStats Server + 
serverpb.RegisterSpanStatsServer(s.grpc.Server, s.spanStatsServer) // Start the RPC server. This opens the RPC/SQL listen socket, // and dispatches the server worker for the RPC. diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 3faa257033d1..f50b53ef07b9 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -278,7 +279,10 @@ type sqlServerArgs struct { spanConfigAccessor spanconfig.KVAccessor // Used by the Key Visualizer job. - spanStatsAccessor *spanstatskvaccessor.SpanStatsKVAccessor + keyVisServerAccessor *spanstatskvaccessor.SpanStatsKVAccessor + + // spanStatsAccessor provides access to span stats from KV. + spanStatsAccessor spanstats.Accessor // Used by DistSQLPlanner to dial KV nodes. 
nodeDialer *nodedialer.Dialer @@ -975,6 +979,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RangeStatsFetcher: rangeStatsFetcher, EventsExporter: cfg.eventsServer, NodeDescs: cfg.nodeDescs, + SpanStatsAccessor: cfg.spanStatsAccessor, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { @@ -1237,7 +1242,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if codec.ForSystemTenant() { ri := kvcoord.MakeRangeIterator(cfg.distSender) spanStatsConsumer := spanstatsconsumer.New( - cfg.spanStatsAccessor, + cfg.keyVisServerAccessor, &ri, cfg.Settings, cfg.circularInternalExecutor, diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index 460f1d909c47..e447496c0149 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -13,6 +13,7 @@ proto_library( "index_recommendations.proto", "init.proto", "migration.proto", + "span_stats.proto", "status.proto", ], strip_import_prefix = "/pkg", diff --git a/pkg/server/serverpb/span_stats.proto b/pkg/server/serverpb/span_stats.proto new file mode 100644 index 000000000000..1e2e2460365d --- /dev/null +++ b/pkg/server/serverpb/span_stats.proto @@ -0,0 +1,43 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ + +syntax = "proto3"; +package cockroach.server.serverpb; +option go_package = "serverpb"; + +import "roachpb/data.proto"; +import "storage/enginepb/mvcc.proto"; +import "gogoproto/gogo.proto"; +import "google/api/annotations.proto"; + +// InternalSpanStatsRequest +message InternalSpanStatsRequest { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + + // A node_id of `0` indicates the server should issue a fan-out to all nodes. + int32 node_id = 2 [ + (gogoproto.customname) = "NodeID", + (gogoproto.casttype) = + "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" + ]; +} + + +message InternalSpanStatsResponse { + int32 range_count = 2; + uint64 approximate_disk_bytes = 3; + cockroach.storage.enginepb.MVCCStats total_stats = 1 + [ (gogoproto.nullable) = false ]; +} + +service SpanStats { + rpc GetSpanStats(InternalSpanStatsRequest) returns (InternalSpanStatsResponse) {} +} diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index fb59daef508b..7d733c3e7f3e 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1184,7 +1184,11 @@ message ListDistSQLFlowsResponse { } message SpanStatsRequest { - string node_id = 1 [ (gogoproto.customname) = "NodeID" ]; + int32 node_id = 1 [ + (gogoproto.customname) = "NodeID", + (gogoproto.casttype) = + "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" + ]; bytes start_key = 2 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RKey" ]; diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 44c2b9a51b31..c61a2f516d27 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -1,4 +1,4 @@ -// Copyright 2022 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. 
@@ -12,214 +12,230 @@ package server import ( "context" - "sort" - "strings" - "time" - "unsafe" - "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -// SpanStatsServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. -type SpanStatsServer struct { - ie *sql.InternalExecutor - settings *cluster.Settings - nodeDialer *nodedialer.Dialer - status *systemStatusServer - node *Node +// spanStatsServer implements serverpb.SpanStatsServer. +// It services requests for serverpb.InternalSpanStatsRequest. 
+type spanStatsServer struct { + fetcher *rangestats.Fetcher + distSender *kvcoord.DistSender + statusServer *systemStatusServer + nodeDialer *nodedialer.Dialer + node *Node } -var _ keyvispb.KeyVisualizerServer = &SpanStatsServer{} +var _ serverpb.SpanStatsServer = &spanStatsServer{} -func (s *SpanStatsServer) saveBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) error { +func (s *spanStatsServer) dialFn(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) + return serverpb.NewSpanStatsClient(conn), err +} - encoded, err := protoutil.Marshal(req) +func (s *spanStatsServer) fanOut( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + res := &serverpb.InternalSpanStatsResponse{} // must be non-nil: responseFn accumulates per-node stats into it + rSpan, err := keys.SpanAddr(req.Span) if err != nil { - return err + return nil, err + } + nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, s.distSender, rSpan) + if err != nil { + return nil, err + } + nodesWithReplica := make(map[roachpb.NodeID]bool) + for _, nodeID := range nodeIDs { + nodesWithReplica[nodeID] = true } - // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. - _, err = s.ie.ExecEx( - ctx, - "upsert tenant boundaries", - nil, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - `UPSERT INTO system.span_stats_tenant_boundaries( - tenant_id, - boundaries - ) VALUES ($1, $2) - `, - roachpb.SystemTenantID.ToUint64(), - encoded, - ) - - return err -} - -func (s *SpanStatsServer) getSamplesFromFanOut( - ctx context.Context, timestamp time.Time, -) (*keyvispb.GetSamplesResponse, error) { - - dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { - conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) - return keyvispb.NewKeyVisualizerClient(conn), err + // We should only fan out to a node if it has replicas for this span.
+ // A blind fan out would be wasteful. + smartDial := func( + ctx context.Context, + nodeID roachpb.NodeID, + ) (interface{}, error) { + if _, ok := nodesWithReplica[nodeID]; ok { + return s.dialFn(ctx, nodeID) + } + return nil, nil } nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { - samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, - &keyvispb.GetSamplesRequest{ - NodeID: nodeID, - CollectedOnOrAfter: timestamp, + // `smartDial` may skip this node, so check to see if the client is nil. + if client == nil { + return &serverpb.InternalSpanStatsResponse{}, nil + } + stats, err := client.(serverpb.SpanStatsClient).GetSpanStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: req.Span, + NodeID: nodeID, }) if err != nil { return nil, err } - return samples, err + return stats, err } - globalSamples := make(map[int64][]keyvispb.Sample) - responseFn := func(nodeID roachpb.NodeID, resp interface{}) { - nodeResponse := resp.(*keyvispb.GetSamplesResponse) - - // Collection is spread across each node, so samples that belong to the - // same sample period should be aggregated together. The collectors - // guarantee that corresponding sample fragments have the same sample time. 
- for _, sampleFragment := range nodeResponse.Samples { - sampleTime := sampleFragment.SampleTime.UnixNano() - globalSamples[sampleTime] = append(globalSamples[sampleTime], sampleFragment) - } + nodeResponse := resp.(*serverpb.InternalSpanStatsResponse) + res.ApproximateDiskBytes += nodeResponse.ApproximateDiskBytes + res.TotalStats.Add(nodeResponse.TotalStats) + res.RangeCount += nodeResponse.RangeCount } errorFn := func(nodeID roachpb.NodeID, err error) { log.Errorf(ctx, "could not get span stats sample for node %d: %v", nodeID, err) } - err := s.status.iterateNodes(ctx, "iterating nodes for span stats", dialFn, nodeFn, responseFn, errorFn) - - if err != nil { + if err := s.statusServer.iterateNodes( + ctx, + "iterating nodes for span stats", + smartDial, + nodeFn, + responseFn, + errorFn, + ); err != nil { return nil, err } - var samples []keyvispb.Sample - for sampleTimeNanos, sampleFragments := range globalSamples { - if !verifySampleBoundariesEqual(sampleFragments) { - log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") - } - samples = append(samples, keyvispb.Sample{ - SampleTime: timeutil.Unix(0, sampleTimeNanos), - SpanStats: cumulativeStats(sampleFragments), - }) - } - - return &keyvispb.GetSamplesResponse{Samples: samples}, nil + return res, nil } -// verifySampleBoundariesEqual returns true if all the samples collected -// from across the cluster belonging to the same sample period have identical -// spans. 
-func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { - - f0 := fragments[0] - sort.Slice(f0.SpanStats, func(a, b int) bool { - return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 - }) +func (s *spanStatsServer) getLocalStats( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + res := &serverpb.InternalSpanStatsResponse{} - for i := 1; i < len(fragments); i++ { - fi := fragments[i] + sp, err := keys.SpanAddr(req.Span) + if err != nil { + return nil, err + } + ri := kvcoord.MakeRangeIterator(s.distSender) + ri.Seek(ctx, sp.Key, kvcoord.Ascending) - if len(f0.SpanStats) != len(fi.SpanStats) { - return false + for { + if !ri.Valid() { + return nil, ri.Error() } - sort.Slice(fi.SpanStats, func(a, b int) bool { - return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 - }) - - for b, bucket := range f0.SpanStats { - if !bucket.Span.Equal(fi.SpanStats[b].Span) { - return false + desc := ri.Desc() + descSpan := desc.RSpan() + res.RangeCount += 1 + + // Is the descriptor fully contained by the request span? + if sp.ContainsKeyRange(descSpan.Key, desc.EndKey) { + // If so, obtain stats for this range via RangeStats. + rangeStats, err := s.fetcher.RangeStats(ctx, + desc.StartKey.AsRawKey()) + if err != nil { + return nil, err + } + for _, resp := range rangeStats { + res.TotalStats.Add(resp.MVCCStats) } - } - } - return true -} + } else { + // Otherwise, do an MVCC Scan. + // We should only scan the part of the range that our request span + // encompasses. + scanStart := sp.Key + scanEnd := sp.EndKey + // If our request span began before the start of this range, + // start scanning from this range's start key. + if descSpan.Key.Compare(sp.Key) == 1 { + scanStart = descSpan.Key + } + // If our request span ends after the end of this range, + // stop scanning at this range's end key. 
+ if descSpan.EndKey.Compare(sp.EndKey) == -1 { + scanEnd = descSpan.EndKey + } + err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + stats, err := storage.ComputeStats( + s.Engine(), + scanStart.AsRawKey(), + scanEnd.AsRawKey(), + timeutil.Now().UnixNano(), + ) + + if err != nil { + return err + } + + res.TotalStats.Add(stats) + return nil + }) -// unsafeBytesToString constructs a string from a byte slice. It is -// critical that the byte slice not be modified. -func unsafeBytesToString(data []byte) string { - return *(*string)(unsafe.Pointer(&data)) -} + if err != nil { + return nil, err + } + } -// cumulativeStats uniques and accumulates all of a sample's -// keyvispb.SpanStats from across the cluster. Stores collect statistics for -// the same spans, and the caller wants the cumulative statistics for those spans. -func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { - var stats []keyvispb.SpanStats - for _, sampleFragment := range fragments { - stats = append(stats, sampleFragment.SpanStats...) + if !ri.NeedAnother(sp) { + break + } + ri.Next(ctx) } - unique := make(map[string]keyvispb.SpanStats) - for _, stat := range stats { - - var sb strings.Builder - sb.WriteString(unsafeBytesToString(stat.Span.Key)) - sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) - spanAsString := sb.String() - - if uniqueStat, ok := unique[spanAsString]; ok { - uniqueStat.Requests += stat.Requests - } else { - unique[spanAsString] = keyvispb.SpanStats{ - Span: stat.Span, - Requests: stat.Requests, - } + // Finally, get the approximate disk bytes from each store. 
+ err = s.node.stores.VisitStores(func(store *kvserver.Store) error { + approxDiskBytes, err := store.Engine().ApproximateDiskBytes( + req.Span.Key, req.Span.EndKey) + if err != nil { + return err } - } + res.ApproximateDiskBytes += approxDiskBytes + return nil + }) - ret := make([]keyvispb.SpanStats, 0, len(unique)) - for _, stat := range unique { - ret = append(ret, stat) + if err != nil { + return nil, err } - return ret -} -// GetSamples implements the keyvispb.KeyVisualizerServer interface. -func (s *SpanStatsServer) GetSamples( - ctx context.Context, req *keyvispb.GetSamplesRequest, -) (*keyvispb.GetSamplesResponse, error) { + return res, nil +} +// GetSpanStats will return span stats according to the nodeID specified by req. +// If req.NodeID == 0, a fan out is done to collect and sum span stats from +// across the cluster. Otherwise, the node specified will be dialed, +// if the local node isn't already the node specified. If the node specified +// is the local node, span stats are computed and returned. +func (s *spanStatsServer) GetSpanStats( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + // Perform a fan out when the requested NodeID is 0. if req.NodeID == 0 { - return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + return s.fanOut(ctx, req) } - samples := s.node.spanStatsCollector.GetSamples( - req.CollectedOnOrAfter) + // See if the requested node is the local node. + _, local, err := s.statusServer.parseNodeID(req.NodeID.String()) + if err != nil { + return nil, err + } - return &keyvispb.GetSamplesResponse{Samples: samples}, nil -} + // If the requested node is the local node, return stats. + if local { + return s.getLocalStats(ctx, req) + } -// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. 
-func (s *SpanStatsServer) UpdateBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) (*keyvispb.UpdateBoundariesResponse, error) { - if err := s.saveBoundaries(ctx, req); err != nil { + // Otherwise, dial the correct node, and ask for span stats. + client, err := s.dialFn(ctx, req.NodeID) + if err != nil { return nil, err } - return &keyvispb.UpdateBoundariesResponse{}, nil + return client.(serverpb.SpanStatsClient).GetSpanStats(ctx, req) } diff --git a/pkg/kv/kvserver/client_status_test.go b/pkg/server/span_stats_test.go similarity index 80% rename from pkg/kv/kvserver/client_status_test.go rename to pkg/server/span_stats_test.go index 69eb92152501..95f4e4403c5d 100644 --- a/pkg/kv/kvserver/client_status_test.go +++ b/pkg/server/span_stats_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package kvserver_test +package server import ( "context" @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestComputeStatsForKeySpan(t *testing.T) { +func TestLocalSpanStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -36,7 +36,7 @@ func TestComputeStatsForKeySpan(t *testing.T) { Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}, }, }) - s := serv.(*server.TestServer) + s := serv.(*TestServer) defer s.Stopper().Stop(ctx) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) @@ -68,24 +68,34 @@ func TestComputeStatsForKeySpan(t *testing.T) { for _, tcase := range []struct { startKey string endKey string - expectedRanges int + expectedRanges int32 expectedKeys int64 }{ {"a", "i", 4, 6}, {"a", "c", 1, 3}, {"b", "e", 2, 5}, {"e", "i", 2, 1}, + {"b", "d", 2, 3}, + {"b", "bbb", 1, 2}, } { start, end := tcase.startKey, tcase.endKey - result, err := store.ComputeStatsForKeySpan( - roachpb.RKey(start), roachpb.RKey(end)) + result, err := s.spanStatsServer.getLocalStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: roachpb.Span{ + Key: roachpb.Key(start), + EndKey: roachpb.Key(end), + }, + NodeID: 0, + }, + ) if err != nil { t.Fatal(err) } - if a, e := result.ReplicaCount, tcase.expectedRanges; a != e { + + if a, e := result.RangeCount, tcase.expectedRanges; a != e { t.Errorf("Expected %d ranges in span [%s - %s], found %d", e, start, end, a) } - if a, e := result.MVCC.LiveCount, tcase.expectedKeys; a != e { + if a, e := 
result.TotalStats.LiveCount, tcase.expectedKeys; a != e { t.Errorf("Expected %d keys in span [%s - %s], found %d", e, start, end, a) } } diff --git a/pkg/server/status.go b/pkg/server/status.go index 54aade2dee81..1c39b5d19af5 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -39,6 +39,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -463,6 +465,7 @@ type statusServer struct { si systemInfoOnce stmtDiagnosticsRequester StmtDiagnosticsRequester internalExecutor *sql.InternalExecutor + spanStatsAccessor spanstats.Accessor // cancelSemaphore is a semaphore that limits the number of // concurrent calls to the pgwire query cancellation endpoint. This @@ -554,6 +557,7 @@ func newStatusServer( internalExecutor *sql.InternalExecutor, serverIterator ServerIterator, clock *hlc.Clock, + spanStatsAccessor spanstats.Accessor, ) *statusServer { ambient.AddLogTag("status", nil) if !rpcCtx.TenantID.IsSystem() { @@ -573,10 +577,11 @@ func newStatusServer( serverIterator: serverIterator, clock: clock, }, - cfg: cfg, - db: db, - metricSource: metricSource, - internalExecutor: internalExecutor, + cfg: cfg, + db: db, + metricSource: metricSource, + internalExecutor: internalExecutor, + spanStatsAccessor: spanStatsAccessor, // See the docstring on cancelSemaphore for details about this initialization. 
cancelSemaphore: quotapool.NewIntPool("pgwire-cancel", 256), @@ -606,6 +611,8 @@ func newSystemStatusServer( serverIterator ServerIterator, spanConfigReporter spanconfig.Reporter, clock *hlc.Clock, + distSender *kvcoord.DistSender, + spanStatsAccessor spanstats.Accessor, ) *systemStatusServer { server := newStatusServer( ambient, @@ -622,6 +629,7 @@ func newSystemStatusServer( internalExecutor, serverIterator, clock, + spanStatsAccessor, ) return &systemStatusServer{ @@ -3385,7 +3393,7 @@ func (s *statusServer) ListExecutionInsights( // SpanStats requests the total statistics stored on a node for a given key // span, which may include multiple ranges. -func (s *systemStatusServer) SpanStats( +func (s *statusServer) SpanStats( ctx context.Context, req *serverpb.SpanStatsRequest, ) (*serverpb.SpanStatsResponse, error) { ctx = forwardSQLIdentityThroughRPCCalls(ctx) @@ -3397,35 +3405,14 @@ func (s *systemStatusServer) SpanStats( return nil, err } - nodeID, local, err := s.parseNodeID(req.NodeID) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - - if !local { - status, err := s.dialNode(ctx, nodeID) - if err != nil { - return nil, serverError(ctx, err) - } - return status.SpanStats(ctx, req) - } - - output := &serverpb.SpanStatsResponse{} - err = s.stores.VisitStores(func(store *kvserver.Store) error { - result, err := store.ComputeStatsForKeySpan(req.StartKey.Next(), req.EndKey) - if err != nil { - return err - } - output.TotalStats.Add(result.MVCC) - output.RangeCount += int32(result.ReplicaCount) - output.ApproximateDiskBytes += result.ApproximateDiskBytes - return nil - }) - if err != nil { - return nil, serverError(ctx, err) - } + res, err := s.spanStatsAccessor.SpanStats( + ctx, + roachpb.Key(req.StartKey), + roachpb.Key(req.EndKey), + req.NodeID, + ) - return output, nil + return (*serverpb.SpanStatsResponse)(res), err } // Diagnostics returns an anonymized diagnostics report. 
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 6d6ad4ea9b5b..6e6125319b6c 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1366,7 +1366,7 @@ func TestSpanStatsResponse(t *testing.T) { var response serverpb.SpanStatsResponse request := serverpb.SpanStatsRequest{ - NodeID: "1", + NodeID: 1, StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), } @@ -1395,7 +1395,7 @@ func TestSpanStatsGRPCResponse(t *testing.T) { defer rpcStopper.Stop(ctx) rpcContext := newRPCTestContext(ctx, ts, ts.RPCContext().Config) request := serverpb.SpanStatsRequest{ - NodeID: "1", + NodeID: 1, StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 749d8a2ffb7b..b9e6c73087be 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -237,6 +237,7 @@ func NewTenantServer( args.circularInternalExecutor, serverIterator, args.clock, + args.spanStatsAccessor, ) args.sqlStatusServer = sStatus @@ -1032,7 +1033,6 @@ func makeTenantSQLServerArgs( // TODO(irfansharif): hook up NewGrantCoordinatorSQL. 
var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil - return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -1064,6 +1064,7 @@ func makeTenantSQLServerArgs( nodeDescs: tenantConnect, systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: tenantConnect, + spanStatsAccessor: tenantConnect, nodeDialer: nodeDialer, distSender: ds, db: db, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index f56f0f75dc15..96bb3b657dfb 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -310,6 +310,7 @@ go_library( "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvclient/rangefeed", + "//pkg/kv/kvclient/spanstats", "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/liveness/livenesspb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 15f6f9776713..15cb49c78097 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/multitenant" @@ -1351,6 +1352,9 @@ type ExecutorConfig struct { // records. SpanConfigKVAccessor spanconfig.KVAccessor + // SpanStatsAccessor provides access to span stats from KV. + SpanStatsAccessor spanstats.Accessor + // InternalDB is used to create an isql.Executor bound with SessionData and // other ExtraTxnState. InternalDB *InternalDB