From 32ab463bc242a10b150e2dd73c04156d8d27bef0 Mon Sep 17 00:00:00 2001 From: Zach Lite Date: Tue, 6 Jun 2023 13:22:38 -0400 Subject: [PATCH] server: make SpanStats authoritative This commit guarantees that SpanStats uses up-to-date range descriptors on all nodes. The dependency on DistSender is removed and is replaced with a dependency on ScanMetaKVs. ScanMetaKVs is used to: 1) Locate nodes that house replicas of a span to avoid cluster-wide fan-outs. 2) Find Range Descriptors that touch a span, to build a SpanStatsResponse. This commit also fixes #103809, where a SpanStatsResponse incorrectly returned the replica_count for a span, instead of the range_count. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-24928 Issue: https://github.com/cockroachdb/cockroach/issues/103957 Release note (bug fix): SpanStats is no longer subject to stale information, and should be considered authoritative. --- pkg/server/admin.go | 18 ++--- pkg/server/server.go | 1 - pkg/server/span_stats_server.go | 119 ++++++++++++++++++++------------ pkg/server/span_stats_test.go | 82 +++++++++++++--------- pkg/server/status.go | 4 -- 5 files changed, 136 insertions(+), 88 deletions(-) diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 69a85785958e..c4713e2b45b7 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1369,7 +1369,7 @@ func (s *adminServer) statsForSpan( } // Get a list of node ids and range count for the specified span. - nodeIDs, rangeCount, err := nodeIDsAndRangeCountForSpan( + nodeIDs, rangeCount, replCount, err := getNodeIDsRangeCountReplCountForSpan( ctx, s.distSender, rSpan, ) if err != nil { @@ -1394,7 +1394,8 @@ func (s *adminServer) statsForSpan( // the advantage of populating the cache (without the disadvantage of // potentially returning stale data). // See GitHub #5435 for some discussion. - RangeCount: rangeCount, + RangeCount: rangeCount, + ReplicaCount: replCount, } type nodeResponse struct { nodeID roachpb.NodeID @@ -1458,7 +1459,6 @@ func (s *adminServer) statsForSpan( ) } else { tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats) - tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount) tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes } case <-ctx.Done(): @@ -1470,16 +1470,18 @@ func (s *adminServer) statsForSpan( return &tableStatResponse, nil } -// Returns the list of node ids for the specified span. -func nodeIDsAndRangeCountForSpan( +// Returns the list of node ids, range count, +// and replica count for the specified span. +func getNodeIDsRangeCountReplCountForSpan( ctx context.Context, ds *kvcoord.DistSender, rSpan roachpb.RSpan, -) (nodeIDList []roachpb.NodeID, rangeCount int64, _ error) { +) (nodeIDList []roachpb.NodeID, rangeCount int64, replCount int64, _ error) { nodeIDs := make(map[roachpb.NodeID]struct{}) ri := kvcoord.MakeRangeIterator(ds) ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) for ; ri.Valid(); ri.Next(ctx) { rangeCount++ for _, repl := range ri.Desc().Replicas().Descriptors() { + replCount++ nodeIDs[repl.NodeID] = struct{}{} } if !ri.NeedAnother(rSpan) { @@ -1487,7 +1489,7 @@ func nodeIDsAndRangeCountForSpan( } } if err := ri.Error(); err != nil { - return nil, 0, err + return nil, 0, 0, err } nodeIDList = make([]roachpb.NodeID, 0, len(nodeIDs)) @@ -1497,7 +1499,7 @@ func nodeIDsAndRangeCountForSpan( sort.Slice(nodeIDList, func(i, j int) bool { return nodeIDList[i] < nodeIDList[j] }) - return nodeIDList, rangeCount, nil + return nodeIDList, rangeCount, replCount, nil } // Users returns a list of users, stripped of any passwords. diff --git a/pkg/server/server.go b/pkg/server/server.go index 7f0640c56ae2..0ff3985ef9be 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -932,7 +932,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { serverIterator, spanConfig.reporter, clock, - distSender, rangestats.NewFetcher(db), node, ) diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 6403f62f4f0d..8ee48afd0e25 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -15,7 +15,8 @@ import ( "strconv" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -39,7 +40,7 @@ func (s *systemStatusServer) spanStatsFanOut( // Response level error var respErr error - spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender) + spansPerNode, err := s.getSpansPerNode(ctx, req) if err != nil { return nil, err } @@ -86,7 +87,6 @@ func (s *systemStatusServer) spanStatsFanOut( } else { res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats) - res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount } } } @@ -114,41 +114,48 @@ func (s *systemStatusServer) getLocalStats( ctx context.Context, req *roachpb.SpanStatsRequest, ) (*roachpb.SpanStatsResponse, error) { var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)} - ri := kvcoord.MakeRangeIterator(s.distSender) - - // For each span - for _, span := range req.Spans { - rSpan, err := keys.SpanAddr(span) - if err != nil { - return nil, err + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + for _, span := range req.Spans { + spanStats, err := s.statsForSpan(ctx, txn, span) + if err != nil { + return err + } + // Note: Even if this function retries, this side effect is safe. + res.SpanToStats[span.String()] = spanStats } - // Seek to the span's start key. - ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) - spanStats, err := s.statsForSpan(ctx, ri, rSpan) + return nil + }) + return res, err +} + +func (s *systemStatusServer) statsForSpan( + ctx context.Context, txn *kv.Txn, span roachpb.Span, +) (*roachpb.SpanStats, error) { + kvs, err := kvclient.ScanMetaKVs(ctx, txn, span) + if err != nil { + return nil, err + } + + var descriptors []roachpb.RangeDescriptor + for _, metaKv := range kvs { + var rangeDescriptor roachpb.RangeDescriptor + err := metaKv.ValueProto(&rangeDescriptor) if err != nil { return nil, err } - res.SpanToStats[span.String()] = spanStats + descriptors = append(descriptors, rangeDescriptor) } - return res, nil -} -func (s *systemStatusServer) statsForSpan( - ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan, -) (*roachpb.SpanStats, error) { - // Seek to the span's start key. - ri.Seek(ctx, rSpan.Key, kvcoord.Ascending) spanStats := &roachpb.SpanStats{} var fullyContainedKeysBatch []roachpb.Key - var err error + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, err + } // Iterate through the span's ranges. - for { - if !ri.Valid() { - return nil, ri.Error() - } + for _, desc := range descriptors { // Get the descriptor for the current range of the span. - desc := ri.Desc() descSpan := desc.RSpan() spanStats.RangeCount += 1 @@ -200,11 +207,6 @@ func (s *systemStatusServer) statsForSpan( return nil, err } } - - if !ri.NeedAnother(rSpan) { - break - } - ri.Next(ctx) } // If we still have some remaining ranges, request range stats for the current batch. if len(fullyContainedKeysBatch) > 0 { @@ -270,28 +272,57 @@ func (s *systemStatusServer) getSpanStatsInternal( } func (s *systemStatusServer) getSpansPerNode( - ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender, + ctx context.Context, req *roachpb.SpanStatsRequest, ) (map[roachpb.NodeID][]roachpb.Span, error) { // Mapping of node ids to spans with a replica on the node. spansPerNode := make(map[roachpb.NodeID][]roachpb.Span) - // Iterate over the request spans. - for _, span := range req.Spans { - rSpan, err := keys.SpanAddr(span) - if err != nil { - return nil, err + // Get the node ids belonging to the span. + var nodeIDs []roachpb.NodeID + err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + // Iterate over the request spans. + for _, span := range req.Spans { + nodeIDs, err = nodeIDsForSpan(ctx, txn, span) + if err != nil { + return err + } + // Add the span to the map for each of the node IDs it belongs to. + for _, nodeID := range nodeIDs { + spansPerNode[nodeID] = append(spansPerNode[nodeID], span) + } } - // Get the node ids belonging to the span. - nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan) + return nil + }) + + return spansPerNode, err +} + +// nodeIDsForSpan returns a list of nodeIDs that contain at least one replica +// for the argument span. Descriptors are found via ScanMetaKVs. +func nodeIDsForSpan(ctx context.Context, txn *kv.Txn, span roachpb.Span) ([]roachpb.NodeID, error) { + nodeIDs := make(map[roachpb.NodeID]struct{}) + kvs, err := kvclient.ScanMetaKVs(ctx, txn, span) + if err != nil { + return nil, err + } + + for _, metaKV := range kvs { + var rangeDescriptor roachpb.RangeDescriptor + err := metaKV.ValueProto(&rangeDescriptor) if err != nil { return nil, err } - // Add the span to the map for each of the node IDs it belongs to. - for _, nodeID := range nodeIDs { - spansPerNode[nodeID] = append(spansPerNode[nodeID], span) + for _, repl := range rangeDescriptor.Replicas().Descriptors() { + nodeIDs[repl.NodeID] = struct{}{} } } - return spansPerNode, nil + + nodeIDList := make([]roachpb.NodeID, 0, len(nodeIDs)) + for id := range nodeIDs { + nodeIDList = append(nodeIDList, id) + } + + return nodeIDList, nil } func flushBatchedContainedKeys( diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go index 2d5ab3b3ba56..de5bc2f2dc4e 100644 --- a/pkg/server/span_stats_test.go +++ b/pkg/server/span_stats_test.go @@ -8,23 +8,23 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package server +package server_test import ( "bytes" "context" "fmt" - "strconv" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -34,15 +34,13 @@ import ( func TestLocalSpanStats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true}, - }, - }) - s := serv.(*TestServer) - defer s.Stopper().Stop(ctx) + numNodes := 3 + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + s := tc.Server(0).(*server.TestServer) + store, err := s.Stores().GetStore(s.GetFirstStoreID()) require.NoError(t, err) // Create a number of ranges using splits. @@ -104,15 +102,15 @@ func TestLocalSpanStats(t *testing.T) { } testCases := []testCase{ - {spans[0], 4, 6}, - {spans[1], 1, 3}, - {spans[2], 2, 5}, - {spans[3], 2, 1}, - {spans[4], 2, 3}, - {spans[5], 1, 2}, + {spans[0], 4, int64(numNodes * 6)}, + {spans[1], 1, int64(numNodes * 3)}, + {spans[2], 2, int64(numNodes * 5)}, + {spans[3], 2, int64(numNodes * 1)}, + {spans[4], 2, int64(numNodes * 3)}, + {spans[5], 1, int64(numNodes * 2)}, } // Multi-span request - multiResult, err := s.status.getLocalStats(ctx, + multiResult, err := s.StatusServer().(serverpb.StatusServer).SpanStats(ctx, &roachpb.SpanStatsRequest{ NodeID: "0", Spans: spans, @@ -127,10 +125,30 @@ func TestLocalSpanStats(t *testing.T) { // Assert expected values from multi-span request spanStats := multiResult.SpanToStats[tcase.span.String()] - require.Equal(t, spanStats.RangeCount, tcase.expectedRanges, fmt.Sprintf( - "Multi-span: expected %d ranges in span [%s - %s], found %d", tcase.expectedRanges, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.RangeCount)) - require.Equal(t, spanStats.TotalStats.LiveCount, tcase.expectedKeys, fmt.Sprintf( - "Multi-span: expected %d keys in span [%s - %s], found %d", tcase.expectedKeys, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.TotalStats.LiveCount)) + require.Equal( + t, + tcase.expectedRanges, + spanStats.RangeCount, + fmt.Sprintf( + "Multi-span: expected %d ranges in span [%s - %s], found %d", + tcase.expectedRanges, + rSpan.Key.String(), + rSpan.EndKey.String(), + spanStats.RangeCount, + ), + ) + require.Equal( + t, + tcase.expectedKeys, + spanStats.TotalStats.LiveCount, + fmt.Sprintf( + "Multi-span: expected %d keys in span [%s - %s], found %d", + tcase.expectedKeys, + rSpan.Key.String(), + rSpan.EndKey.String(), + spanStats.TotalStats.LiveCount, + ), + ) } } @@ -193,22 +211,24 @@ func BenchmarkSpanStats(b *testing.B) { var spans []roachpb.Span // Create a table spans - var spanStartKey roachpb.Key for i := 0; i < ts.numSpans; i++ { - spanStartKey = nil + spanStartKey := makeKey( + tenantPrefix, + []byte{byte(i)}, + ) + spanEndKey := makeKey( + tenantPrefix, + []byte{byte(i + 1)}, + ) // Create ranges. - var key roachpb.Key for j := 0; j < ts.numRanges; j++ { - key = makeKey(tenantPrefix, []byte(strconv.Itoa(i*j))) - if spanStartKey == nil { - spanStartKey = key - } - _, _, err := tc.Server(0).SplitRange(key) + splitKey := makeKey(spanStartKey, []byte{byte(j)}) + _, _, err := tc.Server(0).SplitRange(splitKey) require.NoError(b, err) } spans = append(spans, roachpb.Span{ Key: spanStartKey, - EndKey: key, + EndKey: spanEndKey, }) } diff --git a/pkg/server/status.go b/pkg/server/status.go index 51106754f466..f1aef2a2b6f9 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keyvisualizer/keyvisstorage" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -509,7 +508,6 @@ type systemStatusServer struct { stores *kvserver.Stores nodeLiveness *liveness.NodeLiveness spanConfigReporter spanconfig.Reporter - distSender *kvcoord.DistSender rangeStatsFetcher *rangestats.Fetcher node *Node } @@ -619,7 +617,6 @@ func newSystemStatusServer( serverIterator ServerIterator, spanConfigReporter spanconfig.Reporter, clock *hlc.Clock, - distSender *kvcoord.DistSender, rangeStatsFetcher *rangestats.Fetcher, node *Node, ) *systemStatusServer { @@ -647,7 +644,6 @@ func newSystemStatusServer( stores: stores, nodeLiveness: nodeLiveness, spanConfigReporter: spanConfigReporter, - distSender: distSender, rangeStatsFetcher: rangeStatsFetcher, node: node, }