From bc5712512de46ec7a4e0594e27babd0e2a5d1524 Mon Sep 17 00:00:00 2001 From: zachlite Date: Wed, 9 Aug 2023 10:12:55 -0400 Subject: [PATCH] server: make the span stats fan-out more fault tolerant This commit adds improved fault tolerance to the span stats fan-out: 1. Errors encountered during the fan-out will not invalidate the entire request. Now, a span stats fan-out will always return a roachpb.SpanStatsResponse that has been updated with values from nodes that service their requests without error. Errors that are encountered are logged. Errors may be due to a connection error, or due to a KV-related error while a node is servicing a request. 2. Nodes must service requests within the timeout passed to `iterateNodes`. For span stats, the value comes from a new cluster setting: 'server.span_stats.node.timeout', with a default value of 1 minute. Resolves #106097 Epic: none Release note (ops change): Added a new cluster setting, 'server.span_stats.node.timeout', to control the maximum duration that a node is allowed to spend servicing a span stats request. A value of '0' disables the timeout. 
--- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/roachpb/span_stats.go | 10 ++++ pkg/server/admin.go | 1 + pkg/server/api_v2_ranges.go | 6 +- pkg/server/index_usage_stats.go | 2 + pkg/server/key_visualizer_server.go | 9 ++- pkg/server/server.go | 6 ++ pkg/server/span_stats_server.go | 20 +++++-- pkg/server/span_stats_test.go | 59 +++++++++++++++++++ pkg/server/sql_stats.go | 1 + pkg/server/statements.go | 1 + pkg/server/status.go | 58 +++++++++++++++--- pkg/server/testing_knobs.go | 14 +++++ 14 files changed, 173 insertions(+), 16 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index eb954afb7d2b..ebaaf530244b 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -84,6 +84,7 @@ server.shutdown.connection_wait duration 0s the maximum amount of time a server server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) 
tenant-rw server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw +server.span_stats.node.timeout duration 1m0s the duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeout tenant-rw server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled boolean true if server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcrypt tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index ecc6ef2f0611..2eb6c1691352 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -116,6 +116,7 @@
server.shutdown.jobs_wait
duration10sthe maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdownServerless/Dedicated/Self-Hosted
server.shutdown.lease_transfer_wait
duration5sthe timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)Dedicated/Self-Hosted
server.shutdown.query_wait
duration10sthe timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)Serverless/Dedicated/Self-Hosted +
server.span_stats.node.timeout
duration1m0sthe duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeoutServerless/Dedicated/Self-Hosted
server.time_until_store_dead
duration5m0sthe time after which if there is no new gossiped information about a store, it is considered deadServerless/Dedicated/Self-Hosted
server.user_login.cert_password_method.auto_scram_promotion.enabled
booleantruewhether to automatically promote cert-password authentication to use SCRAMServerless/Dedicated/Self-Hosted
server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled
booleantrueif server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcryptServerless/Dedicated/Self-Hosted diff --git a/pkg/roachpb/span_stats.go b/pkg/roachpb/span_stats.go index d750c23ef1a4..43718fb7fbea 100644 --- a/pkg/roachpb/span_stats.go +++ b/pkg/roachpb/span_stats.go @@ -12,6 +12,7 @@ package roachpb import ( "fmt" + "time" "github.com/cockroachdb/cockroach/pkg/settings" ) @@ -30,6 +31,15 @@ var SpanStatsBatchLimit = settings.RegisterIntSetting( settings.PositiveInt, ) +var SpanStatsNodeTimeout = settings.RegisterDurationSetting( + settings.TenantWritable, + "server.span_stats.node.timeout", + "the duration allowed for a single node to return span stats data before"+ + " the request is cancelled; if set to 0, there is no timeout", + time.Minute, + settings.NonNegativeDuration, +).WithPublic() + const defaultRangeStatsBatchLimit = 100 // RangeStatsBatchLimit registers the maximum number of ranges to be batched diff --git a/pkg/server/admin.go b/pkg/server/admin.go index c126a3adfb6c..0c7e4239ee44 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -3201,6 +3201,7 @@ func (s *systemAdminServer) EnqueueRange( if err := timeutil.RunWithTimeout(ctx, "enqueue range", time.Minute, func(ctx context.Context) error { return s.server.status.iterateNodes( ctx, fmt.Sprintf("enqueue r%d in queue %s", req.RangeID, req.Queue), + noTimeout, dialFn, nodeFn, responseFn, errorFn, ) }); err != nil { diff --git a/pkg/server/api_v2_ranges.go b/pkg/server/api_v2_ranges.go index af737c591fc2..fc952ed4a06b 100644 --- a/pkg/server/api_v2_ranges.go +++ b/pkg/server/api_v2_ranges.go @@ -247,7 +247,11 @@ func (a *apiV2Server) listRange(w http.ResponseWriter, r *http.Request) { } if err := a.status.iterateNodes( - ctx, fmt.Sprintf("details about range %d", rangeID), dialFn, nodeFn, responseFn, errorFn, + ctx, + fmt.Sprintf("details about range %d", rangeID), + noTimeout, + dialFn, 
nodeFn, + responseFn, errorFn, ); err != nil { srverrors.APIV2InternalError(ctx, err, w) return diff --git a/pkg/server/index_usage_stats.go b/pkg/server/index_usage_stats.go index 81eeb28dc21c..6bf8bef46d75 100644 --- a/pkg/server/index_usage_stats.go +++ b/pkg/server/index_usage_stats.go @@ -100,6 +100,7 @@ func (s *statusServer) IndexUsageStatistics( // yields an incorrect result. if err := s.iterateNodes(ctx, "requesting index usage stats", + noTimeout, dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil { return nil, err } @@ -196,6 +197,7 @@ func (s *statusServer) ResetIndexUsageStats( if err := s.iterateNodes(ctx, "Resetting index usage stats", + noTimeout, dialFn, resetIndexUsageStats, aggFn, errFn); err != nil { return nil, err } diff --git a/pkg/server/key_visualizer_server.go b/pkg/server/key_visualizer_server.go index 7e8b0144cb8d..9e1c48e035dc 100644 --- a/pkg/server/key_visualizer_server.go +++ b/pkg/server/key_visualizer_server.go @@ -110,8 +110,13 @@ func (s *KeyVisualizerServer) getSamplesFromFanOut( } err := s.status.iterateNodes(ctx, - "iterating nodes for key visualizer samples", dialFn, nodeFn, - responseFn, errorFn) + "iterating nodes for key visualizer samples", + noTimeout, + dialFn, + nodeFn, + responseFn, + errorFn, + ) if err != nil { return nil, err } diff --git a/pkg/server/server.go b/pkg/server/server.go index 7b2ce8aef9e3..31a465e4998e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -976,6 +976,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf } // Instantiate the status API server. 
+ var serverTestingKnobs *TestingKnobs + if cfg.TestingKnobs.Server != nil { + serverTestingKnobs = cfg.TestingKnobs.Server.(*TestingKnobs) + } + sStatus := newSystemStatusServer( cfg.AmbientCtx, st, @@ -998,6 +1003,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf clock, rangestats.NewFetcher(db), node, + serverTestingKnobs, ) keyVisualizerServer := &KeyVisualizerServer{ diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go index 0d149c9bf437..04810101bfaa 100644 --- a/pkg/server/span_stats_server.go +++ b/pkg/server/span_stats_server.go @@ -37,8 +37,6 @@ func (s *systemStatusServer) spanStatsFanOut( res := &roachpb.SpanStatsResponse{ SpanToStats: make(map[string]*roachpb.SpanStats), } - // Response level error - var respErr error spansPerNode, err := s.getSpansPerNode(ctx, req) if err != nil { @@ -92,12 +90,25 @@ func (s *systemStatusServer) spanStatsFanOut( errorFn := func(nodeID roachpb.NodeID, err error) { log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err) - respErr = err } + if s.knobs != nil { + if s.knobs.IterateNodesDialFn != nil { + smartDial = s.knobs.IterateNodesDialFn + } + if s.knobs.IterateNodesNodeFn != nil { + nodeFn = s.knobs.IterateNodesNodeFn + } + if s.knobs.IterateNodesErrorFn != nil { + errorFn = s.knobs.IterateNodesErrorFn + } + } + + timeout := roachpb.SpanStatsNodeTimeout.Get(&s.st.SV) if err := s.statusServer.iterateNodes( ctx, "iterating nodes for span stats", + timeout, smartDial, nodeFn, responseFn, @@ -106,7 +117,8 @@ func (s *systemStatusServer) spanStatsFanOut( return nil, err } - return res, respErr + // Per-node errors are handled by errorFn and never fail the request. + return res, nil } func (s *systemStatusServer) getLocalStats( diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go index db561a3e6930..6f675a82aa47 100644 --- a/pkg/server/span_stats_test.go +++ b/pkg/server/span_stats_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -191,6 +193,63 @@ func TestSpanStatsFanOut(t *testing.T) { } +func TestSpanStatsFanOutFaultTolerance(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + const numNodes = 5 + + mu := syncutil.Mutex{} + nodeErrors := make(map[roachpb.NodeID]error) + + serverArgs := base.TestServerArgs{} + serverArgs.Knobs.Server = &server.TestingKnobs{ + IterateNodesDialFn: func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) { + if nodeID == 2 { + return nil, errors.Newf("error dialing node %d", nodeID) + } + return nil, nil + }, + IterateNodesNodeFn: func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) { + // On the 3rd node, simulate some sort of KV error. + if nodeID == 3 { + return nil, errors.Newf("error getting span stats from node %d", nodeID) + } + + // On the 4th node, simulate a request that takes a very long time. + // In this case, nodeFn will block until the timeout is reached. 
+ if nodeID == 4 { + <-ctx.Done() + // Return an error that mimics the error returned + // when a rpc's context is cancelled: + return nil, errors.New("node 4 timed out") + } + return &roachpb.SpanStatsResponse{}, nil + }, + IterateNodesErrorFn: func(nodeID roachpb.NodeID, err error) { + mu.Lock() + defer mu.Unlock() + nodeErrors[nodeID] = err + }, + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: serverArgs}) + defer tc.Stopper().Stop(ctx) + + sqlDB := tc.Server(0).SQLConn(t, "defaultdb") + _, err := sqlDB.Exec("SET CLUSTER SETTING server.span_stats.node.timeout = '3s'") + require.NoError(t, err) + + _, err = tc.GetStatusClient(t, 0).SpanStats(ctx, &roachpb.SpanStatsRequest{ + NodeID: "0", + Spans: []roachpb.Span{}, + }) + require.NoError(t, err) + require.ErrorContains(t, nodeErrors[2], "error dialing node 2") + require.ErrorContains(t, nodeErrors[3], "error getting span stats from node 3") + require.ErrorContains(t, nodeErrors[4], "node 4 timed out") +} + // BenchmarkSpanStats measures the cost of collecting span statistics. 
func BenchmarkSpanStats(b *testing.B) { skip.UnderShort(b) diff --git a/pkg/server/sql_stats.go b/pkg/server/sql_stats.go index ac8404b8eb02..304ea26d3c0c 100644 --- a/pkg/server/sql_stats.go +++ b/pkg/server/sql_stats.go @@ -79,6 +79,7 @@ func (s *statusServer) ResetSQLStats( var fanoutError error if err := s.iterateNodes(ctx, "reset SQL statistics", + noTimeout, dialFn, resetSQLStats, func(nodeID roachpb.NodeID, resp interface{}) { diff --git a/pkg/server/statements.go b/pkg/server/statements.go index 35c0d9b48206..dd07a4b3851f 100644 --- a/pkg/server/statements.go +++ b/pkg/server/statements.go @@ -85,6 +85,7 @@ func (s *statusServer) Statements( } if err := s.iterateNodes(ctx, "statement statistics", + noTimeout, dialFn, nodeStatement, func(nodeID roachpb.NodeID, resp interface{}) { diff --git a/pkg/server/status.go b/pkg/server/status.go index d0820d5dd4ea..087117ee73d2 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -495,6 +495,7 @@ type systemStatusServer struct { spanConfigReporter spanconfig.Reporter rangeStatsFetcher *rangestats.Fetcher node *Node + knobs *TestingKnobs } // StmtDiagnosticsRequester is the interface into *stmtdiagnostics.Registry @@ -613,6 +614,7 @@ func newSystemStatusServer( clock *hlc.Clock, rangeStatsFetcher *rangestats.Fetcher, node *Node, + knobs *TestingKnobs, ) *systemStatusServer { server := newStatusServer( ambient, @@ -640,6 +642,7 @@ func newSystemStatusServer( spanConfigReporter: spanConfigReporter, rangeStatsFetcher: rangeStatsFetcher, node: node, + knobs: knobs, } } @@ -1630,7 +1633,9 @@ func (s *statusServer) fetchProfileFromAllNodes( errorFn := func(nodeID roachpb.NodeID, err error) { response.profDataByNodeID[nodeID] = &profData{err: err} } - if err := s.iterateNodes(ctx, opName, dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes( + ctx, opName, noTimeout, dialFn, nodeFn, responseFn, errorFn, + ); err != nil { return nil, srverrors.ServerError(ctx, err) } var data []byte @@ 
-2048,7 +2053,13 @@ func (s *systemStatusServer) NetworkConnectivity( response.ErrorsByNodeID[nodeID] = err.Error() } - if err := s.iterateNodes(ctx, "network connectivity", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, "network connectivity", + noTimeout, + dialFn, + nodeFn, + responseFn, + errorFn, + ); err != nil { return nil, srverrors.ServerError(ctx, err) } @@ -2632,7 +2643,13 @@ func (s *systemStatusServer) HotRanges( } } - if err := s.iterateNodes(ctx, "hot ranges", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, "hot ranges", + noTimeout, + dialFn, + nodeFn, + responseFn, + errorFn, + ); err != nil { return nil, srverrors.ServerError(ctx, err) } @@ -2985,7 +3002,9 @@ func (s *statusServer) Range( } if err := s.iterateNodes( - ctx, fmt.Sprintf("details about range %d", req.RangeId), dialFn, nodeFn, responseFn, errorFn, + ctx, fmt.Sprintf("details about range %d", req.RangeId), noTimeout, + dialFn, + nodeFn, responseFn, errorFn, ); err != nil { return nil, srverrors.ServerError(ctx, err) } @@ -3014,6 +3033,7 @@ func (s *statusServer) ListLocalSessions( func (s *statusServer) iterateNodes( ctx context.Context, errorCtx string, + nodeFnTimeout time.Duration, dialFn func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error), nodeFn func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error), responseFn func(nodeID roachpb.NodeID, resp interface{}), @@ -3048,7 +3068,18 @@ func (s *statusServer) iterateNodes( return } - res, err := nodeFn(ctx, client, nodeID) + var res interface{} + if nodeFnTimeout == noTimeout { + res, err = nodeFn(ctx, client, nodeID) + } else { + err = timeutil.RunWithTimeout(ctx, "iterate-nodes-fn", + nodeFnTimeout, func(ctx context.Context) error { + var _err error + res, _err = nodeFn(ctx, client, nodeID) + return _err + }) + } + if err != nil { err = errors.Wrapf(err, "error requesting %s from node %d (%s)", errorCtx, nodeID, 
nodeStatuses[serverID(nodeID)]) @@ -3111,7 +3142,9 @@ func (s *statusServer) paginatedIterateNodes( errorFn func(nodeID roachpb.NodeID, nodeFnError error), ) (next paginationState, err error) { if limit == 0 { - return paginationState{}, s.iterateNodes(ctx, errorCtx, dialFn, nodeFn, responseFn, errorFn) + return paginationState{}, s.iterateNodes(ctx, errorCtx, noTimeout, + dialFn, + nodeFn, responseFn, errorFn) } nodeStatuses, err := s.serverIterator.getAllNodes(ctx) if err != nil { @@ -3457,7 +3490,9 @@ func (s *statusServer) ListContentionEvents( response.Errors = append(response.Errors, errResponse) } - if err := s.iterateNodes(ctx, "contention events list", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, "contention events list", noTimeout, + dialFn, nodeFn, + responseFn, errorFn); err != nil { return nil, srverrors.ServerError(ctx, err) } return &response, nil @@ -3504,7 +3539,9 @@ func (s *statusServer) ListDistSQLFlows( response.Errors = append(response.Errors, errResponse) } - if err := s.iterateNodes(ctx, "distsql flows list", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, "distsql flows list", noTimeout, dialFn, + nodeFn, + responseFn, errorFn); err != nil { return nil, srverrors.ServerError(ctx, err) } return &response, nil @@ -3564,7 +3601,9 @@ func (s *statusServer) ListExecutionInsights( response.Errors = append(response.Errors, errors.EncodeError(ctx, err)) } - if err := s.iterateNodes(ctx, "execution insights list", dialFn, nodeFn, responseFn, errorFn); err != nil { + if err := s.iterateNodes(ctx, "execution insights list", noTimeout, + dialFn, nodeFn, + responseFn, errorFn); err != nil { return nil, srverrors.ServerError(ctx, err) } return &response, nil @@ -3924,6 +3963,7 @@ func (s *statusServer) TransactionContentionEvents( } if err := s.iterateNodes(ctx, "txn contention events for node", + noTimeout, dialFn, rpcCallFn, func(nodeID roachpb.NodeID, nodeResp interface{}) { diff 
--git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index ccb3fbd9a935..46c0b8a07185 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -11,6 +11,7 @@ package server import ( + "context" "net" "time" @@ -147,6 +148,19 @@ type TestingKnobs struct { // system.tenants table. This is useful for tests that want to verify that // the tenant connector can't start when the record doesn't exist. ShutdownTenantConnectorEarlyIfNoRecordPresent bool + + // IterateNodesDialFn is used to mock the node dialing behavior in a cluster + // fan-out. It is invoked by server.iterateNodes. + IterateNodesDialFn func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) + + // IterateNodesNodeFn is used to mock the behavior of the rpc invoked on + // a remote node in a cluster fan-out. It is invoked by server.iterateNodes. + IterateNodesNodeFn func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) + + // IterateNodesErrorFn is used to mock the behavior of the error handling + // logic if either the fan-out's dialFn or nodeFn return an error. + // It is invoked by server.iterateNodes. + IterateNodesErrorFn func(nodeID roachpb.NodeID, err error) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.