Skip to content

Commit

Permalink
server: make SpanStats authoritative
Browse files Browse the repository at this point in the history
This commit guarantees that SpanStats uses up-to-date range
descriptors on all nodes. The dependency on DistSender is removed
and is replaced with a dependency on rangedesc.Scanner.

rangedesc.Scanner is used to:
1) Locate nodes that house replicas of a span to avoid cluster-wide fan-outs.
2) Find Range Descriptors that touch a span, to build a SpanStatsResponse.

This commit also fixes cockroachdb#103809, where a SpanStatsResponse incorrectly returned
the replica_count for a span, instead of the range_count.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-24928
Issue: cockroachdb#103957
Release note (bug fix): SpanStats is no longer subject to stale
information, and should be considered authoritative.
  • Loading branch information
Zach Lite committed Jun 14, 2023
1 parent 0caad15 commit c4ae806
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 81 deletions.
21 changes: 20 additions & 1 deletion pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

package roachpb

import "github.com/cockroachdb/cockroach/pkg/settings"
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/settings"
)

// Put span statistics cluster settings here to avoid import cycle.

Expand All @@ -37,3 +41,18 @@ var RangeStatsBatchLimit = settings.RegisterIntSetting(
defaultRangeStatsBatchLimit,
settings.PositiveInt,
)

// RangeDescPageSize controls the page size when iterating through range
// descriptors.
var RangeDescPageSize = settings.RegisterIntSetting(
settings.TenantWritable,
"server.span_stats.range_desc_page_size",
"the page size when iterating through range descriptors",
100,
func(i int64) error {
if i < 5 || i > 25000 {
return fmt.Errorf("expected range_desc_page_size to be in range [5, 25000], got %d", i)
}
return nil
},
)
32 changes: 24 additions & 8 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,8 +1368,9 @@ func (s *adminServer) statsForSpan(
return nil, err
}

// Get a list of node ids and range count for the specified span.
nodeIDs, rangeCount, err := nodeIDsAndRangeCountForSpan(
// Get a list of nodeIDs, range counts, and replica counts per node
// for the specified span.
nodeIDs, rangeCount, replCounts, err := getNodeIDsRangeCountReplCountForSpan(
ctx, s.distSender, rSpan,
)
if err != nil {
Expand Down Expand Up @@ -1439,6 +1440,15 @@ func (s *adminServer) statsForSpan(
return nil, err
}
}

// The semantics of tableStatResponse.ReplicaCount counts replicas
// found for this span returned by a cluster-wide fan-out.
// We can use descriptors to know what the final count _should_ be,
// if we assume every request succeeds (nodes and replicas are reachable).
for _, replCount := range replCounts {
tableStatResponse.ReplicaCount += replCount
}

for remainingResponses := len(nodeIDs); remainingResponses > 0; remainingResponses-- {
select {
case resp := <-responses:
Expand All @@ -1449,6 +1459,10 @@ func (s *adminServer) statsForSpan(
return nil, serverError(ctx, resp.err)
}

// If this node is unreachable,
// it's replicas can not be counted.
tableStatResponse.ReplicaCount -= replCounts[resp.nodeID]

tableStatResponse.MissingNodes = append(
tableStatResponse.MissingNodes,
serverpb.TableStatsResponse_MissingNode{
Expand All @@ -1458,7 +1472,6 @@ func (s *adminServer) statsForSpan(
)
} else {
tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats)
tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount)
tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes
}
case <-ctx.Done():
Expand All @@ -1470,24 +1483,27 @@ func (s *adminServer) statsForSpan(
return &tableStatResponse, nil
}

// Returns the list of node ids for the specified span.
func nodeIDsAndRangeCountForSpan(
// Returns the list of node ids, range count,
// and replica count for the specified span.
func getNodeIDsRangeCountReplCountForSpan(
ctx context.Context, ds *kvcoord.DistSender, rSpan roachpb.RSpan,
) (nodeIDList []roachpb.NodeID, rangeCount int64, _ error) {
) (nodeIDList []roachpb.NodeID, rangeCount int64, replCounts map[roachpb.NodeID]int64, _ error) {
nodeIDs := make(map[roachpb.NodeID]struct{})
replCountForNodeID := make(map[roachpb.NodeID]int64)
ri := kvcoord.MakeRangeIterator(ds)
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
for ; ri.Valid(); ri.Next(ctx) {
rangeCount++
for _, repl := range ri.Desc().Replicas().Descriptors() {
replCountForNodeID[repl.NodeID]++
nodeIDs[repl.NodeID] = struct{}{}
}
if !ri.NeedAnother(rSpan) {
break
}
}
if err := ri.Error(); err != nil {
return nil, 0, err
return nil, 0, nil, err
}

nodeIDList = make([]roachpb.NodeID, 0, len(nodeIDs))
Expand All @@ -1497,7 +1513,7 @@ func nodeIDsAndRangeCountForSpan(
sort.Slice(nodeIDList, func(i, j int) bool {
return nodeIDList[i] < nodeIDList[j]
})
return nodeIDList, rangeCount, nil
return nodeIDList, rangeCount, replCountForNodeID, nil
}

// Users returns a list of users, stripped of any passwords.
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
serverIterator,
spanConfig.reporter,
clock,
distSender,
rangestats.NewFetcher(db),
node,
)
Expand Down
97 changes: 61 additions & 36 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand All @@ -39,7 +40,7 @@ func (s *systemStatusServer) spanStatsFanOut(
// Response level error
var respErr error

spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender)
spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +87,6 @@ func (s *systemStatusServer) spanStatsFanOut(
} else {
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount
}
}
}
Expand Down Expand Up @@ -114,17 +114,8 @@ func (s *systemStatusServer) getLocalStats(
ctx context.Context, req *roachpb.SpanStatsRequest,
) (*roachpb.SpanStatsResponse, error) {
var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)}
ri := kvcoord.MakeRangeIterator(s.distSender)

// For each span
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
spanStats, err := s.statsForSpan(ctx, ri, rSpan)
spanStats, err := s.statsForSpan(ctx, span)
if err != nil {
return nil, err
}
Expand All @@ -134,21 +125,35 @@ func (s *systemStatusServer) getLocalStats(
}

func (s *systemStatusServer) statsForSpan(
ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan,
ctx context.Context, span roachpb.Span,
) (*roachpb.SpanStats, error) {
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)

var descriptors []roachpb.RangeDescriptor
scanner := rangedesc.NewScanner(s.db)
pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV))
err := scanner.Scan(ctx, pageSize, func() {
// If the underlying txn fails and needs to be retried,
// clear the descriptors we've collected so far.
descriptors = nil
}, span, func(scanned ...roachpb.RangeDescriptor) error {
descriptors = append(descriptors, scanned...)
return nil
})

if err != nil {
return nil, err
}

spanStats := &roachpb.SpanStats{}
var fullyContainedKeysBatch []roachpb.Key
var err error
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Iterate through the span's ranges.
for {
if !ri.Valid() {
return nil, ri.Error()
}
for _, desc := range descriptors {

// Get the descriptor for the current range of the span.
desc := ri.Desc()
descSpan := desc.RSpan()
spanStats.RangeCount += 1

Expand Down Expand Up @@ -200,11 +205,6 @@ func (s *systemStatusServer) statsForSpan(
return nil, err
}
}

if !ri.NeedAnother(rSpan) {
break
}
ri.Next(ctx)
}
// If we still have some remaining ranges, request range stats for the current batch.
if len(fullyContainedKeysBatch) > 0 {
Expand Down Expand Up @@ -270,19 +270,13 @@ func (s *systemStatusServer) getSpanStatsInternal(
}

func (s *systemStatusServer) getSpansPerNode(
ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender,
ctx context.Context, req *roachpb.SpanStatsRequest,
) (map[roachpb.NodeID][]roachpb.Span, error) {
// Mapping of node ids to spans with a replica on the node.
spansPerNode := make(map[roachpb.NodeID][]roachpb.Span)

// Iterate over the request spans.
pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV))
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Get the node ids belonging to the span.
nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan)
nodeIDs, err := nodeIDsForSpan(ctx, s.db, span, pageSize)
if err != nil {
return nil, err
}
Expand All @@ -294,6 +288,37 @@ func (s *systemStatusServer) getSpansPerNode(
return spansPerNode, nil
}

// nodeIDsForSpan returns a list of nodeIDs that contain at least one replica
// for the argument span. Descriptors are found via ScanMetaKVs.
func nodeIDsForSpan(
ctx context.Context, db *kv.DB, span roachpb.Span, pageSize int,
) ([]roachpb.NodeID, error) {
nodeIDs := make(map[roachpb.NodeID]struct{})
scanner := rangedesc.NewScanner(db)
err := scanner.Scan(ctx, pageSize, func() {
// If the underlying txn fails and needs to be retried,
// clear the nodeIDs we've collected so far.
nodeIDs = map[roachpb.NodeID]struct{}{}
}, span, func(scanned ...roachpb.RangeDescriptor) error {
for _, desc := range scanned {
for _, repl := range desc.Replicas().Descriptors() {
nodeIDs[repl.NodeID] = struct{}{}
}
}
return nil
})
if err != nil {
return nil, err
}

nodeIDList := make([]roachpb.NodeID, 0, len(nodeIDs))
for id := range nodeIDs {
nodeIDList = append(nodeIDList, id)
}

return nodeIDList, nil
}

func flushBatchedContainedKeys(
ctx context.Context,
fetcher *rangestats.Fetcher,
Expand Down
Loading

0 comments on commit c4ae806

Please sign in to comment.