Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: make SpanStats authoritative #104423

Merged
merged 1 commit into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@

package roachpb

import "github.com/cockroachdb/cockroach/pkg/settings"
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/settings"
)

// Put span statistics cluster settings here to avoid import cycle.

Expand All @@ -37,3 +41,18 @@ var RangeStatsBatchLimit = settings.RegisterIntSetting(
defaultRangeStatsBatchLimit,
settings.PositiveInt,
)

// RangeDescPageSize is the cluster setting that determines the page size
// used when iterating through range descriptors. The default is 100, and
// only values in the inclusive range [5, 25000] are accepted.
var RangeDescPageSize = settings.RegisterIntSetting(
	settings.TenantWritable,
	"server.span_stats.range_desc_page_size",
	"the page size when iterating through range descriptors",
	100,
	func(pageSize int64) error {
		// Accept anything inside the permitted bounds; reject the rest.
		if pageSize >= 5 && pageSize <= 25000 {
			return nil
		}
		return fmt.Errorf("expected range_desc_page_size to be in range [5, 25000], got %d", pageSize)
	},
)
32 changes: 24 additions & 8 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,8 +1368,9 @@ func (s *adminServer) statsForSpan(
return nil, err
}

// Get a list of node ids and range count for the specified span.
nodeIDs, rangeCount, err := nodeIDsAndRangeCountForSpan(
// Get a list of nodeIDs, range counts, and replica counts per node
// for the specified span.
nodeIDs, rangeCount, replCounts, err := getNodeIDsRangeCountReplCountForSpan(
ctx, s.distSender, rSpan,
)
if err != nil {
Expand Down Expand Up @@ -1439,6 +1440,15 @@ func (s *adminServer) statsForSpan(
return nil, err
}
}

// tableStatResponse.ReplicaCount counts the replicas found for this span
// by a cluster-wide fan-out.
// We can use descriptors to know what the final count _should_ be,
// assuming every request succeeds (nodes and replicas are reachable).
for _, replCount := range replCounts {
tableStatResponse.ReplicaCount += replCount
}

for remainingResponses := len(nodeIDs); remainingResponses > 0; remainingResponses-- {
select {
case resp := <-responses:
Expand All @@ -1449,6 +1459,10 @@ func (s *adminServer) statsForSpan(
return nil, serverError(ctx, resp.err)
}

// If this node is unreachable,
// its replicas cannot be counted.
tableStatResponse.ReplicaCount -= replCounts[resp.nodeID]

tableStatResponse.MissingNodes = append(
tableStatResponse.MissingNodes,
serverpb.TableStatsResponse_MissingNode{
Expand All @@ -1458,7 +1472,6 @@ func (s *adminServer) statsForSpan(
)
} else {
tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats)
tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount)
tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes
}
case <-ctx.Done():
Expand All @@ -1470,24 +1483,27 @@ func (s *adminServer) statsForSpan(
return &tableStatResponse, nil
}

// Returns the list of node ids for the specified span.
func nodeIDsAndRangeCountForSpan(
// Returns the list of node ids, range count,
// and replica count for the specified span.
func getNodeIDsRangeCountReplCountForSpan(
ctx context.Context, ds *kvcoord.DistSender, rSpan roachpb.RSpan,
) (nodeIDList []roachpb.NodeID, rangeCount int64, _ error) {
) (nodeIDList []roachpb.NodeID, rangeCount int64, replCounts map[roachpb.NodeID]int64, _ error) {
nodeIDs := make(map[roachpb.NodeID]struct{})
replCountForNodeID := make(map[roachpb.NodeID]int64)
ri := kvcoord.MakeRangeIterator(ds)
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
for ; ri.Valid(); ri.Next(ctx) {
rangeCount++
for _, repl := range ri.Desc().Replicas().Descriptors() {
replCountForNodeID[repl.NodeID]++
nodeIDs[repl.NodeID] = struct{}{}
}
if !ri.NeedAnother(rSpan) {
break
}
}
if err := ri.Error(); err != nil {
return nil, 0, err
return nil, 0, nil, err
}

nodeIDList = make([]roachpb.NodeID, 0, len(nodeIDs))
Expand All @@ -1497,7 +1513,7 @@ func nodeIDsAndRangeCountForSpan(
sort.Slice(nodeIDList, func(i, j int) bool {
return nodeIDList[i] < nodeIDList[j]
})
return nodeIDList, rangeCount, nil
return nodeIDList, rangeCount, replCountForNodeID, nil
}

// Users returns a list of users, stripped of any passwords.
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
serverIterator,
spanConfig.reporter,
clock,
distSender,
rangestats.NewFetcher(db),
node,
)
Expand Down
97 changes: 61 additions & 36 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand All @@ -39,7 +40,7 @@ func (s *systemStatusServer) spanStatsFanOut(
// Response level error
var respErr error

spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender)
spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +87,6 @@ func (s *systemStatusServer) spanStatsFanOut(
} else {
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount
}
}
}
Expand Down Expand Up @@ -114,17 +114,8 @@ func (s *systemStatusServer) getLocalStats(
ctx context.Context, req *roachpb.SpanStatsRequest,
) (*roachpb.SpanStatsResponse, error) {
var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)}
ri := kvcoord.MakeRangeIterator(s.distSender)

// For each span
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
spanStats, err := s.statsForSpan(ctx, ri, rSpan)
spanStats, err := s.statsForSpan(ctx, span)
if err != nil {
return nil, err
}
Expand All @@ -134,21 +125,35 @@ func (s *systemStatusServer) getLocalStats(
}

func (s *systemStatusServer) statsForSpan(
ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan,
ctx context.Context, span roachpb.Span,
) (*roachpb.SpanStats, error) {
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)

var descriptors []roachpb.RangeDescriptor
scanner := rangedesc.NewScanner(s.db)
pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV))
err := scanner.Scan(ctx, pageSize, func() {
// If the underlying txn fails and needs to be retried,
// clear the descriptors we've collected so far.
descriptors = nil
}, span, func(scanned ...roachpb.RangeDescriptor) error {
descriptors = append(descriptors, scanned...)
return nil
})

if err != nil {
return nil, err
}

spanStats := &roachpb.SpanStats{}
var fullyContainedKeysBatch []roachpb.Key
var err error
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Iterate through the span's ranges.
for {
if !ri.Valid() {
return nil, ri.Error()
}
for _, desc := range descriptors {

// Get the descriptor for the current range of the span.
desc := ri.Desc()
descSpan := desc.RSpan()
spanStats.RangeCount += 1

Expand Down Expand Up @@ -200,11 +205,6 @@ func (s *systemStatusServer) statsForSpan(
return nil, err
}
}

if !ri.NeedAnother(rSpan) {
break
}
ri.Next(ctx)
}
// If we still have some remaining ranges, request range stats for the current batch.
if len(fullyContainedKeysBatch) > 0 {
Expand Down Expand Up @@ -270,19 +270,13 @@ func (s *systemStatusServer) getSpanStatsInternal(
}

func (s *systemStatusServer) getSpansPerNode(
ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender,
ctx context.Context, req *roachpb.SpanStatsRequest,
) (map[roachpb.NodeID][]roachpb.Span, error) {
// Mapping of node ids to spans with a replica on the node.
spansPerNode := make(map[roachpb.NodeID][]roachpb.Span)

// Iterate over the request spans.
pageSize := int(roachpb.RangeDescPageSize.Get(&s.st.SV))
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Get the node ids belonging to the span.
nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan)
nodeIDs, err := nodeIDsForSpan(ctx, s.db, span, pageSize)
if err != nil {
return nil, err
}
Expand All @@ -294,6 +288,37 @@ func (s *systemStatusServer) getSpansPerNode(
return spansPerNode, nil
}

// nodeIDsForSpan gathers the IDs of all nodes that hold at least one replica
// for the argument span. Range descriptors are read pageSize at a time
// through a rangedesc scanner built on db. The returned slice is assembled
// by iterating over a map, so its order is unspecified. Any error reported
// by the scan is returned as-is with a nil slice.
func nodeIDsForSpan(
	ctx context.Context, db *kv.DB, span roachpb.Span, pageSize int,
) ([]roachpb.NodeID, error) {
	seen := make(map[roachpb.NodeID]struct{})

	// If the underlying txn fails and needs to be retried, start over with
	// an empty set so results from the failed attempt are discarded.
	reset := func() {
		seen = map[roachpb.NodeID]struct{}{}
	}

	// Record the node of every replica in each scanned descriptor.
	collect := func(page ...roachpb.RangeDescriptor) error {
		for _, rangeDesc := range page {
			for _, replica := range rangeDesc.Replicas().Descriptors() {
				seen[replica.NodeID] = struct{}{}
			}
		}
		return nil
	}

	if err := rangedesc.NewScanner(db).Scan(ctx, pageSize, reset, span, collect); err != nil {
		return nil, err
	}

	// Flatten the set into a slice for the caller.
	result := make([]roachpb.NodeID, 0, len(seen))
	for nodeID := range seen {
		result = append(result, nodeID)
	}
	return result, nil
}

func flushBatchedContainedKeys(
ctx context.Context,
fetcher *rangestats.Fetcher,
Expand Down
Loading