Skip to content

Commit

Permalink
ui, server: add a timeout per node while collecting hot ranges
Browse files Browse the repository at this point in the history
Requests for hot ranges are serviced by a cluster wide fan-out,
where non-trivial work is done on each node to provide a response.
For each store, and for each hot range, we start a transaction with KV to look
up descriptor info.

Previously, there was no upper-bound set on the time a node could take
to provide a response. This commit introduces a per-node timeout
in the pagination logic, and is configurable with the new cluster setting
server.hot_ranges.node.timeout. A value of 0 will disable the timeout.

Error behavior and semantics are preserved. If a particular node times out,
The fan-out continues as before, as if a node failed to provide a response.

Informs #104269
Resolves #107627
Epic: none
Release note (ops change): Added a new cluster setting named
server.hot_ranges.node.timeout, with a default value of 5 minutes.
The setting controls the maximum amount of time that a hot ranges request
will spend waiting for a node to provide a response.
Set to 0 to disable timeouts.
  • Loading branch information
zachlite committed Jul 28, 2023
1 parent c986d84 commit 4f74c92
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ server.clock.persist_upper_bound_interval duration 0s the interval between persi
server.eventlog.enabled boolean true if set, logged notable events are also stored in the table system.eventlog tenant-rw
server.eventlog.ttl duration 2160h0m0s if nonzero, entries in system.eventlog older than this duration are periodically purged tenant-rw
server.host_based_authentication.configuration string host-based authentication configuration to use during connection authentication tenant-rw
server.hot_ranges.node.timeout duration 5m0s the duration allowed for a single node to return hot range data before the request is cancelled; if set to 0, there is no timeout tenant-rw
server.hsts.enabled boolean false if true, HSTS headers will be sent along with all HTTP requests. The headers will contain a max-age setting of one year. Browsers honoring the header will always use HTTPS to access the DB Console. Ensure that TLS is correctly configured prior to enabling. tenant-rw
server.identity_map.configuration string system-identity to database-username mappings tenant-rw
server.log_gc.max_deletions_per_cycle integer 1000 the maximum number of entries to delete on each purge of log-like system tables tenant-rw
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<tr><td><div id="setting-server-eventlog-enabled" class="anchored"><code>server.eventlog.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, logged notable events are also stored in the table system.eventlog</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-eventlog-ttl" class="anchored"><code>server.eventlog.ttl</code></div></td><td>duration</td><td><code>2160h0m0s</code></td><td>if nonzero, entries in system.eventlog older than this duration are periodically purged</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-host-based-authentication-configuration" class="anchored"><code>server.host_based_authentication.configuration</code></div></td><td>string</td><td><code></code></td><td>host-based authentication configuration to use during connection authentication</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-hot-ranges-node-timeout" class="anchored"><code>server.hot_ranges.node.timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the duration allowed for a single node to return hot range data before the request is cancelled; if set to 0, there is no timeout</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-hsts-enabled" class="anchored"><code>server.hsts.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, HSTS headers will be sent along with all HTTP requests. The headers will contain a max-age setting of one year. Browsers honoring the header will always use HTTPS to access the DB Console. Ensure that TLS is correctly configured prior to enabling.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-identity-map-configuration" class="anchored"><code>server.identity_map.configuration</code></div></td><td>string</td><td><code></code></td><td>system-identity to database-username mappings</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-log-gc-max-deletions-per-cycle" class="anchored"><code>server.log_gc.max_deletions_per_cycle</code></div></td><td>integer</td><td><code>1000</code></td><td>the maximum number of entries to delete on each purge of log-like system tables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"fanout_clients.go",
"grpc_gateway.go",
"grpc_server.go",
"hot_ranges.go",
"import_ts.go",
"index_usage_stats.go",
"init.go",
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/api_v2_ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,9 @@ func (a *apiV2Server) listHotRanges(w http.ResponseWriter, r *http.Request) {
})
}

timeout := HotRangesNodeTimeout.Get(&a.status.st.SV)
next, err := a.status.paginatedIterateNodes(
ctx, "hot ranges", limit, start, requestedNodes, dialFn,
ctx, "hot ranges", limit, start, requestedNodes, timeout, dialFn,
nodeFn, responseFn, errorFn)

if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions pkg/server/hot_ranges.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
)

var HotRangesNodeTimeout = settings.RegisterDurationSetting(
settings.TenantWritable,
"server.hot_ranges.node.timeout",
"the duration allowed for a single node to return hot range data before the request is cancelled; if set to 0, there is no timeout",
time.Minute*5,
settings.NonNegativeDuration,
).WithPublic()
20 changes: 18 additions & 2 deletions pkg/server/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
Expand Down Expand Up @@ -323,10 +324,14 @@ func (r *rpcNodePaginator) init() {
r.responseChan = make(chan paginatedNodeResponse, r.numNodes)
}

const noTimeout time.Duration = 0

// queryNode queries the given node, and sends the responses back through responseChan
// in order of idx (i.e. when all nodes with a lower idx have already sent theirs).
// Safe for concurrent use.
func (r *rpcNodePaginator) queryNode(ctx context.Context, nodeID roachpb.NodeID, idx int) {
func (r *rpcNodePaginator) queryNode(
ctx context.Context, nodeID roachpb.NodeID, idx int, timeout time.Duration,
) {
if atomic.LoadInt32(&r.done) != 0 {
// There are more values than we need. currentLen >= limit.
return
Expand Down Expand Up @@ -379,7 +384,18 @@ func (r *rpcNodePaginator) queryNode(ctx context.Context, nodeID roachpb.NodeID,
return
}

res, err := r.nodeFn(ctx, client, nodeID)
var res interface{}
var err error
if timeout == noTimeout {
res, err = r.nodeFn(ctx, client, nodeID)
} else {
err = timeutil.RunWithTimeout(ctx, "node fn", timeout, func(ctx context.Context) error {
var _err error
res, _err = r.nodeFn(ctx, client, nodeID)
return _err
})
}

if err != nil {
err = errors.Wrapf(err, "error requesting %s from node %d (%s)",
r.errorCtx, nodeID, r.nodeStatuses[serverID(nodeID)])
Expand Down
55 changes: 54 additions & 1 deletion pkg/server/pagination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
Expand Down Expand Up @@ -389,7 +391,7 @@ func TestRPCPaginator(t *testing.T) {
// Issue requests in parallel.
for idx, nodeID := range nodesToQuery {
go func(nodeID roachpb.NodeID, idx int) {
paginator.queryNode(ctx, nodeID, idx)
paginator.queryNode(ctx, nodeID, idx, noTimeout)
}(nodeID, idx)
}

Expand All @@ -412,3 +414,54 @@ func TestRPCPaginator(t *testing.T) {
}

}

func TestRPCPaginatorWithTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
server, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer server.Stopper().Stop(ctx)

s := server.StatusServer().(*systemStatusServer)

dialFn := func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error) {
client, err := s.dialNode(ctx, nodeID)
return client, err
}
nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
select {
case <-time.After(time.Second * 10):
case <-ctx.Done():
break
}

// Return an error that mimics the error returned when a rpc's context is cancelled:
return nil, errors.New("some error")
}
responseFn := func(nodeID roachpb.NodeID, resp interface{}) {
// noop
}

var timeoutError error
errorFn := func(nodeID roachpb.NodeID, err error) {
timeoutError = err
log.Infof(ctx, "error from node %d: %v", nodeID, err)
}

pagState := paginationState{}

_, _ = s.paginatedIterateNodes(
ctx,
"test-paginate-with-timeout",
1000,
pagState,
[]roachpb.NodeID{},
time.Second*2,
dialFn,
nodeFn,
responseFn,
errorFn,
)

require.ErrorContains(t, timeoutError, "operation \"node fn\" timed out")
}
11 changes: 8 additions & 3 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2809,8 +2809,9 @@ func (s *systemStatusServer) HotRangesV2(
response.ErrorsByNodeID[nodeID] = err.Error()
}

timeout := HotRangesNodeTimeout.Get(&s.st.SV)
next, err := s.paginatedIterateNodes(
ctx, "hotRanges", size, start, requestedNodes, dialFn,
ctx, "hotRanges", size, start, requestedNodes, timeout, dialFn,
nodeFn, responseFn, errorFn)

if err != nil {
Expand Down Expand Up @@ -3087,12 +3088,14 @@ func (s *statusServer) iterateNodes(
// and nodeError on every error result. It returns the next `limit` results
// after `start`. If `requestedNodes` is specified and non-empty, iteration is
// only done on that subset of nodes in addition to any nodes already in pagState.
// If non-zero, nodeFn will run with a timeout specified by nodeFnTimeout.
func (s *statusServer) paginatedIterateNodes(
ctx context.Context,
errorCtx string,
limit int,
pagState paginationState,
requestedNodes []roachpb.NodeID,
nodeFnTimeout time.Duration,
dialFn func(ctx context.Context, nodeID roachpb.NodeID) (interface{}, error),
nodeFn func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error),
responseFn func(nodeID roachpb.NodeID, resp interface{}),
Expand Down Expand Up @@ -3154,7 +3157,9 @@ func (s *statusServer) paginatedIterateNodes(
Sem: sem,
WaitForSem: true,
},
func(ctx context.Context) { paginator.queryNode(ctx, nodeID, idx) },
func(ctx context.Context) {
paginator.queryNode(ctx, nodeID, idx, nodeFnTimeout)
},
); err != nil {
return pagState, err
}
Expand Down Expand Up @@ -3208,7 +3213,7 @@ func (s *statusServer) listSessionsHelper(
var err error
var pagState paginationState
if pagState, err = s.paginatedIterateNodes(
ctx, "session list", limit, start, nil, dialFn, nodeFn, responseFn, errorFn); err != nil {
ctx, "session list", limit, start, nil, noTimeout, dialFn, nodeFn, responseFn, errorFn); err != nil {
err := serverpb.ListSessionsError{Message: err.Error()}
response.Errors = append(response.Errors, err)
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/ui/workspaces/db-console/src/views/hotRanges/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ const HotRangesPage = () => {
}
}, [dispatch, isValid]);

useEffect(() => {
dispatch(
refreshHotRanges(
new HotRangesRequest({
page_size: 1000,
}),
),
);
}, [dispatch]);

const [filteredHotRanges, setFilteredHotRanges] =
useState<HotRange[]>(hotRanges);

Expand Down

0 comments on commit 4f74c92

Please sign in to comment.