diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index eb954afb7d2b..ebaaf530244b 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -84,6 +84,7 @@ server.shutdown.connection_wait duration 0s the maximum amount of time a server
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
+server.span_stats.node.timeout duration 1m0s the duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeout tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled boolean true if server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcrypt tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index ecc6ef2f0611..2eb6c1691352 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -116,6 +116,7 @@
server.shutdown.jobs_wait
| duration | 10s | the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown | Serverless/Dedicated/Self-Hosted |
server.shutdown.lease_transfer_wait
| duration | 5s | the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) | Dedicated/Self-Hosted |
server.shutdown.query_wait
| duration | 10s | the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) | Serverless/Dedicated/Self-Hosted |
+server.span_stats.node.timeout
| duration | 1m0s | the duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeout | Serverless/Dedicated/Self-Hosted |
server.time_until_store_dead
| duration | 5m0s | the time after which if there is no new gossiped information about a store, it is considered dead | Serverless/Dedicated/Self-Hosted |
server.user_login.cert_password_method.auto_scram_promotion.enabled
| boolean | true | whether to automatically promote cert-password authentication to use SCRAM | Serverless/Dedicated/Self-Hosted |
server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled
| boolean | true | if server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcrypt | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/roachpb/span_stats.go b/pkg/roachpb/span_stats.go
index d750c23ef1a4..43718fb7fbea 100644
--- a/pkg/roachpb/span_stats.go
+++ b/pkg/roachpb/span_stats.go
@@ -12,6 +12,7 @@ package roachpb
import (
"fmt"
+ "time"
"github.com/cockroachdb/cockroach/pkg/settings"
)
@@ -30,6 +31,15 @@ var SpanStatsBatchLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)
+var SpanStatsNodeTimeout = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "server.span_stats.node.timeout",
+ "the duration allowed for a single node to return span stats data before"+
+ " the request is cancelled; if set to 0, there is no timeout",
+ time.Minute,
+ settings.NonNegativeDuration,
+).WithPublic()
+
const defaultRangeStatsBatchLimit = 100
// RangeStatsBatchLimit registers the maximum number of ranges to be batched
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index c126a3adfb6c..0c7e4239ee44 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -3201,6 +3201,7 @@ func (s *systemAdminServer) EnqueueRange(
if err := timeutil.RunWithTimeout(ctx, "enqueue range", time.Minute, func(ctx context.Context) error {
return s.server.status.iterateNodes(
ctx, fmt.Sprintf("enqueue r%d in queue %s", req.RangeID, req.Queue),
+ noTimeout,
dialFn, nodeFn, responseFn, errorFn,
)
}); err != nil {
diff --git a/pkg/server/api_v2_ranges.go b/pkg/server/api_v2_ranges.go
index af737c591fc2..fc952ed4a06b 100644
--- a/pkg/server/api_v2_ranges.go
+++ b/pkg/server/api_v2_ranges.go
@@ -247,7 +247,11 @@ func (a *apiV2Server) listRange(w http.ResponseWriter, r *http.Request) {
}
if err := a.status.iterateNodes(
- ctx, fmt.Sprintf("details about range %d", rangeID), dialFn, nodeFn, responseFn, errorFn,
+ ctx,
+ fmt.Sprintf("details about range %d", rangeID),
+ noTimeout,
+ dialFn, nodeFn,
+ responseFn, errorFn,
); err != nil {
srverrors.APIV2InternalError(ctx, err, w)
return
diff --git a/pkg/server/index_usage_stats.go b/pkg/server/index_usage_stats.go
index 81eeb28dc21c..6bf8bef46d75 100644
--- a/pkg/server/index_usage_stats.go
+++ b/pkg/server/index_usage_stats.go
@@ -100,6 +100,7 @@ func (s *statusServer) IndexUsageStatistics(
// yields an incorrect result.
if err := s.iterateNodes(ctx,
"requesting index usage stats",
+ noTimeout,
dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
@@ -196,6 +197,7 @@ func (s *statusServer) ResetIndexUsageStats(
if err := s.iterateNodes(ctx,
"Resetting index usage stats",
+ noTimeout,
dialFn, resetIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
diff --git a/pkg/server/key_visualizer_server.go b/pkg/server/key_visualizer_server.go
index 7e8b0144cb8d..9e1c48e035dc 100644
--- a/pkg/server/key_visualizer_server.go
+++ b/pkg/server/key_visualizer_server.go
@@ -110,8 +110,13 @@ func (s *KeyVisualizerServer) getSamplesFromFanOut(
}
err := s.status.iterateNodes(ctx,
- "iterating nodes for key visualizer samples", dialFn, nodeFn,
- responseFn, errorFn)
+ "iterating nodes for key visualizer samples",
+ noTimeout,
+ dialFn,
+ nodeFn,
+ responseFn,
+ errorFn,
+ )
if err != nil {
return nil, err
}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 7b2ce8aef9e3..31a465e4998e 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -976,6 +976,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
}
// Instantiate the status API server.
+ var serverTestingKnobs *TestingKnobs
+ if cfg.TestingKnobs.Server != nil {
+ serverTestingKnobs = cfg.TestingKnobs.Server.(*TestingKnobs)
+ }
+
sStatus := newSystemStatusServer(
cfg.AmbientCtx,
st,
@@ -998,6 +1003,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
clock,
rangestats.NewFetcher(db),
node,
+ serverTestingKnobs,
)
keyVisualizerServer := &KeyVisualizerServer{
diff --git a/pkg/server/span_stats_server.go b/pkg/server/span_stats_server.go
index 0d149c9bf437..04810101bfaa 100644
--- a/pkg/server/span_stats_server.go
+++ b/pkg/server/span_stats_server.go
@@ -37,8 +37,6 @@ func (s *systemStatusServer) spanStatsFanOut(
res := &roachpb.SpanStatsResponse{
SpanToStats: make(map[string]*roachpb.SpanStats),
}
- // Response level error
- var respErr error
spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
@@ -92,12 +90,25 @@ func (s *systemStatusServer) spanStatsFanOut(
errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
- respErr = err
}
+ if s.knobs != nil {
+ if s.knobs.IterateNodesDialFn != nil {
+ smartDial = s.knobs.IterateNodesDialFn
+ }
+ if s.knobs.IterateNodesNodeFn != nil {
+ nodeFn = s.knobs.IterateNodesNodeFn
+ }
+ if s.knobs.IterateNodesErrorFn != nil {
+ errorFn = s.knobs.IterateNodesErrorFn
+ }
+ }
+
+ timeout := roachpb.SpanStatsNodeTimeout.Get(&s.st.SV)
if err := s.statusServer.iterateNodes(
ctx,
"iterating nodes for span stats",
+ timeout,
smartDial,
nodeFn,
responseFn,
@@ -106,7 +117,8 @@ func (s *systemStatusServer) spanStatsFanOut(
return nil, err
}
- return res, respErr
+ // Return a map of errors from all nodes
+ return res, nil
}
func (s *systemStatusServer) getLocalStats(
diff --git a/pkg/server/span_stats_test.go b/pkg/server/span_stats_test.go
index db561a3e6930..6f675a82aa47 100644
--- a/pkg/server/span_stats_test.go
+++ b/pkg/server/span_stats_test.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -191,6 +193,63 @@ func TestSpanStatsFanOut(t *testing.T) {
}
+func TestSpanStatsFanOutFaultTolerance(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ const numNodes = 5
+
+ mu := syncutil.Mutex{}
+ nodeErrors := make(map[roachpb.NodeID]error)
+
+ serverArgs := base.TestServerArgs{}
+ serverArgs.Knobs.Server = &server.TestingKnobs{
+ IterateNodesDialFn: func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) {
+ if nodeID == 2 {
+ return nil, errors.Newf("error dialing node %d", nodeID)
+ }
+ return nil, nil
+ },
+ IterateNodesNodeFn: func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
+ // On the 3rd node, simulate some sort of KV error.
+ if nodeID == 3 {
+ return nil, errors.Newf("error getting span stats from node %d", nodeID)
+ }
+
+ // On the 4th node, simulate a request that takes a very long time.
+ // In this case, nodeFn will block until the timeout is reached.
+ if nodeID == 4 {
+ <-ctx.Done()
+ // Return an error that mimics the error returned
+ // when a rpc's context is cancelled:
+ return nil, errors.New("node 4 timed out")
+ }
+ return &roachpb.SpanStatsResponse{}, nil
+ },
+ IterateNodesErrorFn: func(nodeID roachpb.NodeID, err error) {
+ mu.Lock()
+ defer mu.Unlock()
+ nodeErrors[nodeID] = err
+ },
+ }
+
+ tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: serverArgs})
+ defer tc.Stopper().Stop(ctx)
+
+ sqlDB := tc.Server(0).SQLConn(t, "defaultdb")
+ _, err := sqlDB.Exec("SET CLUSTER SETTING server.span_stats.node.timeout = '3s'")
+ require.NoError(t, err)
+
+ _, err = tc.GetStatusClient(t, 0).SpanStats(ctx, &roachpb.SpanStatsRequest{
+ NodeID: "0",
+ Spans: []roachpb.Span{},
+ })
+ require.NoError(t, err)
+ require.ErrorContains(t, nodeErrors[2], "error dialing node 2")
+ require.ErrorContains(t, nodeErrors[3], "error getting span stats from node 3")
+ require.ErrorContains(t, nodeErrors[4], "node 4 timed out")
+}
+
// BenchmarkSpanStats measures the cost of collecting span statistics.
func BenchmarkSpanStats(b *testing.B) {
skip.UnderShort(b)
diff --git a/pkg/server/sql_stats.go b/pkg/server/sql_stats.go
index ac8404b8eb02..304ea26d3c0c 100644
--- a/pkg/server/sql_stats.go
+++ b/pkg/server/sql_stats.go
@@ -79,6 +79,7 @@ func (s *statusServer) ResetSQLStats(
var fanoutError error
if err := s.iterateNodes(ctx, "reset SQL statistics",
+ noTimeout,
dialFn,
resetSQLStats,
func(nodeID roachpb.NodeID, resp interface{}) {
diff --git a/pkg/server/statements.go b/pkg/server/statements.go
index 35c0d9b48206..dd07a4b3851f 100644
--- a/pkg/server/statements.go
+++ b/pkg/server/statements.go
@@ -85,6 +85,7 @@ func (s *statusServer) Statements(
}
if err := s.iterateNodes(ctx, "statement statistics",
+ noTimeout,
dialFn,
nodeStatement,
func(nodeID roachpb.NodeID, resp interface{}) {
diff --git a/pkg/server/status.go b/pkg/server/status.go
index d0820d5dd4ea..087117ee73d2 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -495,6 +495,7 @@ type systemStatusServer struct {
spanConfigReporter spanconfig.Reporter
rangeStatsFetcher *rangestats.Fetcher
node *Node
+ knobs *TestingKnobs
}
// StmtDiagnosticsRequester is the interface into *stmtdiagnostics.Registry
@@ -613,6 +614,7 @@ func newSystemStatusServer(
clock *hlc.Clock,
rangeStatsFetcher *rangestats.Fetcher,
node *Node,
+ knobs *TestingKnobs,
) *systemStatusServer {
server := newStatusServer(
ambient,
@@ -640,6 +642,7 @@ func newSystemStatusServer(
spanConfigReporter: spanConfigReporter,
rangeStatsFetcher: rangeStatsFetcher,
node: node,
+ knobs: knobs,
}
}
@@ -1630,7 +1633,9 @@ func (s *statusServer) fetchProfileFromAllNodes(
errorFn := func(nodeID roachpb.NodeID, err error) {
response.profDataByNodeID[nodeID] = &profData{err: err}
}
- if err := s.iterateNodes(ctx, opName, dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(
+ ctx, opName, noTimeout, dialFn, nodeFn, responseFn, errorFn,
+ ); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
var data []byte
@@ -2048,7 +2053,13 @@ func (s *systemStatusServer) NetworkConnectivity(
response.ErrorsByNodeID[nodeID] = err.Error()
}
- if err := s.iterateNodes(ctx, "network connectivity", dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(ctx, "network connectivity",
+ noTimeout,
+ dialFn,
+ nodeFn,
+ responseFn,
+ errorFn,
+ ); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
@@ -2632,7 +2643,13 @@ func (s *systemStatusServer) HotRanges(
}
}
- if err := s.iterateNodes(ctx, "hot ranges", dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(ctx, "hot ranges",
+ noTimeout,
+ dialFn,
+ nodeFn,
+ responseFn,
+ errorFn,
+ ); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
@@ -2985,7 +3002,9 @@ func (s *statusServer) Range(
}
if err := s.iterateNodes(
- ctx, fmt.Sprintf("details about range %d", req.RangeId), dialFn, nodeFn, responseFn, errorFn,
+ ctx, fmt.Sprintf("details about range %d", req.RangeId), noTimeout,
+ dialFn,
+ nodeFn, responseFn, errorFn,
); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
@@ -3014,6 +3033,7 @@ func (s *statusServer) ListLocalSessions(
func (s *statusServer) iterateNodes(
ctx context.Context,
errorCtx string,
+ nodeFnTimeout time.Duration,
dialFn func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error),
nodeFn func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error),
responseFn func(nodeID roachpb.NodeID, resp interface{}),
@@ -3048,7 +3068,18 @@ func (s *statusServer) iterateNodes(
return
}
- res, err := nodeFn(ctx, client, nodeID)
+ var res interface{}
+ if nodeFnTimeout == noTimeout {
+ res, err = nodeFn(ctx, client, nodeID)
+ } else {
+ err = timeutil.RunWithTimeout(ctx, "iterate-nodes-fn",
+ nodeFnTimeout, func(ctx context.Context) error {
+ var _err error
+ res, _err = nodeFn(ctx, client, nodeID)
+ return _err
+ })
+ }
+
if err != nil {
err = errors.Wrapf(err, "error requesting %s from node %d (%s)",
errorCtx, nodeID, nodeStatuses[serverID(nodeID)])
@@ -3111,7 +3142,9 @@ func (s *statusServer) paginatedIterateNodes(
errorFn func(nodeID roachpb.NodeID, nodeFnError error),
) (next paginationState, err error) {
if limit == 0 {
- return paginationState{}, s.iterateNodes(ctx, errorCtx, dialFn, nodeFn, responseFn, errorFn)
+ return paginationState{}, s.iterateNodes(ctx, errorCtx, noTimeout,
+ dialFn,
+ nodeFn, responseFn, errorFn)
}
nodeStatuses, err := s.serverIterator.getAllNodes(ctx)
if err != nil {
@@ -3457,7 +3490,9 @@ func (s *statusServer) ListContentionEvents(
response.Errors = append(response.Errors, errResponse)
}
- if err := s.iterateNodes(ctx, "contention events list", dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(ctx, "contention events list", noTimeout,
+ dialFn, nodeFn,
+ responseFn, errorFn); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
return &response, nil
@@ -3504,7 +3539,9 @@ func (s *statusServer) ListDistSQLFlows(
response.Errors = append(response.Errors, errResponse)
}
- if err := s.iterateNodes(ctx, "distsql flows list", dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(ctx, "distsql flows list", noTimeout, dialFn,
+ nodeFn,
+ responseFn, errorFn); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
return &response, nil
@@ -3564,7 +3601,9 @@ func (s *statusServer) ListExecutionInsights(
response.Errors = append(response.Errors, errors.EncodeError(ctx, err))
}
- if err := s.iterateNodes(ctx, "execution insights list", dialFn, nodeFn, responseFn, errorFn); err != nil {
+ if err := s.iterateNodes(ctx, "execution insights list", noTimeout,
+ dialFn, nodeFn,
+ responseFn, errorFn); err != nil {
return nil, srverrors.ServerError(ctx, err)
}
return &response, nil
@@ -3924,6 +3963,7 @@ func (s *statusServer) TransactionContentionEvents(
}
if err := s.iterateNodes(ctx, "txn contention events for node",
+ noTimeout,
dialFn,
rpcCallFn,
func(nodeID roachpb.NodeID, nodeResp interface{}) {
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index ccb3fbd9a935..46c0b8a07185 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -11,6 +11,7 @@
package server
import (
+ "context"
"net"
"time"
@@ -147,6 +148,19 @@ type TestingKnobs struct {
// system.tenants table. This is useful for tests that want to verify that
// the tenant connector can't start when the record doesn't exist.
ShutdownTenantConnectorEarlyIfNoRecordPresent bool
+
+ // IterateNodesDialFn is used to mock the node dialing behavior in a cluster
+ // fan-out. It is invoked by server.iterateNodes.
+ IterateNodesDialFn func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error)
+
+ // IterateNodesNodeFn is used to mock the behavior of the rpc invoked on
+ // a remote node in a cluster fan-out. It is invoked by server.iterateNodes.
+ IterateNodesNodeFn func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error)
+
+ // IterateNodesErrorFn is used to mock the behavior of the error handling
+ // logic if either the fan-out's dialFn or nodeFn return an error.
+ // It is invoked by server.iterateNodes.
+ IterateNodesErrorFn func(nodeID roachpb.NodeID, err error)
}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.