diff --git a/pkg/roachpb/span_stats.go b/pkg/roachpb/span_stats.go index 3518820bb9e5..fe2a0f48cde4 100644 --- a/pkg/roachpb/span_stats.go +++ b/pkg/roachpb/span_stats.go @@ -10,7 +10,11 @@ package roachpb -import "github.com/cockroachdb/cockroach/pkg/settings" +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/settings" +) // Put span statistics cluster settings here to avoid import cycle. @@ -37,3 +41,18 @@ var RangeStatsBatchLimit = settings.RegisterIntSetting( defaultRangeStatsBatchLimit, settings.PositiveInt, ) + +// RangeDescPageSize controls the page size when iterating through range +// descriptors. +var RangeDescPageSize = settings.RegisterIntSetting( + settings.TenantWritable, + "server.span_stats.range_desc_page_size", + "the page size when iterating through range descriptors", + 100, + func(i int64) error { + if i < 5 || i > 25000 { + return fmt.Errorf("expected range_desc_page_size to be in range [5, 25000], got %d", i) + } + return nil + }, +) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 94f42e75e24d..92832e11188c 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1368,8 +1368,9 @@ func (s *adminServer) statsForSpan( return nil, err } - // Get a list of node ids and range count for the specified span. - nodeIDs, rangeCount, err := nodeIDsAndRangeCountForSpan( + // Get a list of nodeIDs, range counts, and replica counts per node + // for the specified span. + nodeIDs, rangeCount, replCounts, err := getNodeIDsRangeCountReplCountForSpan( ctx, s.distSender, rSpan, ) if err != nil { @@ -1439,6 +1440,15 @@ func (s *adminServer) statsForSpan( return nil, err } } + + // The semantics of tableStatResponse.ReplicaCount counts replicas + // found for this span returned by a cluster-wide fan-out. + // We can use descriptors to know what the final count _should_ be, + // if we assume every request succeeds (nodes and replicas are reachable). + for _, replCount := range replCounts { + tableStatResponse.ReplicaCount += replCount + } + for remainingResponses := len(nodeIDs); remainingResponses > 0; remainingResponses-- { select { case resp := <-responses: @@ -1449,6 +1459,10 @@ func (s *adminServer) statsForSpan( return nil, serverError(ctx, resp.err) } + // If this node is unreachable, + // it's replicas can not be counted. + tableStatResponse.ReplicaCount -= replCounts[resp.nodeID] + tableStatResponse.MissingNodes = append( tableStatResponse.MissingNodes, serverpb.TableStatsResponse_MissingNode{ @@ -1458,7 +1472,6 @@ func (s *adminServer) statsForSpan( ) } else { tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats) - tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount) tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes } case <-ctx.Done(): @@ -1470,16 +1483,19 @@ func (s *adminServer) statsForSpan( return &tableStatResponse, nil } -// Returns the list of node ids for the specified span. -func nodeIDsAndRangeCountForSpan( +// Returns the list of node ids, range count, +// and replica count for the specified span. +func getNodeIDsRangeCountReplCountForSpan( ctx context.Context, ds *kvcoord.DistSender, rSpan roachpb.RSpan, -) (nodeIDList []roachpb.NodeID, rangeCount int64, _ error) { +) (nodeIDList []roachpb.NodeID, rangeCount int64, replCounts map[roachpb.NodeID]int64, _ error) { nodeIDs := make(map[roachpb.NodeID]struct{}) + replCountForNodeID := make(map[roachpb.NodeID]int64) ri := kvcoord.MakeRangeIterator(ds) ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) for ; ri.Valid(); ri.Next(ctx) { rangeCount++ for _, repl := range ri.Desc().Replicas().Descriptors() { + replCountForNodeID[repl.NodeID]++ nodeIDs[repl.NodeID] = struct{}{} } if !ri.NeedAnother(rSpan) { @@ -1487,7 +1503,7 @@ func nodeIDsAndRangeCountForSpan( } } if err := ri.Error(); err != nil { - return nil, 0, err + return nil, 0, nil, err } nodeIDList = make([]roachpb.NodeID, 0, len(nodeIDs)) @@ -1497,7 +1513,7 @@ func nodeIDsAndRangeCountForSpan( sort.Slice(nodeIDList, func(i, j int) bool { return nodeIDList[i] < nodeIDList[j] }) - return nodeIDList, rangeCount, nil + return nodeIDList, rangeCount, replCountForNodeID, nil } // Users returns a list of users, stripped of any passwords. diff --git a/pkg/server/server.go b/pkg/server/server.go index aba8e51157bc..f54bb5ca1108 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -998,7 +998,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { serverIterator, spanConfig.reporter, clock, - distSender, rangestats.NewFetcher(db), node, ) diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 6403f62f4f0d..7d7bd42fda24 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -15,13 +15,14 @@ import ( "strconv" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/rangedesc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -39,7 +40,7 @@ func (s *systemStatusServer) spanStatsFanOut( // Response level error var respErr error - spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender) + spansPerNode, err := s.getSpansPerNode(ctx, req) if err != nil { return nil, err } @@ -86,7 +87,6 @@ func (s *systemStatusServer) spanStatsFanOut( } else { res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats) - res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount } } } @@ -114,17 +114,8 @@ func (s *systemStatusServer) getLocalStats( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)} - ri := kvcoord.MakeRangeIterator(s.distSender) - - // For each span for _, span := range req.Spans { - rSpan, err := keys.SpanAddr(span) - if err != nil { - return nil, err - } - // Seek to the span's start key. - ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) - spanStats, err := s.statsForSpan(ctx, ri, rSpan) + spanStats, err := s.statsForSpan(ctx, span) if err != nil { return nil, err } @@ -134,21 +125,35 @@ func (s *systemStatusServer) getLocalStats( } func (s *systemStatusServer) statsForSpan( - ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan, + ctx context.Context, span roachpb.Span, ) (*roachpb.SpanStats, error) { - // Seek to the span's start key. - ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) + + var descriptors []roachpb.RangeDescriptor + scanner := rangedesc.NewScanner(s.db) + pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV)) + err := scanner.Scan(ctx, pageSize, func() { + // If the underlying txn fails and needs to be retried, + // clear the descriptors we've collected so far. + descriptors = nil + }, span, func(scanned ...roachpb.RangeDescriptor) error { + descriptors = append(descriptors, scanned...) + return nil + }) + + if err != nil { + return nil, err + } + spanStats := &roachpb.SpanStats{} var fullyContainedKeysBatch []roachpb.Key - var err error + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } // Iterate through the span's ranges. - for { - if !ri.Valid() { - return nil, ri.Error() - } + for _, desc := range descriptors { // Get the descriptor for the current range of the span. - desc := ri.Desc() descSpan := desc.RSpan() spanStats.RangeCount += 1 @@ -200,11 +205,6 @@ func (s *systemStatusServer) statsForSpan( return nil, err } } - - if !ri.NeedAnother(rSpan) { - break - } - ri.Next(ctx) } // If we still have some remaining ranges, request range stats for the current batch. if len(fullyContainedKeysBatch) > 0 { @@ -270,19 +270,13 @@ func (s *systemStatusServer) getSpanStatsInternal( } func (s *systemStatusServer) getSpansPerNode( - ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender, + ctx context.Context, req *roachpb.SpanStatsRequest, ) (map[roachpb.NodeID][]roachpb.Span, error) { // Mapping of node ids to spans with a replica on the node. spansPerNode := make(map[roachpb.NodeID][]roachpb.Span) - - // Iterate over the request spans. + pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV)) for _, span := range req.Spans { - rSpan, err := keys.SpanAddr(span) - if err != nil { - return nil, err - } - // Get the node ids belonging to the span. - nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan) + nodeIDs, err := nodeIDsForSpan(ctx, s.db, span, pageSize) if err != nil { return nil, err } @@ -294,6 +288,37 @@ func (s *systemStatusServer) getSpansPerNode( return spansPerNode, nil } +// nodeIDsForSpan returns a list of nodeIDs that contain at least one replica +// for the argument span. Descriptors are found via ScanMetaKVs. +func nodeIDsForSpan( + ctx context.Context, db *kv.DB, span roachpb.Span, pageSize int, +) ([]roachpb.NodeID, error) { + nodeIDs := make(map[roachpb.NodeID]struct{}) + scanner := rangedesc.NewScanner(db) + err := scanner.Scan(ctx, pageSize, func() { + // If the underlying txn fails and needs to be retried, + // clear the nodeIDs we've collected so far. + nodeIDs = map[roachpb.NodeID]struct{}{} + }, span, func(scanned ...roachpb.RangeDescriptor) error { + for _, desc := range scanned { + for _, repl := range desc.Replicas().Descriptors() { + nodeIDs[repl.NodeID] = struct{}{} + } + } + return nil + }) + if err != nil { + return nil, err + } + + nodeIDList := make([]roachpb.NodeID, 0, len(nodeIDs)) + for id := range nodeIDs { + nodeIDList = append(nodeIDList, id) + } + + return nodeIDList, nil +} + func flushBatchedContainedKeys( ctx context.Context, fetcher *rangestats.Fetcher, diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go index 2d5ab3b3ba56..b66eec552655 100644 --- a/pkg/server/span_stats_test.go +++ b/pkg/server/span_stats_test.go @@ -8,41 +8,79 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package server +package server_test import ( "bytes" "context" "fmt" - "strconv" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -func TestLocalSpanStats(t *testing.T) { +func TestSpanStatsMetaScan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}, + testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{}) + defer testCluster.Stopper().Stop(context.Background()) + s := testCluster.Server(0) + + testSpans := []roachpb.Span{ + { + Key: keys.Meta1Prefix, + EndKey: keys.Meta1KeyMax, }, - }) - s := serv.(*TestServer) - defer s.Stopper().Stop(ctx) + { + Key: keys.LocalMax, + EndKey: keys.Meta2KeyMax, + }, + { + Key: keys.Meta2Prefix, + EndKey: keys.Meta2KeyMax, + }, + } + + // SpanStats should have no problem finding descriptors for + // spans up to and including Meta2KeyMax. + for _, span := range testSpans { + res, err := s.StatusServer().(serverpb.StatusServer).SpanStats(ctx, + &roachpb.SpanStatsRequest{ + NodeID: "0", + Spans: []roachpb.Span{ + span, + }, + }, + ) + require.NoError(t, err) + require.Equal(t, int32(1), res.SpanToStats[span.String()].RangeCount) + } +} + +func TestLocalSpanStats(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t) + ctx := context.Background() + numNodes := 3 + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0).(*server.TestServer) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) // Create a number of ranges using splits. @@ -104,15 +142,15 @@ func TestLocalSpanStats(t *testing.T) { } testCases := []testCase{ - {spans[0], 4, 6}, - {spans[1], 1, 3}, - {spans[2], 2, 5}, - {spans[3], 2, 1}, - {spans[4], 2, 3}, - {spans[5], 1, 2}, + {spans[0], 4, int64(numNodes * 6)}, + {spans[1], 1, int64(numNodes * 3)}, + {spans[2], 2, int64(numNodes * 5)}, + {spans[3], 2, int64(numNodes * 1)}, + {spans[4], 2, int64(numNodes * 3)}, + {spans[5], 1, int64(numNodes * 2)}, } // Multi-span request - multiResult, err := s.status.getLocalStats(ctx, + multiResult, err := s.StatusServer().(serverpb.StatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ NodeID: "0", Spans: spans, @@ -127,10 +165,30 @@ func TestLocalSpanStats(t *testing.T) { // Assert expected values from multi-span request spanStats := multiResult.SpanToStats[tcase.span.String()] - require.Equal(t, spanStats.RangeCount, tcase.expectedRanges, fmt.Sprintf( - "Multi-span: expected %d ranges in span [%s - %s], found %d", tcase.expectedRanges, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.RangeCount)) - require.Equal(t, spanStats.TotalStats.LiveCount, tcase.expectedKeys, fmt.Sprintf( - "Multi-span: expected %d keys in span [%s - %s], found %d", tcase.expectedKeys, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.TotalStats.LiveCount)) + require.Equal( + t, + tcase.expectedRanges, + spanStats.RangeCount, + fmt.Sprintf( + "Multi-span: expected %d ranges in span [%s - %s], found %d", + tcase.expectedRanges, + rSpan.Key.String(), + rSpan.EndKey.String(), + spanStats.RangeCount, + ), + ) + require.Equal( + t, + tcase.expectedKeys, + spanStats.TotalStats.LiveCount, + fmt.Sprintf( + "Multi-span: expected %d keys in span [%s - %s], found %d", + tcase.expectedKeys, + rSpan.Key.String(), + rSpan.EndKey.String(), + spanStats.TotalStats.LiveCount, + ), + ) } } @@ -193,22 +251,24 @@ func BenchmarkSpanStats(b *testing.B) { var spans []roachpb.Span // Create a table spans - var spanStartKey roachpb.Key for i := 0; i < ts.numSpans; i++ { - spanStartKey = nil + spanStartKey := makeKey( + tenantPrefix, + []byte{byte(i)}, + ) + spanEndKey := makeKey( + tenantPrefix, + []byte{byte(i + 1)}, + ) // Create ranges. - var key roachpb.Key for j := 0; j < ts.numRanges; j++ { - key = makeKey(tenantPrefix, []byte(strconv.Itoa(i*j))) - if spanStartKey == nil { - spanStartKey = key - } - _, _, err := tc.Server(0).SplitRange(key) + splitKey := makeKey(spanStartKey, []byte{byte(j)}) + _, _, err := tc.Server(0).SplitRange(splitKey) require.NoError(b, err) } spans = append(spans, roachpb.Span{ Key: spanStartKey, - EndKey: key, + EndKey: spanEndKey, }) } diff --git a/pkg/server/status.go b/pkg/server/status.go index 51106754f466..f1aef2a2b6f9 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -509,7 +508,6 @@ type systemStatusServer struct { stores *kvserver.Stores nodeLiveness *liveness.NodeLiveness spanConfigReporter spanconfig.Reporter - distSender *kvcoord.DistSender rangeStatsFetcher *rangestats.Fetcher node *Node } @@ -619,7 +617,6 @@ func newSystemStatusServer( serverIterator ServerIterator, spanConfigReporter spanconfig.Reporter, clock *hlc.Clock, - distSender *kvcoord.DistSender, rangeStatsFetcher *rangestats.Fetcher, node *Node, ) *systemStatusServer { @@ -647,7 +644,6 @@ func newSystemStatusServer( stores: stores, nodeLiveness: nodeLiveness, spanConfigReporter: spanConfigReporter, - distSender: distSender, rangeStatsFetcher: rangeStatsFetcher, node: node, }