From 385d4ef4d5d1f2b7c8caccce584d6bf06353775e Mon Sep 17 00:00:00 2001 From: Zach Lite Date: Fri, 27 Jan 2023 18:27:56 -0500 Subject: [PATCH] This commit provides a re-implementation of fetching the MVCC stats and range count for a given span. The previous implementation relied on the assumption that 1 range can contain at most 1 table/index. Since GH-79700, this assumption is no longer true. This re-implementation has three components. The first is spanstats.Accessor, which provides an interface for accessing span stats from KV. The second is spanstatsaccessor.LocalAccessor, which provides an implementation of the interface for callers that are co-located on a KV node. The third is an implementation by kvtenantccl.Connector, which provides an implementation of the interface for non co-located callers, like SQL pods. Part of: https://cockroachlabs.atlassian.net/browse/CRDB-22711 Release note (backward-incompatible change): The SpanStatsRequest message field 'node_id' has changed from type 'string' to type 'int32'. --- docs/generated/http/full.md | 2 +- pkg/BUILD.bazel | 4 + pkg/ccl/kvccl/kvtenantccl/connector.go | 21 ++ pkg/kv/kvclient/kvtenant/BUILD.bazel | 1 + pkg/kv/kvclient/kvtenant/connector.go | 4 + pkg/kv/kvclient/spanstats/BUILD.bazel | 15 + pkg/kv/kvclient/spanstats/span_stats.go | 33 ++ .../spanstats/spanstatsaccessor/BUILD.bazel | 15 + .../spanstats/spanstatsaccessor/accessor.go | 51 +++ pkg/kv/kvserver/BUILD.bazel | 1 - pkg/kv/kvserver/store.go | 28 -- pkg/rpc/auth_tenant.go | 3 + pkg/server/BUILD.bazel | 4 + pkg/server/admin.go | 2 +- pkg/server/key_visualizer_server.go | 226 +++++++++++++ pkg/server/server.go | 41 ++- pkg/server/server_sql.go | 9 +- pkg/server/serverpb/BUILD.bazel | 1 + pkg/server/serverpb/span_stats.proto | 43 +++ pkg/server/serverpb/status.proto | 6 +- pkg/server/span_stats_server.go | 320 +++++++++--------- .../span_stats_test.go} | 30 +- pkg/server/status.go | 53 ++- pkg/server/status_test.go | 4 +- pkg/server/tenant.go | 3 +- pkg/sql/BUILD.bazel | 1 + pkg/sql/exec_util.go | 4 + 27 files changed, 685 insertions(+), 240 deletions(-) create mode 100644 pkg/kv/kvclient/spanstats/BUILD.bazel create mode 100644 pkg/kv/kvclient/spanstats/span_stats.go create mode 100644 pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel create mode 100644 pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go create mode 100644 pkg/server/key_visualizer_server.go create mode 100644 pkg/server/serverpb/span_stats.proto rename pkg/{kv/kvserver/client_status_test.go => server/span_stats_test.go} (80%) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 4e4eafa7525b..b52dad6a12bd 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2936,7 +2936,7 @@ Support status: [reserved](#support-status) | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | -| node_id | [string](#cockroach.server.serverpb.SpanStatsRequest-string) | | | [reserved](#support-status) | +| node_id | [int32](#cockroach.server.serverpb.SpanStatsRequest-int32) | | | [reserved](#support-status) | | start_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | | end_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 861d35784cd7..bef5b1229301 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1159,6 +1159,8 @@ GO_TARGETS = [ "//pkg/kv/kvclient/rangefeed:rangefeed", "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvclient/rangestats:rangestats", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor:spanstatsaccessor", + "//pkg/kv/kvclient/spanstats:spanstats", "//pkg/kv/kvclient:kvclient", "//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil", "//pkg/kv/kvnemesis:kvnemesis", @@ -2581,6 +2583,8 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:get_x_data", "//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data", "//pkg/kv/kvclient/rangestats:get_x_data", + "//pkg/kv/kvclient/spanstats:get_x_data", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor:get_x_data", "//pkg/kv/kvnemesis:get_x_data", "//pkg/kv/kvnemesis/kvnemesisutil:get_x_data", "//pkg/kv/kvprober:get_x_data", diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 773926dcc4f7..7db076979120 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -648,6 +648,27 @@ func (c *Connector) SpanConfigConformance( return report, nil } +// SpanStats implements the spanstats.Accessor Interface. +func (c *Connector) SpanStats( + ctx context.Context, startKey, + endKey roachpb.Key, nodeID roachpb.NodeID, +) (*serverpb.InternalSpanStatsResponse, error) { + var response *serverpb.InternalSpanStatsResponse + err := c.withClient(ctx, func(ctx context.Context, c *client) error { + stats, err := c.SpanStats(ctx, &serverpb.SpanStatsRequest{ + NodeID: nodeID, + StartKey: roachpb.RKey(startKey), + EndKey: roachpb.RKey(endKey), + }) + if err != nil { + return err + } + response = (*serverpb.InternalSpanStatsResponse)(stats) + return nil + }) + return response, err +} + // GetAllSystemSpanConfigsThatApply implements the spanconfig.KVAccessor // interface. func (c *Connector) GetAllSystemSpanConfigsThatApply( diff --git a/pkg/kv/kvclient/kvtenant/BUILD.bazel b/pkg/kv/kvclient/kvtenant/BUILD.bazel index addedfb971f4..e4d2acdca5d5 100644 --- a/pkg/kv/kvclient/kvtenant/BUILD.bazel +++ b/pkg/kv/kvclient/kvtenant/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/keys", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/spanstats", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/kv/kvclient/kvtenant/connector.go b/pkg/kv/kvclient/kvtenant/connector.go index ca310ad5d765..f984d94f157b 100644 --- a/pkg/kv/kvclient/kvtenant/connector.go +++ b/pkg/kv/kvclient/kvtenant/connector.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -91,6 +92,9 @@ type Connector interface { // applicable to secondary tenants. spanconfig.KVAccessor + // Accessor provides access to span stats. + spanstats.Accessor + // Reporter provides access to conformance reports, i.e. whether ranges // backing queried keyspans conform the span configs that apply to them. spanconfig.Reporter diff --git a/pkg/kv/kvclient/spanstats/BUILD.bazel b/pkg/kv/kvclient/spanstats/BUILD.bazel new file mode 100644 index 000000000000..9406ce1587d7 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/BUILD.bazel @@ -0,0 +1,15 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "spanstats", + srcs = ["span_stats.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/server/serverpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/spanstats/span_stats.go b/pkg/kv/kvclient/spanstats/span_stats.go new file mode 100644 index 000000000000..943d289cd6bc --- /dev/null +++ b/pkg/kv/kvclient/spanstats/span_stats.go @@ -0,0 +1,33 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanstats + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +// Accessor mediates access to SpanStats by consumers across the KV/Tenant +// boundary. +type Accessor interface { + // SpanStats returns the span stats between startKey and endKey. + // The caller can request span stats from across the cluster by setting + // nodeId to 0. The caller can request span stats from a specific node by + // setting the value of nodeID accordingly. + SpanStats( + ctx context.Context, + startKey, + endKey roachpb.Key, + nodeID roachpb.NodeID, + ) (*serverpb.InternalSpanStatsResponse, error) +} diff --git a/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel b/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel new file mode 100644 index 000000000000..2c9722affeb6 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/spanstatsaccessor/BUILD.bazel @@ -0,0 +1,15 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "spanstatsaccessor", + srcs = ["accessor.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats/spanstatsaccessor", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/server/serverpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go b/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go new file mode 100644 index 000000000000..b10c7565e9b5 --- /dev/null +++ b/pkg/kv/kvclient/spanstats/spanstatsaccessor/accessor.go @@ -0,0 +1,51 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanstatsaccessor + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" +) + +// LocalAccessor is an implementation of spanstats.Accessor that is meant +// to provide access to span stats to servers that are co-located on the KV +// node. +type LocalAccessor struct { + spanStatsServer serverpb.SpanStatsServer +} + +// New returns a new instance of AccessorImpl. +func New(server serverpb.SpanStatsServer) *LocalAccessor { + return &LocalAccessor{spanStatsServer: server} +} + +// SpanStats implements the spanstats.Accessor interface. +func (a *LocalAccessor) SpanStats( + ctx context.Context, startKey, + endKey roachpb.Key, nodeID roachpb.NodeID, +) (*serverpb.InternalSpanStatsResponse, error) { + res, err := a.spanStatsServer.GetSpanStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: roachpb.Span{ + Key: startKey, + EndKey: endKey, + }, + NodeID: nodeID, + }) + + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 90f59c72125a..76199567d511 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -248,7 +248,6 @@ go_test( "client_spanconfigs_test.go", "client_split_burst_test.go", "client_split_test.go", - "client_status_test.go", "client_store_test.go", "client_tenant_test.go", "client_test.go", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 06dd59d52255..69781dff23da 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -63,7 +63,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" @@ -3138,33 +3137,6 @@ func mapToHotReplicasInfo(repls []CandidateReplica) []HotReplicaInfo { return hotRepls } -// StoreKeySpanStats carries the result of a stats computation over a key range. -type StoreKeySpanStats struct { - ReplicaCount int - MVCC enginepb.MVCCStats - ApproximateDiskBytes uint64 -} - -// ComputeStatsForKeySpan computes the aggregated MVCCStats for all replicas on -// this store which contain any keys in the supplied range. -func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeySpanStats, error) { - var result StoreKeySpanStats - - newStoreReplicaVisitor(s).UndefinedOrder().Visit(func(repl *Replica) bool { - desc := repl.Desc() - if bytes.Compare(startKey, desc.EndKey) >= 0 || bytes.Compare(desc.StartKey, endKey) >= 0 { - return true // continue - } - result.MVCC.Add(repl.GetMVCCStats()) - result.ReplicaCount++ - return true - }) - - var err error - result.ApproximateDiskBytes, err = s.engine.ApproximateDiskBytes(startKey.AsRawKey(), endKey.AsRawKey()) - return result, err -} - // ReplicateQueueDryRun runs the given replica through the replicate queue // (using the allocator) without actually carrying out any changes, returning // all trace messages collected along the way. diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index abc74495e3f5..68d39736591c 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -115,6 +115,9 @@ func (a tenantAuthorizer) authorize( case "/cockroach.server.serverpb.Status/TransactionContentionEvents": return a.authTenant(tenID) + case "/cockroach.server.serverpb.Status/SpanStats": + return a.authTenant(tenID) + case "/cockroach.roachpb.Internal/GetSpanConfigs": return a.authGetSpanConfigs(tenID, req.(*roachpb.GetSpanConfigsRequest)) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 889536b47f86..60355a463ffe 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "init.go", "init_handshake.go", "initial_sql.go", + "key_visualizer_server.go", "listen_and_update_addrs.go", "load_endpoint.go", "loss_of_quorum.go", @@ -113,6 +114,8 @@ go_library( "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangestats", + "//pkg/kv/kvclient/spanstats", + "//pkg/kv/kvclient/spanstats/spanstatsaccessor", "//pkg/kv/kvprober", "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/allocatorimpl", @@ -410,6 +413,7 @@ go_test( "server_test.go", "settings_cache_test.go", "settings_test.go", + "span_stats_test.go", "statements_test.go", "stats_test.go", "status_ext_test.go", diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 44736c039a43..7808fc755e73 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1360,7 +1360,7 @@ func (s *adminServer) statsForSpan( req := serverpb.SpanStatsRequest{ StartKey: rSpan.Key, EndKey: rSpan.EndKey, - NodeID: nodeID.String(), + NodeID: nodeID, } spanResponse, err = client.SpanStats(ctx, &req) } diff --git a/pkg/server/key_visualizer_server.go b/pkg/server/key_visualizer_server.go new file mode 100644 index 000000000000..7e8b0144cb8d --- /dev/null +++ b/pkg/server/key_visualizer_server.go @@ -0,0 +1,226 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "sort" + "strings" + "time" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvissettings" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// KeyVisualizerServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. +type KeyVisualizerServer struct { + ie *sql.InternalExecutor + settings *cluster.Settings + nodeDialer *nodedialer.Dialer + status *systemStatusServer + node *Node +} + +var _ keyvispb.KeyVisualizerServer = &KeyVisualizerServer{} + +func (s *KeyVisualizerServer) saveBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) error { + encoded, err := protoutil.Marshal(req) + + if err != nil { + return err + } + // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. + _, err = s.ie.ExecEx( + ctx, + "upsert tenant boundaries", + nil, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + `UPSERT INTO system.span_stats_tenant_boundaries( + tenant_id, + boundaries + ) VALUES ($1, $2) + `, + roachpb.SystemTenantID.ToUint64(), + encoded, + ) + + return err +} + +func (s *KeyVisualizerServer) getSamplesFromFanOut( + ctx context.Context, timestamp time.Time, +) (*keyvispb.GetSamplesResponse, error) { + + samplePeriod := keyvissettings.SampleInterval.Get(&s.settings.SV) + + dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) + return keyvispb.NewKeyVisualizerClient(conn), err + } + + nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { + samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, + &keyvispb.GetSamplesRequest{ + NodeID: nodeID, + CollectedOnOrAfter: timestamp, + }) + if err != nil { + return nil, err + } + return samples, err + } + + globalSamples := make(map[int64][]keyvispb.Sample) + + responseFn := func(nodeID roachpb.NodeID, resp interface{}) { + nodeResponse := resp.(*keyvispb.GetSamplesResponse) + + // Collection is spread across each node, so samples that belong to the + // same sample period should be aggregated together. + for _, sampleFragment := range nodeResponse.Samples { + tNanos := sampleFragment.SampleTime.Truncate(samplePeriod).UnixNano() + globalSamples[tNanos] = append(globalSamples[tNanos], sampleFragment) + } + } + + errorFn := func(nodeID roachpb.NodeID, err error) { + log.Errorf(ctx, "could not get key visualizer sample for node %d: %v", + nodeID, err) + } + + err := s.status.iterateNodes(ctx, + "iterating nodes for key visualizer samples", dialFn, nodeFn, + responseFn, errorFn) + if err != nil { + return nil, err + } + + var samples []keyvispb.Sample + for sampleTimeNanos, sampleFragments := range globalSamples { + if !verifySampleBoundariesEqual(sampleFragments) { + log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") + } + samples = append(samples, keyvispb.Sample{ + SampleTime: timeutil.Unix(0, sampleTimeNanos), + SpanStats: cumulativeStats(sampleFragments), + }) + } + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// verifySampleBoundariesEqual returns true if all the samples collected +// from across the cluster belonging to the same sample period have identical +// spans. +func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { + f0 := fragments[0] + sort.Slice(f0.SpanStats, func(a, b int) bool { + return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 + }) + + for i := 1; i < len(fragments); i++ { + fi := fragments[i] + + if len(f0.SpanStats) != len(fi.SpanStats) { + return false + } + + sort.Slice(fi.SpanStats, func(a, b int) bool { + return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 + }) + + for b, bucket := range f0.SpanStats { + if !bucket.Span.Equal(fi.SpanStats[b].Span) { + return false + } + } + } + + return true +} + +// unsafeBytesToString constructs a string from a byte slice. It is +// critical that the byte slice not be modified. +func unsafeBytesToString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) +} + +// cumulativeStats uniques and accumulates all of a sample's +// keyvispb.SpanStats from across the cluster. Stores collect statistics for +// the same spans, and the caller wants the cumulative statistics for those spans. +func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { + var stats []keyvispb.SpanStats + for _, sampleFragment := range fragments { + stats = append(stats, sampleFragment.SpanStats...) + } + + unique := make(map[string]keyvispb.SpanStats) + for _, stat := range stats { + + var sb strings.Builder + sb.WriteString(unsafeBytesToString(stat.Span.Key)) + sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) + spanAsString := sb.String() + + if uniqueStat, ok := unique[spanAsString]; ok { + uniqueStat.Requests += stat.Requests + } else { + unique[spanAsString] = keyvispb.SpanStats{ + Span: stat.Span, + Requests: stat.Requests, + } + } + } + + ret := make([]keyvispb.SpanStats, 0, len(unique)) + for _, stat := range unique { + ret = append(ret, stat) + } + return ret +} + +// GetSamples implements the keyvispb.KeyVisualizerServer interface. +func (s *KeyVisualizerServer) GetSamples( + ctx context.Context, req *keyvispb.GetSamplesRequest, +) (*keyvispb.GetSamplesResponse, error) { + + if req.NodeID == 0 { + return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + } + + samples := s.node.spanStatsCollector.GetSamples( + req.CollectedOnOrAfter) + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. +func (s *KeyVisualizerServer) UpdateBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) (*keyvispb.UpdateBoundariesResponse, error) { + if err := s.saveBoundaries(ctx, req); err != nil { + return nil, err + } + return &keyvispb.UpdateBoundariesResponse{}, nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 8db279869848..ec42ab795f1b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -34,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats/spanstatsaccessor" "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -150,8 +152,8 @@ type Server struct { tsDB *ts.DB tsServer *ts.Server - // spanStatsServer implements `keyvispb.KeyVisualizerServer` - spanStatsServer *SpanStatsServer + // keyVisualizerServer implements `keyvispb.KeyVisualizerServer` + keyVisualizerServer *KeyVisualizerServer // The Observability Server, used by the Observability Service to subscribe to // CRDB data. @@ -170,6 +172,9 @@ type Server struct { spanConfigSubscriber spanconfig.KVSubscriber spanConfigReporter spanconfig.Reporter + // spanStatsServer services internal requests for span stats. + spanStatsServer *spanStatsServer + // pgL is the SQL listener. pgL net.Listener @@ -852,6 +857,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { ambientCtx: cfg.AmbientCtx, } + // Instantiate the span stats server. There is a circular dependency + // between server.spanStatsServer and server.systemStatusServer. + spanStats := &spanStatsServer{ + fetcher: rangestats.NewFetcher(db), + distSender: distSender, + statusServer: nil, // Circular dependency. Set below. + node: node, + } + spanStatsLocalAccessor := spanstatsaccessor.New(spanStats) + // Instantiate the status API server. sStatus := newSystemStatusServer( cfg.AmbientCtx, @@ -873,18 +888,23 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { serverIterator, spanConfig.reporter, clock, + distSender, + spanStatsLocalAccessor, ) - spanStatsServer := &SpanStatsServer{ + // The spanStatsServer needs a reference to the status server. + spanStats.statusServer = sStatus + + keyVisualizerServer := &KeyVisualizerServer{ ie: internalExecutor, settings: st, nodeDialer: nodeDialer, status: sStatus, node: node, } - spanStatsAccessor := spanstatskvaccessor.New(spanStatsServer) + keyVisServerAccessor := spanstatskvaccessor.New(keyVisualizerServer) - // Instantiate the KV prober. + // Instantiate the KV prober kvProber := kvprober.NewProber(kvprober.Opts{ Tracer: cfg.AmbientCtx.Tracer, DB: db, @@ -953,7 +973,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nodeDescs: g, systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: spanConfig.kvAccessor, - spanStatsAccessor: spanStatsAccessor, + keyVisServerAccessor: keyVisServerAccessor, + spanStatsAccessor: spanStatsLocalAccessor, nodeDialer: nodeDialer, distSender: distSender, db: db, @@ -1127,7 +1148,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { externalStorageBuilder: externalStorageBuilder, storeGrantCoords: gcoords.Stores, kvMemoryMonitor: kvMemoryMonitor, - spanStatsServer: spanStatsServer, + keyVisualizerServer: keyVisualizerServer, + spanStatsServer: spanStats, } return lateBoundServer, err @@ -1372,7 +1394,10 @@ func (s *Server) PreStart(ctx context.Context) error { obspb.RegisterObsServer(s.grpc.Server, s.eventsServer) // Register the KeyVisualizer Server - keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.spanStatsServer) + keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.keyVisualizerServer) + + // Register the SpanStats Server + serverpb.RegisterSpanStatsServer(s.grpc.Server, s.spanStatsServer) // Start the RPC server. This opens the RPC/SQL listen socket, // and dispatches the server worker for the RPC. diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 3faa257033d1..f50b53ef07b9 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" @@ -278,7 +279,10 @@ type sqlServerArgs struct { spanConfigAccessor spanconfig.KVAccessor // Used by the Key Visualizer job. - spanStatsAccessor *spanstatskvaccessor.SpanStatsKVAccessor + keyVisServerAccessor *spanstatskvaccessor.SpanStatsKVAccessor + + // spanStatsAccessor provides access to span stats from KV. + spanStatsAccessor spanstats.Accessor // Used by DistSQLPlanner to dial KV nodes. nodeDialer *nodedialer.Dialer @@ -975,6 +979,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RangeStatsFetcher: rangeStatsFetcher, EventsExporter: cfg.eventsServer, NodeDescs: cfg.nodeDescs, + SpanStatsAccessor: cfg.spanStatsAccessor, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { @@ -1237,7 +1242,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if codec.ForSystemTenant() { ri := kvcoord.MakeRangeIterator(cfg.distSender) spanStatsConsumer := spanstatsconsumer.New( - cfg.spanStatsAccessor, + cfg.keyVisServerAccessor, &ri, cfg.Settings, cfg.circularInternalExecutor, diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index 460f1d909c47..e447496c0149 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -13,6 +13,7 @@ proto_library( "index_recommendations.proto", "init.proto", "migration.proto", + "span_stats.proto", "status.proto", ], strip_import_prefix = "/pkg", diff --git a/pkg/server/serverpb/span_stats.proto b/pkg/server/serverpb/span_stats.proto new file mode 100644 index 000000000000..1e2e2460365d --- /dev/null +++ b/pkg/server/serverpb/span_stats.proto @@ -0,0 +1,43 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + + +syntax = "proto3"; +package cockroach.server.serverpb; +option go_package = "serverpb"; + +import "roachpb/data.proto"; +import "storage/enginepb/mvcc.proto"; +import "gogoproto/gogo.proto"; +import "google/api/annotations.proto"; + +// InternalSpanStatsRequest +message InternalSpanStatsRequest { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + + // A node_id of `0` indicates the server should issue a fan-out to all nodes. + int32 node_id = 2 [ + (gogoproto.customname) = "NodeID", + (gogoproto.casttype) = + "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" + ]; +} + + +message InternalSpanStatsResponse { + int32 range_count = 2; + uint64 approximate_disk_bytes = 3; + cockroach.storage.enginepb.MVCCStats total_stats = 1 + [ (gogoproto.nullable) = false ]; +} + +service SpanStats { + rpc GetSpanStats(InternalSpanStatsRequest) returns (InternalSpanStatsResponse) {} +} diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index fb59daef508b..7d733c3e7f3e 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -1184,7 +1184,11 @@ message ListDistSQLFlowsResponse { } message SpanStatsRequest { - string node_id = 1 [ (gogoproto.customname) = "NodeID" ]; + int32 node_id = 1 [ + (gogoproto.customname) = "NodeID", + (gogoproto.casttype) = + "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID" + ]; bytes start_key = 2 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RKey" ]; diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 44c2b9a51b31..c61a2f516d27 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -1,4 +1,4 @@ -// Copyright 2022 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -12,214 +12,230 @@ package server import ( "context" - "sort" - "strings" - "time" - "unsafe" - "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -// SpanStatsServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. -type SpanStatsServer struct { - ie *sql.InternalExecutor - settings *cluster.Settings - nodeDialer *nodedialer.Dialer - status *systemStatusServer - node *Node +// spanStatsServer implements serverpb.SpanStatsServer. +// It services requests for serverpb.InternalSpanStatsRequest. +type spanStatsServer struct { + fetcher *rangestats.Fetcher + distSender *kvcoord.DistSender + statusServer *systemStatusServer + nodeDialer *nodedialer.Dialer + node *Node } -var _ keyvispb.KeyVisualizerServer = &SpanStatsServer{} +var _ serverpb.SpanStatsServer = &spanStatsServer{} -func (s *SpanStatsServer) saveBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) error { +func (s *spanStatsServer) dialFn(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) + return serverpb.NewSpanStatsClient(conn), err +} - encoded, err := protoutil.Marshal(req) +func (s *spanStatsServer) fanOut( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + var res *serverpb.InternalSpanStatsResponse + rSpan, err := keys.SpanAddr(req.Span) if err != nil { - return err + return nil, err + } + nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, s.distSender, rSpan) + if err != nil { + return nil, err + } + nodesWithReplica := make(map[roachpb.NodeID]bool) + for _, nodeID := range nodeIDs { + nodesWithReplica[nodeID] = true } - // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. - _, err = s.ie.ExecEx( - ctx, - "upsert tenant boundaries", - nil, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - `UPSERT INTO system.span_stats_tenant_boundaries( - tenant_id, - boundaries - ) VALUES ($1, $2) - `, - roachpb.SystemTenantID.ToUint64(), - encoded, - ) - - return err -} - -func (s *SpanStatsServer) getSamplesFromFanOut( - ctx context.Context, timestamp time.Time, -) (*keyvispb.GetSamplesResponse, error) { - - dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { - conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) - return keyvispb.NewKeyVisualizerClient(conn), err + // We should only fan out to a node if it has replicas for this span. + // A blind fan out would be wasteful. + smartDial := func( + ctx context.Context, + nodeID roachpb.NodeID, + ) (interface{}, error) { + if _, ok := nodesWithReplica[nodeID]; ok { + return s.dialFn(ctx, nodeID) + } + return nil, nil } nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { - samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, - &keyvispb.GetSamplesRequest{ - NodeID: nodeID, - CollectedOnOrAfter: timestamp, + // `smartDial` may skip this node, so check to see if the client is nil. + if client == nil { + return &serverpb.InternalSpanStatsResponse{}, nil + } + stats, err := client.(serverpb.SpanStatsClient).GetSpanStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: req.Span, + NodeID: nodeID, }) if err != nil { return nil, err } - return samples, err + return stats, err } - globalSamples := make(map[int64][]keyvispb.Sample) - responseFn := func(nodeID roachpb.NodeID, resp interface{}) { - nodeResponse := resp.(*keyvispb.GetSamplesResponse) - - // Collection is spread across each node, so samples that belong to the - // same sample period should be aggregated together. The collectors - // guarantee that corresponding sample fragments have the same sample time. - for _, sampleFragment := range nodeResponse.Samples { - sampleTime := sampleFragment.SampleTime.UnixNano() - globalSamples[sampleTime] = append(globalSamples[sampleTime], sampleFragment) - } + nodeResponse := resp.(*serverpb.InternalSpanStatsResponse) + res.ApproximateDiskBytes += nodeResponse.ApproximateDiskBytes + res.TotalStats.Add(nodeResponse.TotalStats) + res.RangeCount += nodeResponse.RangeCount } errorFn := func(nodeID roachpb.NodeID, err error) { log.Errorf(ctx, "could not get span stats sample for node %d: %v", nodeID, err) } - err := s.status.iterateNodes(ctx, "iterating nodes for span stats", dialFn, nodeFn, responseFn, errorFn) - - if err != nil { + if err := s.statusServer.iterateNodes( + ctx, + "iterating nodes for span stats", + smartDial, + nodeFn, + responseFn, + errorFn, + ); err != nil { return nil, err } - var samples []keyvispb.Sample - for sampleTimeNanos, sampleFragments := range globalSamples { - if !verifySampleBoundariesEqual(sampleFragments) { - log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") - } - samples = append(samples, keyvispb.Sample{ - SampleTime: timeutil.Unix(0, sampleTimeNanos), - SpanStats: cumulativeStats(sampleFragments), - }) - } - - return &keyvispb.GetSamplesResponse{Samples: samples}, nil + return res, nil } -// verifySampleBoundariesEqual returns true if all the samples collected -// from across the cluster belonging to the same sample period have identical -// spans. -func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { - - f0 := fragments[0] - sort.Slice(f0.SpanStats, func(a, b int) bool { - return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 - }) +func (s *spanStatsServer) getLocalStats( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + res := &serverpb.InternalSpanStatsResponse{} - for i := 1; i < len(fragments); i++ { - fi := fragments[i] + sp, err := keys.SpanAddr(req.Span) + if err != nil { + return nil, err + } + ri := kvcoord.MakeRangeIterator(s.distSender) + ri.Seek(ctx, sp.Key, kvcoord.Ascending) - if len(f0.SpanStats) != len(fi.SpanStats) { - return false + for { + if !ri.Valid() { + return nil, ri.Error() } - sort.Slice(fi.SpanStats, func(a, b int) bool { - return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 - }) - - for b, bucket := range f0.SpanStats { - if !bucket.Span.Equal(fi.SpanStats[b].Span) { - return false + desc := ri.Desc() + descSpan := desc.RSpan() + res.RangeCount += 1 + + // Is the descriptor fully contained by the request span? + if sp.ContainsKeyRange(descSpan.Key, desc.EndKey) { + // If so, obtain stats for this range via RangeStats. + rangeStats, err := s.fetcher.RangeStats(ctx, + desc.StartKey.AsRawKey()) + if err != nil { + return nil, err + } + for _, resp := range rangeStats { + res.TotalStats.Add(resp.MVCCStats) } - } - } - return true -} + } else { + // Otherwise, do an MVCC Scan. + // We should only scan the part of the range that our request span + // encompasses. + scanStart := sp.Key + scanEnd := sp.EndKey + // If our request span began before the start of this range, + // start scanning from this range's start key. + if descSpan.Key.Compare(sp.Key) == 1 { + scanStart = descSpan.Key + } + // If our request span ends after the end of this range, + // stop scanning at this range's end key. + if descSpan.EndKey.Compare(sp.EndKey) == -1 { + scanEnd = descSpan.EndKey + } + err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + stats, err := storage.ComputeStats( + s.Engine(), + scanStart.AsRawKey(), + scanEnd.AsRawKey(), + timeutil.Now().UnixNano(), + ) + + if err != nil { + return err + } + + res.TotalStats.Add(stats) + return nil + }) -// unsafeBytesToString constructs a string from a byte slice. It is -// critical that the byte slice not be modified. -func unsafeBytesToString(data []byte) string { - return *(*string)(unsafe.Pointer(&data)) -} + if err != nil { + return nil, err + } + } -// cumulativeStats uniques and accumulates all of a sample's -// keyvispb.SpanStats from across the cluster. Stores collect statistics for -// the same spans, and the caller wants the cumulative statistics for those spans. -func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { - var stats []keyvispb.SpanStats - for _, sampleFragment := range fragments { - stats = append(stats, sampleFragment.SpanStats...) + if !ri.NeedAnother(sp) { + break + } + ri.Next(ctx) } - unique := make(map[string]keyvispb.SpanStats) - for _, stat := range stats { - - var sb strings.Builder - sb.WriteString(unsafeBytesToString(stat.Span.Key)) - sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) - spanAsString := sb.String() - - if uniqueStat, ok := unique[spanAsString]; ok { - uniqueStat.Requests += stat.Requests - } else { - unique[spanAsString] = keyvispb.SpanStats{ - Span: stat.Span, - Requests: stat.Requests, - } + // Finally, get the approximate disk bytes from each store. + err = s.node.stores.VisitStores(func(store *kvserver.Store) error { + approxDiskBytes, err := store.Engine().ApproximateDiskBytes( + req.Span.Key, req.Span.EndKey) + if err != nil { + return err } - } + res.ApproximateDiskBytes += approxDiskBytes + return nil + }) - ret := make([]keyvispb.SpanStats, 0, len(unique)) - for _, stat := range unique { - ret = append(ret, stat) + if err != nil { + return nil, err } - return ret -} -// GetSamples implements the keyvispb.KeyVisualizerServer interface. -func (s *SpanStatsServer) GetSamples( - ctx context.Context, req *keyvispb.GetSamplesRequest, -) (*keyvispb.GetSamplesResponse, error) { + return res, nil +} +// GetSpanStats will return span stats according to the nodeID specified by req. +// If req.NodeID == 0, a fan out is done to collect and sum span stats from +// across the cluster. Otherwise, the node specified will be dialed, +// if the local node isn't already the node specified. If the node specified +// is the local node, span stats are computed and returned. +func (s *spanStatsServer) GetSpanStats( + ctx context.Context, req *serverpb.InternalSpanStatsRequest, +) (*serverpb.InternalSpanStatsResponse, error) { + // Perform a fan out when the requested NodeID is 0. if req.NodeID == 0 { - return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + return s.fanOut(ctx, req) } - samples := s.node.spanStatsCollector.GetSamples( - req.CollectedOnOrAfter) + // See if the requested node is the local node. + _, local, err := s.statusServer.parseNodeID(req.NodeID.String()) + if err != nil { + return nil, err + } - return &keyvispb.GetSamplesResponse{Samples: samples}, nil -} + // If the requested node is the local node, return stats. + if local { + return s.getLocalStats(ctx, req) + } -// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. -func (s *SpanStatsServer) UpdateBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) (*keyvispb.UpdateBoundariesResponse, error) { - if err := s.saveBoundaries(ctx, req); err != nil { + // Otherwise, dial the correct node, and ask for span stats. + client, err := s.dialFn(ctx, req.NodeID) + if err != nil { return nil, err } - return &keyvispb.UpdateBoundariesResponse{}, nil + return client.(serverpb.SpanStatsClient).GetSpanStats(ctx, req) } diff --git a/pkg/kv/kvserver/client_status_test.go b/pkg/server/span_stats_test.go similarity index 80% rename from pkg/kv/kvserver/client_status_test.go rename to pkg/server/span_stats_test.go index 69eb92152501..95f4e4403c5d 100644 --- a/pkg/kv/kvserver/client_status_test.go +++ b/pkg/server/span_stats_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvserver_test +package server import ( "context" @@ -17,7 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestComputeStatsForKeySpan(t *testing.T) { +func TestLocalSpanStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -36,7 +36,7 @@ func TestComputeStatsForKeySpan(t *testing.T) { Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}, }, }) - s := serv.(*server.TestServer) + s := serv.(*TestServer) defer s.Stopper().Stop(ctx) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) @@ -68,24 +68,34 @@ func TestComputeStatsForKeySpan(t *testing.T) { for _, tcase := range []struct { startKey string endKey string - expectedRanges int + expectedRanges int32 expectedKeys int64 }{ {"a", "i", 4, 6}, {"a", "c", 1, 3}, {"b", "e", 2, 5}, {"e", "i", 2, 1}, + {"b", "d", 2, 3}, + {"b", "bbb", 1, 2}, } { start, end := tcase.startKey, tcase.endKey - result, err := store.ComputeStatsForKeySpan( - roachpb.RKey(start), roachpb.RKey(end)) + result, err := s.spanStatsServer.getLocalStats(ctx, + &serverpb.InternalSpanStatsRequest{ + Span: roachpb.Span{ + Key: roachpb.Key(start), + EndKey: roachpb.Key(end), + }, + NodeID: 0, + }, + ) if err != nil { t.Fatal(err) } - if a, e := result.ReplicaCount, tcase.expectedRanges; a != e { + + if a, e := result.RangeCount, tcase.expectedRanges; a != e { t.Errorf("Expected %d ranges in span [%s - %s], found %d", e, start, end, a) } - if a, e := result.MVCC.LiveCount, tcase.expectedKeys; a != e { + if a, e := result.TotalStats.LiveCount, tcase.expectedKeys; a != e { t.Errorf("Expected %d keys in span [%s - %s], found %d", e, start, end, a) } } diff --git a/pkg/server/status.go b/pkg/server/status.go index 54aade2dee81..1c39b5d19af5 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -39,6 +39,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -463,6 +465,7 @@ type statusServer struct { si systemInfoOnce stmtDiagnosticsRequester StmtDiagnosticsRequester internalExecutor *sql.InternalExecutor + spanStatsAccessor spanstats.Accessor // cancelSemaphore is a semaphore that limits the number of // concurrent calls to the pgwire query cancellation endpoint. This @@ -554,6 +557,7 @@ func newStatusServer( internalExecutor *sql.InternalExecutor, serverIterator ServerIterator, clock *hlc.Clock, + spanStatsAccessor spanstats.Accessor, ) *statusServer { ambient.AddLogTag("status", nil) if !rpcCtx.TenantID.IsSystem() { @@ -573,10 +577,11 @@ func newStatusServer( serverIterator: serverIterator, clock: clock, }, - cfg: cfg, - db: db, - metricSource: metricSource, - internalExecutor: internalExecutor, + cfg: cfg, + db: db, + metricSource: metricSource, + internalExecutor: internalExecutor, + spanStatsAccessor: spanStatsAccessor, // See the docstring on cancelSemaphore for details about this initialization. cancelSemaphore: quotapool.NewIntPool("pgwire-cancel", 256), @@ -606,6 +611,8 @@ func newSystemStatusServer( serverIterator ServerIterator, spanConfigReporter spanconfig.Reporter, clock *hlc.Clock, + distSender *kvcoord.DistSender, + spanStatsAccessor spanstats.Accessor, ) *systemStatusServer { server := newStatusServer( ambient, @@ -622,6 +629,7 @@ func newSystemStatusServer( internalExecutor, serverIterator, clock, + spanStatsAccessor, ) return &systemStatusServer{ @@ -3385,7 +3393,7 @@ func (s *statusServer) ListExecutionInsights( // SpanStats requests the total statistics stored on a node for a given key // span, which may include multiple ranges. -func (s *systemStatusServer) SpanStats( +func (s *statusServer) SpanStats( ctx context.Context, req *serverpb.SpanStatsRequest, ) (*serverpb.SpanStatsResponse, error) { ctx = forwardSQLIdentityThroughRPCCalls(ctx) @@ -3397,35 +3405,14 @@ func (s *systemStatusServer) SpanStats( return nil, err } - nodeID, local, err := s.parseNodeID(req.NodeID) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - - if !local { - status, err := s.dialNode(ctx, nodeID) - if err != nil { - return nil, serverError(ctx, err) - } - return status.SpanStats(ctx, req) - } - - output := &serverpb.SpanStatsResponse{} - err = s.stores.VisitStores(func(store *kvserver.Store) error { - result, err := store.ComputeStatsForKeySpan(req.StartKey.Next(), req.EndKey) - if err != nil { - return err - } - output.TotalStats.Add(result.MVCC) - output.RangeCount += int32(result.ReplicaCount) - output.ApproximateDiskBytes += result.ApproximateDiskBytes - return nil - }) - if err != nil { - return nil, serverError(ctx, err) - } + res, err := s.spanStatsAccessor.SpanStats( + ctx, + roachpb.Key(req.StartKey), + roachpb.Key(req.EndKey), + req.NodeID, + ) - return output, nil + return (*serverpb.SpanStatsResponse)(res), err } // Diagnostics returns an anonymized diagnostics report. diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 6d6ad4ea9b5b..6e6125319b6c 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1366,7 +1366,7 @@ func TestSpanStatsResponse(t *testing.T) { var response serverpb.SpanStatsResponse request := serverpb.SpanStatsRequest{ - NodeID: "1", + NodeID: 1, StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), } @@ -1395,7 +1395,7 @@ func TestSpanStatsGRPCResponse(t *testing.T) { defer rpcStopper.Stop(ctx) rpcContext := newRPCTestContext(ctx, ts, ts.RPCContext().Config) request := serverpb.SpanStatsRequest{ - NodeID: "1", + NodeID: 1, StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 749d8a2ffb7b..b9e6c73087be 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -237,6 +237,7 @@ func NewTenantServer( args.circularInternalExecutor, serverIterator, args.clock, + args.spanStatsAccessor, ) args.sqlStatusServer = sStatus @@ -1032,7 +1033,6 @@ func makeTenantSQLServerArgs( // TODO(irfansharif): hook up NewGrantCoordinatorSQL. var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil - return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -1064,6 +1064,7 @@ func makeTenantSQLServerArgs( nodeDescs: tenantConnect, systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: tenantConnect, + spanStatsAccessor: tenantConnect, nodeDialer: nodeDialer, distSender: ds, db: db, diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index f56f0f75dc15..96bb3b657dfb 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -310,6 +310,7 @@ go_library( "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvclient/rangefeed", + "//pkg/kv/kvclient/spanstats", "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/liveness/livenesspb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 15f6f9776713..15cb49c78097 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/spanstats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/multitenant" @@ -1351,6 +1352,9 @@ type ExecutorConfig struct { // records. SpanConfigKVAccessor spanconfig.KVAccessor + // SpanStatsAccessor provides access to span stats from KV. + SpanStatsAccessor spanstats.Accessor + // InternalDB is used to create an isql.Executor bound with SessionData and // other ExtraTxnState. InternalDB *InternalDB