From 4fee2a42e5f2c10cf0bb5d422121243928a6f297 Mon Sep 17 00:00:00 2001 From: Zach Lite Date: Fri, 27 Jan 2023 18:27:56 -0500 Subject: [PATCH] kvclient, server: implement SpanStats for use with coalesced ranges This commit provides a re-implementation of fetching the MVCC stats and range count for a given span. The previous implementation relied on the assumption that 1 range can contain at most 1 table/index. Since GH-79700, this assumption is no longer true. This re-implementation allows secondary tenants to access span stats via the serverpb.TenantStatusServer interface. If a roachpb.SpanStatsRequest has a node_id value of "0", instead of a specific node id, the response will yield cluster-wide values. To achieve this, the server does a fan-out to nodes that are known to have replicas for the span requested. The interaction between tenants is illustrated below: System Tenant +----------------------------------------------------+ | | | | | KV Node fan-out | | +----------+ | | | | | | +-------------------------------v----+ | | | | | | | | | server.systemStatusServer +-----+ | | | | | | +----------------+-------------------+ | | | | | | | | | via TenantStatusServer | | +--------------+ | | | | | | | | | | | v | | | +-----------+ | | | | | | | | | SQLServer | | | | | | | | | +-----------+ | | | | +----------------------v-----------------------------+ | | | | roachpb.SpanStatsResponse | | Secondary Tenant | +----------------------+------------------------------+ | | | | +------------v-------------+ | | | | | | | | kvtenantccl.Connector | | | | | | | +------------+-------------+ | | | | | | via TenantStatusServer | | +--------------------+ | | | | | | | | | | | | | | +-----------------v-------+ +------v------+ | | | | | | | | | server.statusServer | | SQLServer | | | | | | | | | +-------------------------+ +-------------+ | | | | | | | +-----------------------------------------------------+ Resolves https://github.com/cockroachdb/cockroach/issues/84105 Part of: https://cockroachlabs.atlassian.net/browse/CRDB-22711 Release note: None --- docs/generated/http/full.md | 32 -- pkg/ccl/kvccl/kvtenantccl/connector.go | 16 + .../serverccl/statusccl/tenant_status_test.go | 70 ++++ pkg/kv/kvserver/BUILD.bazel | 1 - pkg/kv/kvserver/store.go | 28 -- pkg/roachpb/BUILD.bazel | 3 + pkg/roachpb/span_stats.proto | 38 +++ pkg/rpc/auth_tenant.go | 12 + pkg/server/BUILD.bazel | 2 + pkg/server/admin.go | 6 +- pkg/server/key_visualizer_server.go | 226 +++++++++++++ pkg/server/server.go | 17 +- pkg/server/server_sql.go | 5 +- pkg/server/serverpb/BUILD.bazel | 5 +- pkg/server/serverpb/status.go | 4 + pkg/server/serverpb/status.proto | 19 +- pkg/server/span_stats_server.go | 318 +++++++++--------- .../span_stats_test.go} | 27 +- pkg/server/status.go | 56 ++- pkg/server/status_test.go | 6 +- pkg/server/tenant.go | 1 - 21 files changed, 595 insertions(+), 297 deletions(-) create mode 100644 pkg/roachpb/span_stats.proto create mode 100644 pkg/server/key_visualizer_server.go rename pkg/{kv/kvserver/client_status_test.go => server/span_stats_test.go} (82%) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index c7e5fc512de0..3d7fef53f7e0 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2930,42 +2930,10 @@ Support status: [reserved](#support-status) - - - - -| Field | Type | Label | Description | Support status | -| ----- | ---- | ----- | ----------- | -------------- | -| node_id | [string](#cockroach.server.serverpb.SpanStatsRequest-string) | | | [reserved](#support-status) | -| start_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | -| end_key | [bytes](#cockroach.server.serverpb.SpanStatsRequest-bytes) | | | [reserved](#support-status) | - - - - - - - #### Response Parameters - - - - -| Field | Type | Label | Description | Support status | -| ----- | ---- | ----- | ----------- | -------------- | -| range_count | [int32](#cockroach.server.serverpb.SpanStatsResponse-int32) | | | [reserved](#support-status) | -| approximate_disk_bytes | [uint64](#cockroach.server.serverpb.SpanStatsResponse-uint64) | | | [reserved](#support-status) | -| total_stats | [cockroach.storage.enginepb.MVCCStats](#cockroach.server.serverpb.SpanStatsResponse-cockroach.storage.enginepb.MVCCStats) | | | [reserved](#support-status) | - - - - - - - ## Stacks `GET /_status/stacks/{node_id}` diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index e0ae0d25f253..5f2c53426fa4 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -648,6 +648,22 @@ func (c *Connector) SpanConfigConformance( return report, nil } +// SpanStats implements the serverpb.TenantStatusServer interface. +func (c *Connector) SpanStats( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { + var response *roachpb.SpanStatsResponse + err := c.withClient(ctx, func(ctx context.Context, c *client) error { + stats, err := c.SpanStats(ctx, req) + if err != nil { + return err + } + response = stats + return nil + }) + return response, err +} + // GetAllSystemSpanConfigsThatApply implements the spanconfig.KVAccessor // interface. func (c *Connector) GetAllSystemSpanConfigsThatApply( diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index ba35f0025226..8816133e98cd 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -9,6 +9,7 @@ package statusccl import ( + "bytes" "context" gosql "database/sql" "encoding/hex" @@ -141,6 +142,75 @@ func TestTenantStatusAPI(t *testing.T) { t.Run("tenant_hot_ranges", func(t *testing.T) { testTenantHotRanges(ctx, t, testHelper) }) + + t.Run("tenant_span_stats", func(t *testing.T) { + testTenantSpanStats(ctx, t, testHelper) + }) +} + +func testTenantSpanStats(ctx context.Context, t *testing.T, helper serverccl.TenantTestHelper) { + tenantA := helper.TestCluster().Tenant(0) + tenantB := helper.ControlCluster().Tenant(0) + + aSpan := tenantA.GetTenant().Codec().TenantSpan() + bSpan := tenantB.GetTenant().Codec().TenantSpan() + + t.Run("test tenant isolation", func(t *testing.T) { + _, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + StartKey: roachpb.RKey(bSpan.Key), + EndKey: roachpb.RKey(bSpan.EndKey), + }) + require.Error(t, err) + }) + + t.Run("test KV node fan-out", func(t *testing.T) { + _, tID, err := keys.DecodeTenantPrefix(aSpan.Key) + require.NoError(t, err) + tPrefix := keys.MakeTenantPrefix(tID) + + makeKey := func(keys ...[]byte) roachpb.Key { + return bytes.Join(keys, nil) + } + + controlStats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + StartKey: roachpb.RKey(aSpan.Key), + EndKey: roachpb.RKey(aSpan.EndKey), + }) + require.NoError(t, err) + + // Create a new range in this tenant. + _, _, err = helper.HostCluster().Server(0).SplitRange(makeKey(tPrefix, roachpb.Key("c"))) + require.NoError(t, err) + + // Wait for the split to finish and propagate. + err = helper.HostCluster().WaitForFullReplication() + require.NoError(t, err) + + // Create 6 new keys across the tenant's ranges. + incKeys := []string{"a", "b", "bb", "d", "e", "f"} + for _, incKey := range incKeys { + // Prefix each key appropriately for this tenant. + k := makeKey(keys.MakeTenantPrefix(tID), []byte(incKey)) + if _, err := helper.HostCluster().Server(0).DB().Inc(ctx, k, 5); err != nil { + t.Fatal(err) + } + } + + stats, err := tenantA.TenantStatusSrv().(serverpb.TenantStatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", // 0 indicates we want stats from all nodes. + StartKey: roachpb.RKey(aSpan.Key), + EndKey: roachpb.RKey(aSpan.EndKey), + }) + + require.NoError(t, err) + require.Equal(t, controlStats.RangeCount+1, stats.RangeCount) + require.Equal(t, controlStats.TotalStats.LiveCount+int64(len(incKeys)), stats.TotalStats.LiveCount) + }) } func testTenantLogs(ctx context.Context, t *testing.T, helper serverccl.TenantTestHelper) { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 27bcb9e855b3..095e9eb0d6bb 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -249,7 +249,6 @@ go_test( "client_spanconfigs_test.go", "client_split_burst_test.go", "client_split_test.go", - "client_status_test.go", "client_store_test.go", "client_tenant_test.go", "client_test.go", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index c2be10157628..bceb8ad0f107 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -63,7 +63,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" @@ -3231,33 +3230,6 @@ func mapToHotReplicasInfo(repls []CandidateReplica) []HotReplicaInfo { return hotRepls } -// StoreKeySpanStats carries the result of a stats computation over a key range. -type StoreKeySpanStats struct { - ReplicaCount int - MVCC enginepb.MVCCStats - ApproximateDiskBytes uint64 -} - -// ComputeStatsForKeySpan computes the aggregated MVCCStats for all replicas on -// this store which contain any keys in the supplied range. -func (s *Store) ComputeStatsForKeySpan(startKey, endKey roachpb.RKey) (StoreKeySpanStats, error) { - var result StoreKeySpanStats - - newStoreReplicaVisitor(s).UndefinedOrder().Visit(func(repl *Replica) bool { - desc := repl.Desc() - if bytes.Compare(startKey, desc.EndKey) >= 0 || bytes.Compare(desc.StartKey, endKey) >= 0 { - return true // continue - } - result.MVCC.Add(repl.GetMVCCStats()) - result.ReplicaCount++ - return true - }) - - var err error - result.ApproximateDiskBytes, err = s.TODOEngine().ApproximateDiskBytes(startKey.AsRawKey(), endKey.AsRawKey()) - return result, err -} - // ReplicateQueueDryRun runs the given replica through the replicate queue // (using the allocator) without actually carrying out any changes, returning // all trace messages collected along the way. diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index 286159f39236..300e22199c7b 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -141,6 +141,7 @@ proto_library( "io-formats.proto", "metadata.proto", "span_config.proto", + "span_stats.proto", ], strip_import_prefix = "/pkg", visibility = ["//visibility:public"], @@ -158,6 +159,7 @@ proto_library( "@com_github_gogo_protobuf//gogoproto:gogo_proto", "@com_google_protobuf//:duration_proto", "@com_google_protobuf//:timestamp_proto", + "@go_googleapis//google/api:annotations_proto", ], ) @@ -179,6 +181,7 @@ go_proto_library( "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//errorspb", "@com_github_gogo_protobuf//gogoproto", + "@org_golang_google_genproto//googleapis/api/annotations:go_default_library", ], ) diff --git a/pkg/roachpb/span_stats.proto b/pkg/roachpb/span_stats.proto new file mode 100644 index 000000000000..c3a72a06a2c3 --- /dev/null +++ b/pkg/roachpb/span_stats.proto @@ -0,0 +1,38 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.roachpb; +option go_package = "roachpb"; + +import "storage/enginepb/mvcc.proto"; +import "gogoproto/gogo.proto"; +import "google/api/annotations.proto"; + +// SpanStatsRequest is used to request a SpanStatsResponse for the given key +// span and node id. A node_id value of 0 indicates that the server should +// fan-out to all nodes, and the resulting SpanStatsResponse is a cumulative +// result from across the cluster. +message SpanStatsRequest { + string node_id = 1 [(gogoproto.customname) = "NodeID"]; + bytes start_key = 2 [(gogoproto.casttype) = "RKey"]; + bytes end_key = 3 [(gogoproto.casttype) = "RKey"]; +} + +message SpanStatsResponse { + // range_count measures the number of ranges that the request span falls within. + // A SpanStatsResponse for a span that lies within a range, and whose start + // key sorts after the range start, and whose end key sorts before the + // range end, will have a range_count value of 1. + int32 range_count = 2; + uint64 approximate_disk_bytes = 3; + cockroach.storage.enginepb.MVCCStats total_stats = 1 + [(gogoproto.nullable) = false]; +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 6b046b3a2b18..9219aa3664ad 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -120,6 +120,9 @@ func (a tenantAuthorizer) authorize( case "/cockroach.server.serverpb.Status/TransactionContentionEvents": return a.authTenant(tenID) + case "/cockroach.server.serverpb.Status/SpanStats": + return a.authSpanStats(tenID, req.(*roachpb.SpanStatsRequest)) + case "/cockroach.roachpb.Internal/GetSpanConfigs": return a.authGetSpanConfigs(tenID, req.(*roachpb.GetSpanConfigsRequest)) @@ -224,6 +227,15 @@ func (a tenantAuthorizer) authGetRangeDescriptors( return validateSpan(tenID, args.Span) } +func (a tenantAuthorizer) authSpanStats( + tenID roachpb.TenantID, args *roachpb.SpanStatsRequest, +) error { + return validateSpan(tenID, roachpb.Span{ + Key: args.StartKey.AsRawKey(), + EndKey: args.EndKey.AsRawKey(), + }) +} + // authRangeLookup authorizes the provided tenant to invoke the RangeLookup RPC // with the provided args. func (a tenantAuthorizer) authRangeLookup( diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 4d07989f7478..429c8e870d90 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "init.go", "init_handshake.go", "initial_sql.go", + "key_visualizer_server.go", "listen_and_update_addrs.go", "load_endpoint.go", "loss_of_quorum.go", @@ -412,6 +413,7 @@ go_test( "server_test.go", "settings_cache_test.go", "settings_test.go", + "span_stats_test.go", "statements_test.go", "stats_test.go", "status_ext_test.go", diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 80c7e64e7e45..9b194c7fd617 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1335,7 +1335,7 @@ func (s *adminServer) statsForSpan( } type nodeResponse struct { nodeID roachpb.NodeID - resp *serverpb.SpanStatsResponse + resp *roachpb.SpanStatsResponse err error } @@ -1351,13 +1351,13 @@ func (s *adminServer) statsForSpan( }, func(ctx context.Context) { // Set a generous timeout on the context for each individual query. - var spanResponse *serverpb.SpanStatsResponse + var spanResponse *roachpb.SpanStatsResponse err := contextutil.RunWithTimeout(ctx, "request remote stats", 20*time.Second, func(ctx context.Context) error { conn, err := s.serverIterator.dialNode(ctx, serverID(nodeID)) if err == nil { client := serverpb.NewStatusClient(conn) - req := serverpb.SpanStatsRequest{ + req := roachpb.SpanStatsRequest{ StartKey: rSpan.Key, EndKey: rSpan.EndKey, NodeID: nodeID.String(), diff --git a/pkg/server/key_visualizer_server.go b/pkg/server/key_visualizer_server.go new file mode 100644 index 000000000000..7e8b0144cb8d --- /dev/null +++ b/pkg/server/key_visualizer_server.go @@ -0,0 +1,226 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "sort" + "strings" + "time" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvissettings" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// KeyVisualizerServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. +type KeyVisualizerServer struct { + ie *sql.InternalExecutor + settings *cluster.Settings + nodeDialer *nodedialer.Dialer + status *systemStatusServer + node *Node +} + +var _ keyvispb.KeyVisualizerServer = &KeyVisualizerServer{} + +func (s *KeyVisualizerServer) saveBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) error { + encoded, err := protoutil.Marshal(req) + + if err != nil { + return err + } + // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. + _, err = s.ie.ExecEx( + ctx, + "upsert tenant boundaries", + nil, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + `UPSERT INTO system.span_stats_tenant_boundaries( + tenant_id, + boundaries + ) VALUES ($1, $2) + `, + roachpb.SystemTenantID.ToUint64(), + encoded, + ) + + return err +} + +func (s *KeyVisualizerServer) getSamplesFromFanOut( + ctx context.Context, timestamp time.Time, +) (*keyvispb.GetSamplesResponse, error) { + + samplePeriod := keyvissettings.SampleInterval.Get(&s.settings.SV) + + dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) + return keyvispb.NewKeyVisualizerClient(conn), err + } + + nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { + samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, + &keyvispb.GetSamplesRequest{ + NodeID: nodeID, + CollectedOnOrAfter: timestamp, + }) + if err != nil { + return nil, err + } + return samples, err + } + + globalSamples := make(map[int64][]keyvispb.Sample) + + responseFn := func(nodeID roachpb.NodeID, resp interface{}) { + nodeResponse := resp.(*keyvispb.GetSamplesResponse) + + // Collection is spread across each node, so samples that belong to the + // same sample period should be aggregated together. + for _, sampleFragment := range nodeResponse.Samples { + tNanos := sampleFragment.SampleTime.Truncate(samplePeriod).UnixNano() + globalSamples[tNanos] = append(globalSamples[tNanos], sampleFragment) + } + } + + errorFn := func(nodeID roachpb.NodeID, err error) { + log.Errorf(ctx, "could not get key visualizer sample for node %d: %v", + nodeID, err) + } + + err := s.status.iterateNodes(ctx, + "iterating nodes for key visualizer samples", dialFn, nodeFn, + responseFn, errorFn) + if err != nil { + return nil, err + } + + var samples []keyvispb.Sample + for sampleTimeNanos, sampleFragments := range globalSamples { + if !verifySampleBoundariesEqual(sampleFragments) { + log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") + } + samples = append(samples, keyvispb.Sample{ + SampleTime: timeutil.Unix(0, sampleTimeNanos), + SpanStats: cumulativeStats(sampleFragments), + }) + } + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// verifySampleBoundariesEqual returns true if all the samples collected +// from across the cluster belonging to the same sample period have identical +// spans. +func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { + f0 := fragments[0] + sort.Slice(f0.SpanStats, func(a, b int) bool { + return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 + }) + + for i := 1; i < len(fragments); i++ { + fi := fragments[i] + + if len(f0.SpanStats) != len(fi.SpanStats) { + return false + } + + sort.Slice(fi.SpanStats, func(a, b int) bool { + return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 + }) + + for b, bucket := range f0.SpanStats { + if !bucket.Span.Equal(fi.SpanStats[b].Span) { + return false + } + } + } + + return true +} + +// unsafeBytesToString constructs a string from a byte slice. It is +// critical that the byte slice not be modified. +func unsafeBytesToString(data []byte) string { + return *(*string)(unsafe.Pointer(&data)) +} + +// cumulativeStats uniques and accumulates all of a sample's +// keyvispb.SpanStats from across the cluster. Stores collect statistics for +// the same spans, and the caller wants the cumulative statistics for those spans. +func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { + var stats []keyvispb.SpanStats + for _, sampleFragment := range fragments { + stats = append(stats, sampleFragment.SpanStats...) + } + + unique := make(map[string]keyvispb.SpanStats) + for _, stat := range stats { + + var sb strings.Builder + sb.WriteString(unsafeBytesToString(stat.Span.Key)) + sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) + spanAsString := sb.String() + + if uniqueStat, ok := unique[spanAsString]; ok { + uniqueStat.Requests += stat.Requests + } else { + unique[spanAsString] = keyvispb.SpanStats{ + Span: stat.Span, + Requests: stat.Requests, + } + } + } + + ret := make([]keyvispb.SpanStats, 0, len(unique)) + for _, stat := range unique { + ret = append(ret, stat) + } + return ret +} + +// GetSamples implements the keyvispb.KeyVisualizerServer interface. +func (s *KeyVisualizerServer) GetSamples( + ctx context.Context, req *keyvispb.GetSamplesRequest, +) (*keyvispb.GetSamplesResponse, error) { + + if req.NodeID == 0 { + return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + } + + samples := s.node.spanStatsCollector.GetSamples( + req.CollectedOnOrAfter) + + return &keyvispb.GetSamplesResponse{Samples: samples}, nil +} + +// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. +func (s *KeyVisualizerServer) UpdateBoundaries( + ctx context.Context, req *keyvispb.UpdateBoundariesRequest, +) (*keyvispb.UpdateBoundariesResponse, error) { + if err := s.saveBoundaries(ctx, req); err != nil { + return nil, err + } + return &keyvispb.UpdateBoundariesResponse{}, nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 3b4d91ad847b..6fe4a0aa3e96 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -154,8 +155,8 @@ type Server struct { tsDB *ts.DB tsServer *ts.Server - // spanStatsServer implements `keyvispb.KeyVisualizerServer` - spanStatsServer *SpanStatsServer + // keyVisualizerServer implements `keyvispb.KeyVisualizerServer` + keyVisualizerServer *KeyVisualizerServer // The Observability Server, used by the Observability Service to subscribe to // CRDB data. @@ -896,16 +897,18 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { serverIterator, spanConfig.reporter, clock, + distSender, + rangestats.NewFetcher(db), ) - spanStatsServer := &SpanStatsServer{ + keyVisualizerServer := &KeyVisualizerServer{ ie: internalExecutor, settings: st, nodeDialer: nodeDialer, status: sStatus, node: node, } - spanStatsAccessor := spanstatskvaccessor.New(spanStatsServer) + keyVisServerAccessor := spanstatskvaccessor.New(keyVisualizerServer) // Instantiate the KV prober. kvProber := kvprober.NewProber(kvprober.Opts{ @@ -984,7 +987,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nodeDescs: g, systemConfigWatcher: systemConfigWatcher, spanConfigAccessor: spanConfig.kvAccessor, - spanStatsAccessor: spanStatsAccessor, + keyVisServerAccessor: keyVisServerAccessor, nodeDialer: nodeDialer, distSender: distSender, db: db, @@ -1159,7 +1162,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { externalStorageBuilder: externalStorageBuilder, storeGrantCoords: gcoords.Stores, kvMemoryMonitor: kvMemoryMonitor, - spanStatsServer: spanStatsServer, + keyVisualizerServer: keyVisualizerServer, } return lateBoundServer, err @@ -1402,7 +1405,7 @@ func (s *Server) PreStart(ctx context.Context) error { s.migrationServer = migrationServer // only for testing via TestServer // Register the KeyVisualizer Server - keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.spanStatsServer) + keyvispb.RegisterKeyVisualizerServer(s.grpc.Server, s.keyVisualizerServer) // Start the RPC server. This opens the RPC/SQL listen socket, // and dispatches the server worker for the RPC. diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index bbc5b4262e79..6741e09ac1ff 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -276,7 +276,7 @@ type sqlServerArgs struct { spanConfigAccessor spanconfig.KVAccessor // Used by the Key Visualizer job. - spanStatsAccessor *spanstatskvaccessor.SpanStatsKVAccessor + keyVisServerAccessor *spanstatskvaccessor.SpanStatsKVAccessor // Used by DistSQLPlanner to dial KV nodes. nodeDialer *nodedialer.Dialer @@ -892,7 +892,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { contentionRegistry.Start(ctx, cfg.stopper) storageEngineClient := kvserver.NewStorageEngineClient(cfg.nodeDialer) - *execCfg = sql.ExecutorConfig{ Settings: cfg.Settings, NodeInfo: nodeInfo, @@ -1238,7 +1237,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if codec.ForSystemTenant() { ri := kvcoord.MakeRangeIterator(cfg.distSender) spanStatsConsumer := spanstatsconsumer.New( - cfg.spanStatsAccessor, + cfg.keyVisServerAccessor, &ri, cfg.Settings, cfg.circularInternalExecutor, diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index 460f1d909c47..4d99a58a772b 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -101,7 +101,10 @@ go_library( embed = [":serverpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/server/serverpb", visibility = ["//visibility:public"], - deps = ["//pkg/util/errorutil"], + deps = [ + "//pkg/roachpb", + "//pkg/util/errorutil", + ], ) go_test( diff --git a/pkg/server/serverpb/status.go b/pkg/server/serverpb/status.go index c33aa890f565..91ec925dec24 100644 --- a/pkg/server/serverpb/status.go +++ b/pkg/server/serverpb/status.go @@ -13,6 +13,7 @@ package serverpb import ( "context" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/errorutil" ) @@ -84,6 +85,9 @@ type TenantStatusServer interface { // NodesUI is used by DB Console. NodesUI(context.Context, *NodesRequest) (*NodesResponseExternal, error) + + // SpanStats is used to access MVCC stats from KV + SpanStats(context.Context, *roachpb.SpanStatsRequest) (*roachpb.SpanStatsResponse, error) } // OptionalNodesStatusServer returns the wrapped NodesStatusServer, if it is diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index fb59daef508b..a98f97e1703a 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -19,6 +19,7 @@ import "jobs/jobspb/jobs.proto"; import "roachpb/data.proto"; import "roachpb/index_usage_stats.proto"; import "roachpb/span_config.proto"; +import "roachpb/span_stats.proto"; import "roachpb/metadata.proto"; import "server/diagnostics/diagnosticspb/diagnostics.proto"; import "server/serverpb/index_recommendations.proto"; @@ -1183,22 +1184,6 @@ message ListDistSQLFlowsResponse { repeated ListActivityError errors = 2 [ (gogoproto.nullable) = false ]; } -message SpanStatsRequest { - string node_id = 1 [ (gogoproto.customname) = "NodeID" ]; - bytes start_key = 2 - [ (gogoproto.casttype) = - "github.com/cockroachdb/cockroach/pkg/roachpb.RKey" ]; - bytes end_key = 3 [ (gogoproto.casttype) = - "github.com/cockroachdb/cockroach/pkg/roachpb.RKey" ]; -} - -message SpanStatsResponse { - int32 range_count = 2; - uint64 approximate_disk_bytes = 3; - cockroach.storage.enginepb.MVCCStats total_stats = 1 - [ (gogoproto.nullable) = false ]; -} - message ProblemRangesRequest { // If left empty, problem ranges for all nodes/stores will be returned. string node_id = 1 [ (gogoproto.customname) = "NodeID" ]; @@ -2180,7 +2165,7 @@ service Status { // in that span. This is designed to compute stats specific to a SQL table: // it will be called with the highest/lowest key for a SQL table, and return // information about the resources on a node used by that table. - rpc SpanStats(SpanStatsRequest) returns (SpanStatsResponse) { + rpc SpanStats(roachpb.SpanStatsRequest) returns (roachpb.SpanStatsResponse) { option (google.api.http) = { post : "/_status/span" body : "*" diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 44c2b9a51b31..8305c69198d8 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -1,4 +1,4 @@ -// Copyright 2022 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -12,214 +12,216 @@ package server import ( "context" - "sort" - "strings" - "time" - "unsafe" + "strconv" - "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvispb" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -// SpanStatsServer is a concrete implementation of the keyvispb.KeyVisualizerServer interface. -type SpanStatsServer struct { - ie *sql.InternalExecutor - settings *cluster.Settings - nodeDialer *nodedialer.Dialer - status *systemStatusServer - node *Node -} - -var _ keyvispb.KeyVisualizerServer = &SpanStatsServer{} - -func (s *SpanStatsServer) saveBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) error { - - encoded, err := protoutil.Marshal(req) +func (s *systemStatusServer) spanStatsFanOut( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { + res := &roachpb.SpanStatsResponse{} + rSpan := roachpb.RSpan{ + Key: req.StartKey, + EndKey: req.EndKey, + } + nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, s.distSender, rSpan) if err != nil { - return err + return nil, err + } + nodesWithReplica := make(map[roachpb.NodeID]bool) + for _, nodeID := range nodeIDs { + nodesWithReplica[nodeID] = true } - // Nodes are notified about boundary changes via the keyvissubscriber.BoundarySubscriber. - _, err = s.ie.ExecEx( - ctx, - "upsert tenant boundaries", - nil, - sessiondata.InternalExecutorOverride{User: username.RootUserName()}, - `UPSERT INTO system.span_stats_tenant_boundaries( - tenant_id, - boundaries - ) VALUES ($1, $2) - `, - roachpb.SystemTenantID.ToUint64(), - encoded, - ) - - return err -} - -func (s *SpanStatsServer) getSamplesFromFanOut( - ctx context.Context, timestamp time.Time, -) (*keyvispb.GetSamplesResponse, error) { - - dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { - conn, err := s.nodeDialer.Dial(ctx, nodeID, rpc.DefaultClass) - return keyvispb.NewKeyVisualizerClient(conn), err + // We should only fan out to a node if it has replicas for this span. + // A blind fan out would be wasteful. + smartDial := func( + ctx context.Context, + nodeID roachpb.NodeID, + ) (interface{}, error) { + if _, ok := nodesWithReplica[nodeID]; ok { + return s.dialNode(ctx, nodeID) + } + return nil, nil } nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { - samples, err := client.(keyvispb.KeyVisualizerClient).GetSamples(ctx, - &keyvispb.GetSamplesRequest{ - NodeID: nodeID, - CollectedOnOrAfter: timestamp, + // `smartDial` may skip this node, so check to see if the client is nil. + if client == nil { + return &roachpb.SpanStatsResponse{}, nil + } + stats, err := client.(serverpb.StatusClient).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: nodeID.String(), + StartKey: req.StartKey, + EndKey: req.EndKey, }) if err != nil { return nil, err } - return samples, err + return stats, err } - globalSamples := make(map[int64][]keyvispb.Sample) - responseFn := func(nodeID roachpb.NodeID, resp interface{}) { - nodeResponse := resp.(*keyvispb.GetSamplesResponse) - - // Collection is spread across each node, so samples that belong to the - // same sample period should be aggregated together. The collectors - // guarantee that corresponding sample fragments have the same sample time. - for _, sampleFragment := range nodeResponse.Samples { - sampleTime := sampleFragment.SampleTime.UnixNano() - globalSamples[sampleTime] = append(globalSamples[sampleTime], sampleFragment) - } + nodeResponse := resp.(*roachpb.SpanStatsResponse) + res.ApproximateDiskBytes += nodeResponse.ApproximateDiskBytes + res.TotalStats.Add(nodeResponse.TotalStats) + res.RangeCount += nodeResponse.RangeCount } errorFn := func(nodeID roachpb.NodeID, err error) { log.Errorf(ctx, "could not get span stats sample for node %d: %v", nodeID, err) } - err := s.status.iterateNodes(ctx, "iterating nodes for span stats", dialFn, nodeFn, responseFn, errorFn) - - if err != nil { + if err := s.statusServer.iterateNodes( + ctx, + "iterating nodes for span stats", + smartDial, + nodeFn, + responseFn, + errorFn, + ); err != nil { return nil, err } - var samples []keyvispb.Sample - for sampleTimeNanos, sampleFragments := range globalSamples { - if !verifySampleBoundariesEqual(sampleFragments) { - log.Warningf(ctx, "key visualizer sample boundaries differ between nodes") - } - samples = append(samples, keyvispb.Sample{ - SampleTime: timeutil.Unix(0, sampleTimeNanos), - SpanStats: cumulativeStats(sampleFragments), - }) - } - - return &keyvispb.GetSamplesResponse{Samples: samples}, nil + return res, nil } -// verifySampleBoundariesEqual returns true if all the samples collected -// from across the cluster belonging to the same sample period have identical -// spans. -func verifySampleBoundariesEqual(fragments []keyvispb.Sample) bool { +func (s *systemStatusServer) getLocalStats( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { + res := &roachpb.SpanStatsResponse{} - f0 := fragments[0] - sort.Slice(f0.SpanStats, func(a, b int) bool { - return f0.SpanStats[a].Span.Key.Compare(f0.SpanStats[b].Span.Key) == -1 - }) - - for i := 1; i < len(fragments); i++ { - fi := fragments[i] + sp := roachpb.RSpan{ + Key: req.StartKey, + EndKey: req.EndKey, + } + ri := kvcoord.MakeRangeIterator(s.distSender) + ri.Seek(ctx, sp.Key, kvcoord.Ascending) - if len(f0.SpanStats) != len(fi.SpanStats) { - return false + for { + if !ri.Valid() { + return nil, ri.Error() } - sort.Slice(fi.SpanStats, func(a, b int) bool { - return fi.SpanStats[a].Span.Key.Compare(fi.SpanStats[b].Span.Key) == -1 - }) - - for b, bucket := range f0.SpanStats { - if !bucket.Span.Equal(fi.SpanStats[b].Span) { - return false + desc := ri.Desc() + descSpan := desc.RSpan() + res.RangeCount += 1 + + // Is the descriptor fully contained by the request span? + if sp.ContainsKeyRange(descSpan.Key, desc.EndKey) { + // If so, obtain stats for this range via RangeStats. + rangeStats, err := s.rangeStatsFetcher.RangeStats(ctx, + desc.StartKey.AsRawKey()) + if err != nil { + return nil, err + } + for _, resp := range rangeStats { + res.TotalStats.Add(resp.MVCCStats) } - } - } - return true -} + } else { + // Otherwise, do an MVCC Scan. + // We should only scan the part of the range that our request span + // encompasses. + scanStart := sp.Key + scanEnd := sp.EndKey + // If our request span began before the start of this range, + // start scanning from this range's start key. + if descSpan.Key.Compare(sp.Key) == 1 { + scanStart = descSpan.Key + } + // If our request span ends after the end of this range, + // stop scanning at this range's end key. + if descSpan.EndKey.Compare(sp.EndKey) == -1 { + scanEnd = descSpan.EndKey + } + err := s.stores.VisitStores(func(s *kvserver.Store) error { + stats, err := storage.ComputeStats( + s.TODOEngine(), + scanStart.AsRawKey(), + scanEnd.AsRawKey(), + timeutil.Now().UnixNano(), + ) + + if err != nil { + return err + } + + res.TotalStats.Add(stats) + return nil + }) -// unsafeBytesToString constructs a string from a byte slice. It is -// critical that the byte slice not be modified. -func unsafeBytesToString(data []byte) string { - return *(*string)(unsafe.Pointer(&data)) -} + if err != nil { + return nil, err + } + } -// cumulativeStats uniques and accumulates all of a sample's -// keyvispb.SpanStats from across the cluster. Stores collect statistics for -// the same spans, and the caller wants the cumulative statistics for those spans. -func cumulativeStats(fragments []keyvispb.Sample) []keyvispb.SpanStats { - var stats []keyvispb.SpanStats - for _, sampleFragment := range fragments { - stats = append(stats, sampleFragment.SpanStats...) + if !ri.NeedAnother(sp) { + break + } + ri.Next(ctx) } - unique := make(map[string]keyvispb.SpanStats) - for _, stat := range stats { - - var sb strings.Builder - sb.WriteString(unsafeBytesToString(stat.Span.Key)) - sb.WriteString(unsafeBytesToString(stat.Span.EndKey)) - spanAsString := sb.String() - - if uniqueStat, ok := unique[spanAsString]; ok { - uniqueStat.Requests += stat.Requests - } else { - unique[spanAsString] = keyvispb.SpanStats{ - Span: stat.Span, - Requests: stat.Requests, - } + // Finally, get the approximate disk bytes from each store. + err := s.stores.VisitStores(func(store *kvserver.Store) error { + approxDiskBytes, err := store.TODOEngine().ApproximateDiskBytes(req. + StartKey.AsRawKey(), req.EndKey.AsRawKey()) + if err != nil { + return err } - } + res.ApproximateDiskBytes += approxDiskBytes + return nil + }) - ret := make([]keyvispb.SpanStats, 0, len(unique)) - for _, stat := range unique { - ret = append(ret, stat) + if err != nil { + return nil, err } - return ret + + return res, nil } -// GetSamples implements the keyvispb.KeyVisualizerServer interface. -func (s *SpanStatsServer) GetSamples( - ctx context.Context, req *keyvispb.GetSamplesRequest, -) (*keyvispb.GetSamplesResponse, error) { +// getSpanStatsInternal will return span stats according to the nodeID specified +// by req. If req.NodeID == 0, a fan out is done to collect and sum span stats +// from across the cluster. Otherwise, the node specified will be dialed, +// if the local node isn't already the node specified. If the node specified +// is the local node, span stats are computed and returned. +func (s *systemStatusServer) getSpanStatsInternal( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { + // Perform a fan out when the requested NodeID is 0. + if req.NodeID == "0" { + return s.spanStatsFanOut(ctx, req) + } - if req.NodeID == 0 { - return s.getSamplesFromFanOut(ctx, req.CollectedOnOrAfter) + // See if the requested node is the local node. + _, local, err := s.statusServer.parseNodeID(req.NodeID) + if err != nil { + return nil, err } - samples := s.node.spanStatsCollector.GetSamples( - req.CollectedOnOrAfter) + // If the requested node is the local node, return stats. + if local { + return s.getLocalStats(ctx, req) + } - return &keyvispb.GetSamplesResponse{Samples: samples}, nil -} + // Otherwise, dial the correct node, and ask for span stats. + nodeID, err := strconv.ParseInt(req.NodeID, 10, 32) + if err != nil { + return nil, err + } -// UpdateBoundaries implements the keyvispb.KeyVisualizerServer interface. -func (s *SpanStatsServer) UpdateBoundaries( - ctx context.Context, req *keyvispb.UpdateBoundariesRequest, -) (*keyvispb.UpdateBoundariesResponse, error) { - if err := s.saveBoundaries(ctx, req); err != nil { + client, err := s.dialNode(ctx, roachpb.NodeID(nodeID)) + if err != nil { return nil, err } - return &keyvispb.UpdateBoundariesResponse{}, nil + return client.SpanStats(ctx, req) } diff --git a/pkg/kv/kvserver/client_status_test.go b/pkg/server/span_stats_test.go similarity index 82% rename from pkg/kv/kvserver/client_status_test.go rename to pkg/server/span_stats_test.go index 69eb92152501..a05597c8e074 100644 --- a/pkg/kv/kvserver/client_status_test.go +++ b/pkg/server/span_stats_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvserver_test +package server import ( "context" @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -26,7 +25,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestComputeStatsForKeySpan(t *testing.T) { +func TestLocalSpanStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -36,7 +35,7 @@ func TestComputeStatsForKeySpan(t *testing.T) { Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}, }, }) - s := serv.(*server.TestServer) + s := serv.(*TestServer) defer s.Stopper().Stop(ctx) store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) @@ -68,24 +67,32 @@ func TestComputeStatsForKeySpan(t *testing.T) { for _, tcase := range []struct { startKey string endKey string - expectedRanges int + expectedRanges int32 expectedKeys int64 }{ {"a", "i", 4, 6}, {"a", "c", 1, 3}, {"b", "e", 2, 5}, {"e", "i", 2, 1}, + {"b", "d", 2, 3}, + {"b", "bbb", 1, 2}, } { start, end := tcase.startKey, tcase.endKey - result, err := store.ComputeStatsForKeySpan( - roachpb.RKey(start), roachpb.RKey(end)) + result, err := s.status.getLocalStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", + StartKey: roachpb.RKey(start), + EndKey: roachpb.RKey(end), + }, + ) if err != nil { t.Fatal(err) } - if a, e := result.ReplicaCount, tcase.expectedRanges; a != e { + + if a, e := result.RangeCount, tcase.expectedRanges; a != e { t.Errorf("Expected %d ranges in span [%s - %s], found %d", e, start, end, a) } - if a, e := result.MVCC.LiveCount, tcase.expectedKeys; a != e { + if a, e := result.TotalStats.LiveCount, tcase.expectedKeys; a != e { t.Errorf("Expected %d keys in span [%s - %s], found %d", e, start, end, a) } } diff --git a/pkg/server/status.go b/pkg/server/status.go index fd48262a46c0..f5d5c1054216 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -39,6 +39,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -499,6 +501,8 @@ type systemStatusServer struct { stores *kvserver.Stores nodeLiveness *liveness.NodeLiveness spanConfigReporter spanconfig.Reporter + distSender *kvcoord.DistSender + rangeStatsFetcher *rangestats.Fetcher } // StmtDiagnosticsRequester is the interface into *stmtdiagnostics.Registry @@ -606,6 +610,8 @@ func newSystemStatusServer( serverIterator ServerIterator, spanConfigReporter spanconfig.Reporter, clock *hlc.Clock, + distSender *kvcoord.DistSender, + rangeStatsFetcher *rangestats.Fetcher, ) *systemStatusServer { server := newStatusServer( ambient, @@ -631,6 +637,8 @@ func newSystemStatusServer( stores: stores, nodeLiveness: nodeLiveness, spanConfigReporter: spanConfigReporter, + distSender: distSender, + rangeStatsFetcher: rangeStatsFetcher, } } @@ -3385,47 +3393,29 @@ func (s *statusServer) ListExecutionInsights( // SpanStats requests the total statistics stored on a node for a given key // span, which may include multiple ranges. -func (s *systemStatusServer) SpanStats( - ctx context.Context, req *serverpb.SpanStatsRequest, -) (*serverpb.SpanStatsResponse, error) { - ctx = forwardSQLIdentityThroughRPCCalls(ctx) +func (s *statusServer) SpanStats( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { ctx = s.AnnotateCtx(ctx) - if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { // NB: not using serverError() here since the priv checker // already returns a proper gRPC error status. return nil, err } + return s.sqlServer.tenantConnect.SpanStats(ctx, req) +} - nodeID, local, err := s.parseNodeID(req.NodeID) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - - if !local { - status, err := s.dialNode(ctx, nodeID) - if err != nil { - return nil, serverError(ctx, err) - } - return status.SpanStats(ctx, req) - } - - output := &serverpb.SpanStatsResponse{} - err = s.stores.VisitStores(func(store *kvserver.Store) error { - result, err := store.ComputeStatsForKeySpan(req.StartKey.Next(), req.EndKey) - if err != nil { - return err - } - output.TotalStats.Add(result.MVCC) - output.RangeCount += int32(result.ReplicaCount) - output.ApproximateDiskBytes += result.ApproximateDiskBytes - return nil - }) - if err != nil { - return nil, serverError(ctx, err) +func (s *systemStatusServer) SpanStats( + ctx context.Context, req *roachpb.SpanStatsRequest, +) (*roachpb.SpanStatsResponse, error) { + ctx = forwardSQLIdentityThroughRPCCalls(ctx) + ctx = s.AnnotateCtx(ctx) + if _, err := s.privilegeChecker.requireAdminUser(ctx); err != nil { + // NB: not using serverError() here since the priv checker + // already returns a proper gRPC error status. + return nil, err } - - return output, nil + return s.getSpanStatsInternal(ctx, req) } // Diagnostics returns an anonymized diagnostics report. diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 762f7813effb..15bc475ed4c4 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1364,8 +1364,8 @@ func TestSpanStatsResponse(t *testing.T) { t.Fatal(err) } - var response serverpb.SpanStatsResponse - request := serverpb.SpanStatsRequest{ + var response roachpb.SpanStatsResponse + request := roachpb.SpanStatsRequest{ NodeID: "1", StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), @@ -1394,7 +1394,7 @@ func TestSpanStatsGRPCResponse(t *testing.T) { rpcStopper := stop.NewStopper() defer rpcStopper.Stop(ctx) rpcContext := newRPCTestContext(ctx, ts, ts.RPCContext().Config) - request := serverpb.SpanStatsRequest{ + request := roachpb.SpanStatsRequest{ NodeID: "1", StartKey: []byte(roachpb.RKeyMin), EndKey: []byte(roachpb.RKeyMax), diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 45adac008fc3..ff76e1346b48 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -1110,7 +1110,6 @@ func makeTenantSQLServerArgs( // TODO(irfansharif): hook up NewGrantCoordinatorSQL. var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil - return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil),