Skip to content

Commit

Permalink
server: make the span stats fan-out more fault tolerant
Browse files Browse the repository at this point in the history
This commit adds improved fault tolerance to the span stats fan-out:

1. Errors encountered during the fan-out will not invalidate the
entire request. Now, a span stats fan-out will always return a
roachpb.SpanStatsResponse that has been updated by values from nodes that
service their requests without error.

Errors that are encountered are logged. Errors may be due to a connection
error, or due to a KV-related error while a node is servicing a request.

2. Nodes must service requests within the timeout passed to `iterateNodes`.
For span stats, the value comes from a new cluster setting:
'server.span_stats.node.timeout', with a default value of 1 minute.

Resolves cockroachdb#106097
Epic: none
Release note (ops change): Added a new cluster setting,
'server.span_stats.node.timeout' to control the maximum duration that
a node is allowed to spend servicing a span stats request. A value of
'0' disables the timeout.
  • Loading branch information
zachlite authored and Zach Lite committed Aug 9, 2023
1 parent 4c34b48 commit bc57125
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ server.shutdown.connection_wait duration 0s the maximum amount of time a server
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.span_stats.node.timeout duration 1m0s the duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeout tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled boolean true if server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcrypt tenant-rw
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-span-stats-node-timeout" class="anchored"><code>server.span_stats.node.timeout</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the duration allowed for a single node to return span stats data before the request is cancelled; if set to 0, there is no timeout</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-user-login-cert-password-method-auto-scram-promotion-enabled" class="anchored"><code>server.user_login.cert_password_method.auto_scram_promotion.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>whether to automatically promote cert-password authentication to use SCRAM</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-user-login-downgrade-scram-stored-passwords-to-bcrypt-enabled" class="anchored"><code>server.user_login.downgrade_scram_stored_passwords_to_bcrypt.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if server.user_login.password_encryption=crdb-bcrypt, this controls whether to automatically re-encode stored passwords using scram-sha-256 to crdb-bcrypt</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package roachpb

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)
Expand All @@ -30,6 +31,15 @@ var SpanStatsBatchLimit = settings.RegisterIntSetting(
settings.PositiveInt,
)

// SpanStatsNodeTimeout registers the public, tenant-writable cluster setting
// "server.span_stats.node.timeout": the duration a single node is allowed to
// take to return span stats data before its request is cancelled. The default
// is one minute; negative values are rejected and 0 means no timeout.
var SpanStatsNodeTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"server.span_stats.node.timeout",
"the duration allowed for a single node to return span stats data before"+
" the request is cancelled; if set to 0, there is no timeout",
time.Minute,
settings.NonNegativeDuration,
).WithPublic()

const defaultRangeStatsBatchLimit = 100

// RangeStatsBatchLimit registers the maximum number of ranges to be batched
Expand Down
1 change: 1 addition & 0 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3201,6 +3201,7 @@ func (s *systemAdminServer) EnqueueRange(
if err := timeutil.RunWithTimeout(ctx, "enqueue range", time.Minute, func(ctx context.Context) error {
return s.server.status.iterateNodes(
ctx, fmt.Sprintf("enqueue r%d in queue %s", req.RangeID, req.Queue),
noTimeout,
dialFn, nodeFn, responseFn, errorFn,
)
}); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/api_v2_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,11 @@ func (a *apiV2Server) listRange(w http.ResponseWriter, r *http.Request) {
}

if err := a.status.iterateNodes(
ctx, fmt.Sprintf("details about range %d", rangeID), dialFn, nodeFn, responseFn, errorFn,
ctx,
fmt.Sprintf("details about range %d", rangeID),
noTimeout,
dialFn, nodeFn,
responseFn, errorFn,
); err != nil {
srverrors.APIV2InternalError(ctx, err, w)
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/index_usage_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (s *statusServer) IndexUsageStatistics(
// yields an incorrect result.
if err := s.iterateNodes(ctx,
"requesting index usage stats",
noTimeout,
dialFn, fetchIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down Expand Up @@ -196,6 +197,7 @@ func (s *statusServer) ResetIndexUsageStats(

if err := s.iterateNodes(ctx,
"Resetting index usage stats",
noTimeout,
dialFn, resetIndexUsageStats, aggFn, errFn); err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/key_visualizer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,13 @@ func (s *KeyVisualizerServer) getSamplesFromFanOut(
}

err := s.status.iterateNodes(ctx,
"iterating nodes for key visualizer samples", dialFn, nodeFn,
responseFn, errorFn)
"iterating nodes for key visualizer samples",
noTimeout,
dialFn,
nodeFn,
responseFn,
errorFn,
)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
}

// Instantiate the status API server.
var serverTestingKnobs *TestingKnobs
if cfg.TestingKnobs.Server != nil {
serverTestingKnobs = cfg.TestingKnobs.Server.(*TestingKnobs)
}

sStatus := newSystemStatusServer(
cfg.AmbientCtx,
st,
Expand All @@ -998,6 +1003,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
clock,
rangestats.NewFetcher(db),
node,
serverTestingKnobs,
)

keyVisualizerServer := &KeyVisualizerServer{
Expand Down
20 changes: 16 additions & 4 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func (s *systemStatusServer) spanStatsFanOut(
res := &roachpb.SpanStatsResponse{
SpanToStats: make(map[string]*roachpb.SpanStats),
}
// Response level error
var respErr error

spansPerNode, err := s.getSpansPerNode(ctx, req)
if err != nil {
Expand Down Expand Up @@ -92,12 +90,25 @@ func (s *systemStatusServer) spanStatsFanOut(

errorFn := func(nodeID roachpb.NodeID, err error) {
log.Errorf(ctx, nodeErrorMsgPlaceholder, nodeID, err)
respErr = err
}

if s.knobs != nil {
if s.knobs.IterateNodesDialFn != nil {
smartDial = s.knobs.IterateNodesDialFn
}
if s.knobs.IterateNodesNodeFn != nil {
nodeFn = s.knobs.IterateNodesNodeFn
}
if s.knobs.IterateNodesErrorFn != nil {
errorFn = s.knobs.IterateNodesErrorFn
}
}

timeout := roachpb.SpanStatsNodeTimeout.Get(&s.st.SV)
if err := s.statusServer.iterateNodes(
ctx,
"iterating nodes for span stats",
timeout,
smartDial,
nodeFn,
responseFn,
Expand All @@ -106,7 +117,8 @@ func (s *systemStatusServer) spanStatsFanOut(
return nil, err
}

return res, respErr
// Errors from individual nodes are logged rather than returned; res
// reflects only the nodes that serviced their requests without error.
return res, nil
}

func (s *systemStatusServer) getLocalStats(
Expand Down
59 changes: 59 additions & 0 deletions pkg/server/span_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -191,6 +193,63 @@ func TestSpanStatsFanOut(t *testing.T) {

}

// TestSpanStatsFanOutFaultTolerance verifies that a span stats fan-out still
// succeeds when individual nodes fail, and that each per-node failure is
// handed to the fan-out's error callback. Three failure modes are injected
// through the server testing knobs: a dial error (node 2), an error while
// servicing the request (node 3), and a request that blocks until its
// context is done (node 4).
func TestSpanStatsFanOutFaultTolerance(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()
	const numNodes = 5

	// mu guards nodeErrors: the error knob below is invoked by the server
	// while the test goroutine later reads the recorded errors.
	var mu syncutil.Mutex
	nodeErrors := make(map[roachpb.NodeID]error)

	// nodeErr returns the error recorded for the given node, or nil if none
	// was recorded. It takes the same lock as the writer so that reads and
	// writes of nodeErrors are always synchronized.
	nodeErr := func(nodeID roachpb.NodeID) error {
		mu.Lock()
		defer mu.Unlock()
		return nodeErrors[nodeID]
	}

	serverArgs := base.TestServerArgs{}
	serverArgs.Knobs.Server = &server.TestingKnobs{
		IterateNodesDialFn: func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) {
			// On the 2nd node, simulate a connection error.
			if nodeID == 2 {
				return nil, errors.Newf("error dialing node %d", nodeID)
			}
			return nil, nil
		},
		IterateNodesNodeFn: func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
			// On the 3rd node, simulate some sort of KV error.
			if nodeID == 3 {
				return nil, errors.Newf("error getting span stats from node %d", nodeID)
			}

			// On the 4th node, simulate a request that takes a very long time.
			// In this case, nodeFn will block until the timeout is reached.
			if nodeID == 4 {
				<-ctx.Done()
				// Return an error standing in for the one an RPC returns
				// when its context is cancelled.
				return nil, errors.New("node 4 timed out")
			}
			return &roachpb.SpanStatsResponse{}, nil
		},
		IterateNodesErrorFn: func(nodeID roachpb.NodeID, err error) {
			mu.Lock()
			defer mu.Unlock()
			nodeErrors[nodeID] = err
		},
	}

	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: serverArgs})
	defer tc.Stopper().Stop(ctx)

	// Shorten the per-node timeout so the blocked request on node 4 is
	// cancelled quickly instead of after the one-minute default.
	sqlDB := tc.Server(0).SQLConn(t, "defaultdb")
	_, err := sqlDB.Exec("SET CLUSTER SETTING server.span_stats.node.timeout = '3s'")
	require.NoError(t, err)

	_, err = tc.GetStatusClient(t, 0).SpanStats(ctx, &roachpb.SpanStatsRequest{
		NodeID: "0",
		Spans:  []roachpb.Span{},
	})
	// The request as a whole must succeed despite the per-node failures.
	require.NoError(t, err)
	require.ErrorContains(t, nodeErr(2), "error dialing node 2")
	require.ErrorContains(t, nodeErr(3), "error getting span stats from node 3")
	require.ErrorContains(t, nodeErr(4), "node 4 timed out")
}

// BenchmarkSpanStats measures the cost of collecting span statistics.
func BenchmarkSpanStats(b *testing.B) {
skip.UnderShort(b)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *statusServer) ResetSQLStats(
var fanoutError error

if err := s.iterateNodes(ctx, "reset SQL statistics",
noTimeout,
dialFn,
resetSQLStats,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (s *statusServer) Statements(
}

if err := s.iterateNodes(ctx, "statement statistics",
noTimeout,
dialFn,
nodeStatement,
func(nodeID roachpb.NodeID, resp interface{}) {
Expand Down
Loading

0 comments on commit bc57125

Please sign in to comment.