Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: server: make the span stats fan-out more fault tolerant #109008

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package roachpb

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)
Expand All @@ -30,6 +31,15 @@ var SpanStatsBatchLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

var SpanStatsNodeTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"server.span_stats.node.timeout",
"the duration allowed for a single node to return span stats data before"+
" the request is cancelled; if set to 0, there is no timeout",
time.Minute,
settings.NonNegativeDuration,
)

const defaultRangeStatsBatchLimit = 100

// RangeStatsBatchLimit registers the maximum number of ranges to be batched
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/span_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ message SpanStatsResponse {
int32 range_count = 2;
uint64 approximate_disk_bytes = 3;
map<string, SpanStats> span_to_stats = 4;
repeated string errors = 5;
// NEXT ID: 6.
}
1 change: 1 addition & 0 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3297,6 +3297,7 @@ func (s *systemAdminServer) EnqueueRange(
if err := contextutil.RunWithTimeout(ctx, "enqueue range", time.Minute, func(ctx context.Context) error {
return s.server.status.iterateNodes(
ctx, fmt.Sprintf("enqueue r%d in queue %s", req.RangeID, req.Queue),
noTimeout,
dialFn, nodeFn, responseFn, errorFn,
)
}); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/api_v2_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,11 @@ func (a *apiV2Server) listRange(w http.ResponseWriter, r *http.Request) {
}

if err := a.status.iterateNodes(
ctx, fmt.Sprintf("details about range %d", rangeID), dialFn, nodeFn, responseFn, errorFn,
ctx,
fmt.Sprintf("details about range %d", rangeID),
noTimeout,
dialFn, nodeFn,
responseFn, errorFn,
); err != nil {
apiV2InternalError(ctx, err, w)
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/index_usage_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (s *statusServer) IndexUsageStatistics(
// yields an incorrect result.
if err := s.iterateNodes(ctx,
"requesting index usage stats",
noTimeout,
dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,6 +195,7 @@ func (s *statusServer) ResetIndexUsageStats(

if err := s.iterateNodes(ctx,
"Resetting index usage stats",
noTimeout,
dialFn, resetIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/key_visualizer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@ func (s *KeyVisualizerServer) getSamplesFromFanOut(
}

err := s.status.iterateNodes(ctx,
"iterating nodes for key visualizer samples", dialFn, nodeFn,
responseFn, errorFn)
"iterating nodes for key visualizer samples",
noTimeout,
dialFn,
nodeFn,
responseFn,
errorFn,
)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}

// Instantiate the status API server.
var serverTestingKnobs *TestingKnobs
if cfg.TestingKnobs.Server != nil {
serverTestingKnobs = cfg.TestingKnobs.Server.(*TestingKnobs)
}

sStatus := newSystemStatusServer(
cfg.AmbientCtx,
st,
Expand All @@ -903,6 +908,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
clock,
rangestats.NewFetcher(db),
node,
serverTestingKnobs,
)

keyVisualizerServer := &KeyVisualizerServer{
Expand Down
45 changes: 35 additions & 10 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package server

import (
"context"
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -37,8 +38,12 @@ func (s *systemStatusServer) spanStatsFanOut(
res := &roachpb.SpanStatsResponse{
SpanToStats: make(map[string]*roachpb.SpanStats),
}
// Response level error
var respErr error
// Populate SpanToStats with empty values for each span,
// so that clients may still access stats for a specific span
// in the extreme case of an error encountered on every node.
for _, sp := range req.Spans {
res.SpanToStats[sp.String()] = &roachpb.SpanStats{}
}

spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
Expand All @@ -51,13 +56,29 @@ func (s *systemStatusServer) spanStatsFanOut(
ctx context.Context,
nodeID roachpb.NodeID,
) (interface{}, error) {
if s.knobs != nil {
if s.knobs.IterateNodesDialCallback != nil {
if err := s.knobs.IterateNodesDialCallback(nodeID); err != nil {
return nil, err
}
}
}

if _, ok := spansPerNode[nodeID]; ok {
return s.dialNode(ctx, nodeID)
}
return nil, nil
}

nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
if s.knobs != nil {
if s.knobs.IterateNodesNodeCallback != nil {
if err := s.knobs.IterateNodesNodeCallback(ctx, nodeID); err != nil {
return nil, err
}
}
}

// `smartDial` may skip this node, so check to see if the client is nil.
// If it is, return nil response.
if client == nil {
Expand All @@ -81,24 +102,28 @@ func (s *systemStatusServer) spanStatsFanOut(
nodeResponse := resp.(*roachpb.SpanStatsResponse)

for spanStr, spanStats := range nodeResponse.SpanToStats {
_, exists := res.SpanToStats[spanStr]
if !exists {
res.SpanToStats[spanStr] = spanStats
} else {
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
// We are not counting replicas, so only consider range count
// if it has not been set.
if res.SpanToStats[spanStr].RangeCount == 0 {
res.SpanToStats[spanStr].RangeCount = spanStats.RangeCount
}

res.SpanToStats[spanStr].TotalStats.Add(spanStats.TotalStats)
res.SpanToStats[spanStr].ApproximateDiskBytes += spanStats.ApproximateDiskBytes
}
}

errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
respErr = err
errorMessage := fmt.Sprintf("%v", err)
res.Errors = append(res.Errors, errorMessage)
}

timeout := roachpb.SpanStatsNodeTimeout.Get(&s.st.SV)
if err := s.statusServer.iterateNodes(
ctx,
"iterating nodes for span stats",
timeout,
smartDial,
nodeFn,
responseFn,
Expand All @@ -107,7 +132,7 @@ func (s *systemStatusServer) spanStatsFanOut(
return nil, err
}

return res, respErr
return res, nil
}

func (s *systemStatusServer) getLocalStats(
Expand Down
132 changes: 132 additions & 0 deletions pkg/server/span_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -191,6 +192,137 @@ func TestSpanStatsFanOut(t *testing.T) {

}

func TestSpanStatsFanOutFaultTolerance(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStressWithIssue(t, 108534)
ctx := context.Background()
const numNodes = 5

type testCase struct {
name string
dialCallback func(nodeID roachpb.NodeID) error
nodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error
assertions func(res *roachpb.SpanStatsResponse)
}

containsError := func(errors []string, testString string) bool {
for _, e := range errors {
if strings.Contains(e, testString) {
return true
}
}
return false
}

testCases := []testCase{
{
// In a complete failure, no node is able to service requests successfully.
name: "complete-fanout-failure",
dialCallback: func(nodeID roachpb.NodeID) error {
// On the 1st and 2nd node, simulate a connection error.
if nodeID == 1 || nodeID == 2 {
return errors.Newf("error dialing node %d", nodeID)
}
return nil
},
nodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
// On the 3rd node, simulate some sort of KV error.
if nodeID == 3 {
return errors.Newf("kv error on node %d", nodeID)
}

// On the 4th and 5th node, simulate a request that takes a very long time.
// In this case, nodeFn will block until the context is cancelled
// i.e. if iterateNodes respects the timeout cluster setting.
if nodeID == 4 || nodeID == 5 {
<-ctx.Done()
// Return an error that mimics the error returned
// when a rpc's context is cancelled:
return errors.Newf("node %d timed out", nodeID)
}
return nil
},
assertions: func(res *roachpb.SpanStatsResponse) {
// Expect to still be able to access SpanToStats for keys.EverythingSpan
// without panicking, even though there was a failure on every node.
require.Equal(t, int64(0), res.SpanToStats[keys.EverythingSpan.String()].TotalStats.LiveCount)
require.Equal(t, 5, len(res.Errors))

require.Equal(t, true, containsError(res.Errors, "error dialing node 1"))
require.Equal(t, true, containsError(res.Errors, "error dialing node 2"))
require.Equal(t, true, containsError(res.Errors, "kv error on node 3"))
require.Equal(t, true, containsError(res.Errors, "node 4 timed out"))
require.Equal(t, true, containsError(res.Errors, "node 5 timed out"))
},
},
{
// In a partial failure, nodes 1, 3, and 4 fail, and nodes 2 and 5 succeed.
name: "partial-fanout-failure",
dialCallback: func(nodeID roachpb.NodeID) error {
if nodeID == 1 {
return errors.Newf("error dialing node %d", nodeID)
}
return nil
},
nodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
if nodeID == 3 {
return errors.Newf("kv error on node %d", nodeID)
}

if nodeID == 4 {
<-ctx.Done()
// Return an error that mimics the error returned
// when a rpc's context is cancelled:
return errors.Newf("node %d timed out", nodeID)
}
return nil
},
assertions: func(res *roachpb.SpanStatsResponse) {
require.Greater(t, res.SpanToStats[keys.EverythingSpan.String()].TotalStats.LiveCount, int64(0))
// 3 nodes could not service their requests.
require.Equal(t, 3, len(res.Errors))

require.Equal(t, true, containsError(res.Errors, "error dialing node 1"))
require.Equal(t, true, containsError(res.Errors, "kv error on node 3"))
require.Equal(t, true, containsError(res.Errors, "node 4 timed out"))

// There should not be any errors for node 2 or node 5.
require.Equal(t, false, containsError(res.Errors, "error dialing node 2"))
require.Equal(t, false, containsError(res.Errors, "node 5 timed out"))
},
},
}

for _, tCase := range testCases {
tCase := tCase
t.Run(tCase.name, func(t *testing.T) {
serverArgs := base.TestServerArgs{}
serverArgs.Knobs.Server = &server.TestingKnobs{
IterateNodesDialCallback: tCase.dialCallback,
IterateNodesNodeCallback: tCase.nodeCallback,
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: serverArgs})
defer tc.Stopper().Stop(ctx)

srv := tc.Server(0)
sqlDB := serverutils.OpenDBConn(t, srv.SQLAddr(), "", false, srv.Stopper())
_, err := sqlDB.Exec("SET CLUSTER SETTING server.span_stats.node.timeout = '3s'")
require.NoError(t, err)

statusServer := srv.(*server.TestServer).StatusServer().(serverpb.StatusServer)
res, err := statusServer.SpanStats(ctx, &roachpb.SpanStatsRequest{
NodeID: "0", // Indicates we want a fan-out.
Spans: []roachpb.Span{keys.EverythingSpan},
})

require.NoError(t, err)
tCase.assertions(res)
})
}
}

// BenchmarkSpanStats measures the cost of collecting span statistics.
func BenchmarkSpanStats(b *testing.B) {
skip.UnderShort(b)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (s *statusServer) ResetSQLStats(
var fanoutError error

if err := s.iterateNodes(ctx, "reset SQL statistics",
noTimeout,
dialFn,
resetSQLStats,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *statusServer) Statements(
}

if err := s.iterateNodes(ctx, "statement statistics",
noTimeout,
dialFn,
nodeStatement,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
Loading