Skip to content

Commit

Permalink
server: make SpanStats authoritative
Browse files Browse the repository at this point in the history
This commit guarantees that SpanStats uses up-to-date range
descriptors on all nodes. The dependency on DistSender is removed
and is replaced with a dependency on ScanMetaKVs.

ScanMetaKVs is used to:
1) Locate nodes that house replicas of a span to avoid cluster-wide fan-outs.
2) Find Range Descriptors that touch a span, to build a SpanStatsResponse.

This commit also fixes cockroachdb#103809, where a SpanStatsResponse incorrectly returned
the replica_count for a span, instead of the range_count.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-24928
Issue: cockroachdb#103957
Release note (bug fix): SpanStats is no longer subject to stale
information, and should be considered authoritative.
  • Loading branch information
Zach Lite committed Jun 6, 2023
1 parent 7207c46 commit 32ab463
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 88 deletions.
18 changes: 10 additions & 8 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func (s *adminServer) statsForSpan(
}

// Get a list of node ids and range count for the specified span.
nodeIDs, rangeCount, err := nodeIDsAndRangeCountForSpan(
nodeIDs, rangeCount, replCount, err := getNodeIDsRangeCountReplCountForSpan(
ctx, s.distSender, rSpan,
)
if err != nil {
Expand All @@ -1394,7 +1394,8 @@ func (s *adminServer) statsForSpan(
// the advantage of populating the cache (without the disadvantage of
// potentially returning stale data).
// See GitHub #5435 for some discussion.
RangeCount: rangeCount,
RangeCount: rangeCount,
ReplicaCount: replCount,
}
type nodeResponse struct {
nodeID roachpb.NodeID
Expand Down Expand Up @@ -1458,7 +1459,6 @@ func (s *adminServer) statsForSpan(
)
} else {
tableStatResponse.Stats.Add(resp.resp.SpanToStats[span.String()].TotalStats)
tableStatResponse.ReplicaCount += int64(resp.resp.SpanToStats[span.String()].RangeCount)
tableStatResponse.ApproximateDiskBytes += resp.resp.SpanToStats[span.String()].ApproximateDiskBytes
}
case <-ctx.Done():
Expand All @@ -1470,24 +1470,26 @@ func (s *adminServer) statsForSpan(
return &tableStatResponse, nil
}

// Returns the list of node ids for the specified span.
func nodeIDsAndRangeCountForSpan(
// Returns the list of node ids, range count,
// and replica count for the specified span.
func getNodeIDsRangeCountReplCountForSpan(
ctx context.Context, ds *kvcoord.DistSender, rSpan roachpb.RSpan,
) (nodeIDList []roachpb.NodeID, rangeCount int64, _ error) {
) (nodeIDList []roachpb.NodeID, rangeCount int64, replCount int64, _ error) {
nodeIDs := make(map[roachpb.NodeID]struct{})
ri := kvcoord.MakeRangeIterator(ds)
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
for ; ri.Valid(); ri.Next(ctx) {
rangeCount++
for _, repl := range ri.Desc().Replicas().Descriptors() {
replCount++
nodeIDs[repl.NodeID] = struct{}{}
}
if !ri.NeedAnother(rSpan) {
break
}
}
if err := ri.Error(); err != nil {
return nil, 0, err
return nil, 0, 0, err
}

nodeIDList = make([]roachpb.NodeID, 0, len(nodeIDs))
Expand All @@ -1497,7 +1499,7 @@ func nodeIDsAndRangeCountForSpan(
sort.Slice(nodeIDList, func(i, j int) bool {
return nodeIDList[i] < nodeIDList[j]
})
return nodeIDList, rangeCount, nil
return nodeIDList, rangeCount, replCount, nil
}

// Users returns a list of users, stripped of any passwords.
Expand Down
1 change: 0 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
serverIterator,
spanConfig.reporter,
clock,
distSender,
rangestats.NewFetcher(db),
node,
)
Expand Down
119 changes: 75 additions & 44 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -39,7 +40,7 @@ func (s *systemStatusServer) spanStatsFanOut(
// Response level error
var respErr error

spansPerNode, err := s.getSpansPerNode(ctx, req, s.distSender)
spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,7 +87,6 @@ func (s *systemStatusServer) spanStatsFanOut(
} else {
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].RangeCount += spanStats.RangeCount
}
}
}
Expand Down Expand Up @@ -114,41 +114,48 @@ func (s *systemStatusServer) getLocalStats(
ctx context.Context, req *roachpb.SpanStatsRequest,
) (*roachpb.SpanStatsResponse, error) {
var res = &roachpb.SpanStatsResponse{SpanToStats: make(map[string]*roachpb.SpanStats)}
ri := kvcoord.MakeRangeIterator(s.distSender)

// For each span
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
for _, span := range req.Spans {
spanStats, err := s.statsForSpan(ctx, txn, span)
if err != nil {
return err
}
// Note: Even if this function retries, this side effect is safe.
res.SpanToStats[span.String()] = spanStats
}
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
spanStats, err := s.statsForSpan(ctx, ri, rSpan)
return nil
})
return res, err
}

func (s *systemStatusServer) statsForSpan(
ctx context.Context, txn *kv.Txn, span roachpb.Span,
) (*roachpb.SpanStats, error) {
kvs, err := kvclient.ScanMetaKVs(ctx, txn, span)
if err != nil {
return nil, err
}

var descriptors []roachpb.RangeDescriptor
for _, metaKv := range kvs {
var rangeDescriptor roachpb.RangeDescriptor
err := metaKv.ValueProto(&rangeDescriptor)
if err != nil {
return nil, err
}
res.SpanToStats[span.String()] = spanStats
descriptors = append(descriptors, rangeDescriptor)
}
return res, nil
}

func (s *systemStatusServer) statsForSpan(
ctx context.Context, ri kvcoord.RangeIterator, rSpan roachpb.RSpan,
) (*roachpb.SpanStats, error) {
// Seek to the span's start key.
ri.Seek(ctx, rSpan.Key, kvcoord.Ascending)
spanStats := &roachpb.SpanStats{}
var fullyContainedKeysBatch []roachpb.Key
var err error
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}
// Iterate through the span's ranges.
for {
if !ri.Valid() {
return nil, ri.Error()
}
for _, desc := range descriptors {

// Get the descriptor for the current range of the span.
desc := ri.Desc()
descSpan := desc.RSpan()
spanStats.RangeCount += 1

Expand Down Expand Up @@ -200,11 +207,6 @@ func (s *systemStatusServer) statsForSpan(
return nil, err
}
}

if !ri.NeedAnother(rSpan) {
break
}
ri.Next(ctx)
}
// If we still have some remaining ranges, request range stats for the current batch.
if len(fullyContainedKeysBatch) > 0 {
Expand Down Expand Up @@ -270,28 +272,57 @@ func (s *systemStatusServer) getSpanStatsInternal(
}

func (s *systemStatusServer) getSpansPerNode(
ctx context.Context, req *roachpb.SpanStatsRequest, ds *kvcoord.DistSender,
ctx context.Context, req *roachpb.SpanStatsRequest,
) (map[roachpb.NodeID][]roachpb.Span, error) {
// Mapping of node ids to spans with a replica on the node.
spansPerNode := make(map[roachpb.NodeID][]roachpb.Span)

// Iterate over the request spans.
for _, span := range req.Spans {
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
// Get the node ids belonging to the span.
var nodeIDs []roachpb.NodeID
err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
// Iterate over the request spans.
for _, span := range req.Spans {
nodeIDs, err = nodeIDsForSpan(ctx, txn, span)
if err != nil {
return err
}
// Add the span to the map for each of the node IDs it belongs to.
for _, nodeID := range nodeIDs {
spansPerNode[nodeID] = append(spansPerNode[nodeID], span)
}
}
// Get the node ids belonging to the span.
nodeIDs, _, err := nodeIDsAndRangeCountForSpan(ctx, ds, rSpan)
return nil
})

return spansPerNode, err
}

// nodeIDsForSpan returns a list of nodeIDs that contain at least one replica
// for the argument span. Descriptors are found via ScanMetaKVs.
func nodeIDsForSpan(ctx context.Context, txn *kv.Txn, span roachpb.Span) ([]roachpb.NodeID, error) {
nodeIDs := make(map[roachpb.NodeID]struct{})
kvs, err := kvclient.ScanMetaKVs(ctx, txn, span)
if err != nil {
return nil, err
}

for _, metaKV := range kvs {
var rangeDescriptor roachpb.RangeDescriptor
err := metaKV.ValueProto(&rangeDescriptor)
if err != nil {
return nil, err
}
// Add the span to the map for each of the node IDs it belongs to.
for _, nodeID := range nodeIDs {
spansPerNode[nodeID] = append(spansPerNode[nodeID], span)
for _, repl := range rangeDescriptor.Replicas().Descriptors() {
nodeIDs[repl.NodeID] = struct{}{}
}
}
return spansPerNode, nil

nodeIDList := make([]roachpb.NodeID, 0, len(nodeIDs))
for id := range nodeIDs {
nodeIDList = append(nodeIDList, id)
}

return nodeIDList, nil
}

func flushBatchedContainedKeys(
Expand Down
82 changes: 51 additions & 31 deletions pkg/server/span_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server
package server_test

import (
"bytes"
"context"
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand All @@ -34,15 +34,13 @@ import (
func TestLocalSpanStats(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{DisableCanAckBeforeApplication: true},
},
})
s := serv.(*TestServer)
defer s.Stopper().Stop(ctx)
numNodes := 3
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

s := tc.Server(0).(*server.TestServer)

store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
// Create a number of ranges using splits.
Expand Down Expand Up @@ -104,15 +102,15 @@ func TestLocalSpanStats(t *testing.T) {
}

testCases := []testCase{
{spans[0], 4, 6},
{spans[1], 1, 3},
{spans[2], 2, 5},
{spans[3], 2, 1},
{spans[4], 2, 3},
{spans[5], 1, 2},
{spans[0], 4, int64(numNodes * 6)},
{spans[1], 1, int64(numNodes * 3)},
{spans[2], 2, int64(numNodes * 5)},
{spans[3], 2, int64(numNodes * 1)},
{spans[4], 2, int64(numNodes * 3)},
{spans[5], 1, int64(numNodes * 2)},
}
// Multi-span request
multiResult, err := s.status.getLocalStats(ctx,
multiResult, err := s.StatusServer().(serverpb.StatusServer).SpanStats(ctx,
&roachpb.SpanStatsRequest{
NodeID: "0",
Spans: spans,
Expand All @@ -127,10 +125,30 @@ func TestLocalSpanStats(t *testing.T) {

// Assert expected values from multi-span request
spanStats := multiResult.SpanToStats[tcase.span.String()]
require.Equal(t, spanStats.RangeCount, tcase.expectedRanges, fmt.Sprintf(
"Multi-span: expected %d ranges in span [%s - %s], found %d", tcase.expectedRanges, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.RangeCount))
require.Equal(t, spanStats.TotalStats.LiveCount, tcase.expectedKeys, fmt.Sprintf(
"Multi-span: expected %d keys in span [%s - %s], found %d", tcase.expectedKeys, rSpan.Key.String(), rSpan.EndKey.String(), spanStats.TotalStats.LiveCount))
require.Equal(
t,
tcase.expectedRanges,
spanStats.RangeCount,
fmt.Sprintf(
"Multi-span: expected %d ranges in span [%s - %s], found %d",
tcase.expectedRanges,
rSpan.Key.String(),
rSpan.EndKey.String(),
spanStats.RangeCount,
),
)
require.Equal(
t,
tcase.expectedKeys,
spanStats.TotalStats.LiveCount,
fmt.Sprintf(
"Multi-span: expected %d keys in span [%s - %s], found %d",
tcase.expectedKeys,
rSpan.Key.String(),
rSpan.EndKey.String(),
spanStats.TotalStats.LiveCount,
),
)
}
}

Expand Down Expand Up @@ -193,22 +211,24 @@ func BenchmarkSpanStats(b *testing.B) {
var spans []roachpb.Span

// Create a table spans
var spanStartKey roachpb.Key
for i := 0; i < ts.numSpans; i++ {
spanStartKey = nil
spanStartKey := makeKey(
tenantPrefix,
[]byte{byte(i)},
)
spanEndKey := makeKey(
tenantPrefix,
[]byte{byte(i + 1)},
)
// Create ranges.
var key roachpb.Key
for j := 0; j < ts.numRanges; j++ {
key = makeKey(tenantPrefix, []byte(strconv.Itoa(i*j)))
if spanStartKey == nil {
spanStartKey = key
}
_, _, err := tc.Server(0).SplitRange(key)
splitKey := makeKey(spanStartKey, []byte{byte(j)})
_, _, err := tc.Server(0).SplitRange(splitKey)
require.NoError(b, err)
}
spans = append(spans, roachpb.Span{
Key: spanStartKey,
EndKey: key,
EndKey: spanEndKey,
})
}

Expand Down
Loading

0 comments on commit 32ab463

Please sign in to comment.