Skip to content

Commit

Permalink
server: make the span stats fan-out more fault tolerant
Browse files Browse the repository at this point in the history
This commit adds improved fault tolerance to the span stats fan-out:

1. Errors encountered during the fan-out will not invalidate the
entire request. Now, a span stats fan-out will always return a
roachpb.SpanStatsResponse that has been updated by values from nodes that
service their requests without error. In the extreme case where there's
a failure encountered on every node, an empty response is returned.

Errors that are encountered are logged, and then appended to the response
in the newly added `Errors` field.

2. Nodes must service requests within the timeout passed to `iterateNodes`.
For span stats, the value comes from a new cluster setting:
'server.span_stats.node.timeout', with a default value of 1 minute.

Resolves cockroachdb#106097
Epic: none
Release note (ops change): Span stats requests will return a partial
result if the request encounters any errors. Errors that would have
previously terminated the request are now included in the response.
  • Loading branch information
zachlite committed Aug 18, 2023
1 parent 581d897 commit f137ca2
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 20 deletions.
10 changes: 10 additions & 0 deletions pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package roachpb

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)
Expand All @@ -30,6 +31,15 @@ var SpanStatsBatchLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

var SpanStatsNodeTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"server.span_stats.node.timeout",
"the duration allowed for a single node to return span stats data before"+
" the request is cancelled; if set to 0, there is no timeout",
time.Minute,
settings.NonNegativeDuration,
)

const defaultRangeStatsBatchLimit = 100

// RangeStatsBatchLimit registers the maximum number of ranges to be batched
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/span_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ message SpanStatsResponse {
int32 range_count = 2;
uint64 approximate_disk_bytes = 3;
map<string, SpanStats> span_to_stats = 4;
repeated string errors = 5;
// NEXT ID: 6.
}
1 change: 1 addition & 0 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3297,6 +3297,7 @@ func (s *systemAdminServer) EnqueueRange(
if err := contextutil.RunWithTimeout(ctx, "enqueue range", time.Minute, func(ctx context.Context) error {
return s.server.status.iterateNodes(
ctx, fmt.Sprintf("enqueue r%d in queue %s", req.RangeID, req.Queue),
noTimeout,
dialFn, nodeFn, responseFn, errorFn,
)
}); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/api_v2_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,11 @@ func (a *apiV2Server) listRange(w http.ResponseWriter, r *http.Request) {
}

if err := a.status.iterateNodes(
ctx, fmt.Sprintf("details about range %d", rangeID), dialFn, nodeFn, responseFn, errorFn,
ctx,
fmt.Sprintf("details about range %d", rangeID),
noTimeout,
dialFn, nodeFn,
responseFn, errorFn,
); err != nil {
apiV2InternalError(ctx, err, w)
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/index_usage_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (s *statusServer) IndexUsageStatistics(
// yields an incorrect result.
if err := s.iterateNodes(ctx,
"requesting index usage stats",
noTimeout,
dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,6 +195,7 @@ func (s *statusServer) ResetIndexUsageStats(

if err := s.iterateNodes(ctx,
"Resetting index usage stats",
noTimeout,
dialFn, resetIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/key_visualizer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@ func (s *KeyVisualizerServer) getSamplesFromFanOut(
}

err := s.status.iterateNodes(ctx,
"iterating nodes for key visualizer samples", dialFn, nodeFn,
responseFn, errorFn)
"iterating nodes for key visualizer samples",
noTimeout,
dialFn,
nodeFn,
responseFn,
errorFn,
)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

// Instantiate the status API server.
var serverTestingKnobs *TestingKnobs
if cfg.TestingKnobs.Server != nil {
serverTestingKnobs = cfg.TestingKnobs.Server.(*TestingKnobs)
}

sStatus := newSystemStatusServer(
cfg.AmbientCtx,
st,
Expand All @@ -903,6 +908,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
clock,
rangestats.NewFetcher(db),
node,
serverTestingKnobs,
)

keyVisualizerServer := &KeyVisualizerServer{
Expand Down
45 changes: 35 additions & 10 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package server

import (
"context"
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -37,8 +38,12 @@ func (s *systemStatusServer) spanStatsFanOut(
res := &roachpb.SpanStatsResponse{
SpanToStats: make(map[string]*roachpb.SpanStats),
}
// Response level error
var respErr error
// Populate SpanToStats with empty values for each span,
// so that clients may still access stats for a specific span
// in the extreme case of an error encountered on every node.
for _, sp := range req.Spans {
res.SpanToStats[sp.String()] = &roachpb.SpanStats{}
}

spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
Expand All @@ -51,13 +56,29 @@ func (s *systemStatusServer) spanStatsFanOut(
ctx context.Context,
nodeID roachpb.NodeID,
) (interface{}, error) {
if s.knobs != nil {
if s.knobs.IterateNodesDialCallback != nil {
if err := s.knobs.IterateNodesDialCallback(nodeID); err != nil {
return nil, err
}
}
}

if _, ok := spansPerNode[nodeID]; ok {
return s.dialNode(ctx, nodeID)
}
return nil, nil
}

nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
if s.knobs != nil {
if s.knobs.IterateNodesNodeCallback != nil {
if err := s.knobs.IterateNodesNodeCallback(ctx, nodeID); err != nil {
return nil, err
}
}
}

// `smartDial` may skip this node, so check to see if the client is nil.
// If it is, return nil response.
if client == nil {
Expand All @@ -81,24 +102,28 @@ func (s *systemStatusServer) spanStatsFanOut(
nodeResponse := resp.(*roachpb.SpanStatsResponse)

for spanStr, spanStats := range nodeResponse.SpanToStats {
_, exists := res.SpanToStats[spanStr]
if !exists {
res.SpanToStats[spanStr] = spanStats
} else {
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
// We are not counting replicas, so only consider range count
// if it has not been set.
if res.SpanToStats[spanStr].RangeCount == 0 {
res.SpanToStats[spanStr].RangeCount = spanStats.RangeCount
}

res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
}
}

errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
respErr = err
errorMessage := fmt.Sprintf("%v", err)
res.Errors = append(res.Errors, errorMessage)
}

timeout := roachpb.SpanStatsNodeTimeout.Get(&s.st.SV)
if err := s.statusServer.iterateNodes(
ctx,
"iterating nodes for span stats",
timeout,
smartDial,
nodeFn,
responseFn,
Expand All @@ -107,7 +132,7 @@ func (s *systemStatusServer) spanStatsFanOut(
return nil, err
}

return res, respErr
return res, nil
}

func (s *systemStatusServer) getLocalStats(
Expand Down
132 changes: 132 additions & 0 deletions pkg/server/span_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -191,6 +192,137 @@ func TestSpanStatsFanOut(t *testing.T) {

}

func TestSpanStatsFanOutFaultTolerance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStressWithIssue(t, 108534)
ctx := context.Background()
const numNodes = 5

type testCase struct {
name string
dialCallback func(nodeID roachpb.NodeID) error
nodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error
assertions func(res *roachpb.SpanStatsResponse)
}

containsError := func(errors []string, testString string) bool {
for _, e := range errors {
if strings.Contains(e, testString) {
return true
}
}
return false
}

testCases := []testCase{
{
// In a complete failure, no node is able to service requests successfully.
name: "complete-fanout-failure",
dialCallback: func(nodeID roachpb.NodeID) error {
// On the 1st and 2nd node, simulate a connection error.
if nodeID == 1 || nodeID == 2 {
return errors.Newf("error dialing node %d", nodeID)
}
return nil
},
nodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
// On the 3rd node, simulate some sort of KV error.
if nodeID == 3 {
return errors.Newf("kv error on node %d", nodeID)
}

// On the 4th and 5th node, simulate a request that takes a very long time.
// In this case, nodeFn will block until the context is cancelled
// i.e. if iterateNodes respects the timeout cluster setting.
if nodeID == 4 || nodeID == 5 {
<-ctx.Done()
// Return an error that mimics the error returned
// when a rpc's context is cancelled:
return errors.Newf("node %d timed out", nodeID)
}
return nil
},
assertions: func(res *roachpb.SpanStatsResponse) {
// Expect to still be able to access SpanToStats for keys.EverythingSpan
// without panicking, even though there was a failure on every node.
require.Equal(t, int64(0), res.SpanToStats[keys.EverythingSpan.String()].TotalStats.LiveCount)
require.Equal(t, 5, len(res.Errors))

require.Equal(t, true, containsError(res.Errors, "error dialing node 1"))
require.Equal(t, true, containsError(res.Errors, "error dialing node 2"))
require.Equal(t, true, containsError(res.Errors, "kv error on node 3"))
require.Equal(t, true, containsError(res.Errors, "node 4 timed out"))
require.Equal(t, true, containsError(res.Errors, "node 5 timed out"))
},
},
{
// In a partial failure, nodes 1, 3, and 4 fail, and nodes 2 and 5 succeed.
name: "partial-fanout-failure",
dialCallback: func(nodeID roachpb.NodeID) error {
if nodeID == 1 {
return errors.Newf("error dialing node %d", nodeID)
}
return nil
},
nodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
if nodeID == 3 {
return errors.Newf("kv error on node %d", nodeID)
}

if nodeID == 4 {
<-ctx.Done()
// Return an error that mimics the error returned
// when a rpc's context is cancelled:
return errors.Newf("node %d timed out", nodeID)
}
return nil
},
assertions: func(res *roachpb.SpanStatsResponse) {
require.Greater(t, res.SpanToStats[keys.EverythingSpan.String()].TotalStats.LiveCount, int64(0))
// 3 nodes could not service their requests.
require.Equal(t, 3, len(res.Errors))

require.Equal(t, true, containsError(res.Errors, "error dialing node 1"))
require.Equal(t, true, containsError(res.Errors, "kv error on node 3"))
require.Equal(t, true, containsError(res.Errors, "node 4 timed out"))

// There should not be any errors for node 2 or node 5.
require.Equal(t, false, containsError(res.Errors, "error dialing node 2"))
require.Equal(t, false, containsError(res.Errors, "node 5 timed out"))
},
},
}

for _, tCase := range testCases {
tCase := tCase
t.Run(tCase.name, func(t *testing.T) {
serverArgs := base.TestServerArgs{}
serverArgs.Knobs.Server = &server.TestingKnobs{
IterateNodesDialCallback: tCase.dialCallback,
IterateNodesNodeCallback: tCase.nodeCallback,
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: serverArgs})
defer tc.Stopper().Stop(ctx)

srv := tc.Server(0)
sqlDB := serverutils.OpenDBConn(t, srv.SQLAddr(), "", false, srv.Stopper())
_, err := sqlDB.Exec("SET CLUSTER SETTING server.span_stats.node.timeout = '3s'")
require.NoError(t, err)

statusServer := srv.(*server.TestServer).StatusServer().(serverpb.StatusServer)
res, err := statusServer.SpanStats(ctx, &roachpb.SpanStatsRequest{
NodeID: "0", // Indicates we want a fan-out.
Spans: []roachpb.Span{keys.EverythingSpan},
})

require.NoError(t, err)
tCase.assertions(res)
})
}
}

// BenchmarkSpanStats measures the cost of collecting span statistics.
func BenchmarkSpanStats(b *testing.B) {
skip.UnderShort(b)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *statusServer) ResetSQLStats(
var fanoutError error

if err := s.iterateNodes(ctx, "reset SQL statistics",
noTimeout,
dialFn,
resetSQLStats,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *statusServer) Statements(
}

if err := s.iterateNodes(ctx, "statement statistics",
noTimeout,
dialFn,
nodeStatement,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
Loading

0 comments on commit f137ca2

Please sign in to comment.