Skip to content

Commit

Permalink
Merge #92542
Browse files Browse the repository at this point in the history
92542: base: reduce network timeouts r=erikgrinaker a=erikgrinaker

***: don't use `NetworkTimeout` where inappropriate**

`NetworkTimeout` should be used for network roundtrip timeouts, not for request processing timeouts.

Release note: None
  
**rpc: unify heartbeat interval and timeout**

Previously, the RPC heartbeat timeout (6s) was set to twice the heartbeat interval (3s). This is rather excessive, so this patch sets them to an equal value of 3s.

Release note: None
  
**base: add `DialTimeout`**

This patch adds a `DialTimeout` constant, set to `2 * NetworkTimeout` to account for the additional roundtrips in TCP + TLS handshakes.

**base: reduce network timeouts**

This patch reduces the network timeout from 3 seconds to 2 seconds. This change also affects gRPC keepalive intervals/timeouts (3 to 2 seconds), RPC heartbeats and timeouts (3 to 2 seconds), and the gRPC dial timeout (6 to 4 seconds).

When a peer is unresponsive, these timeouts determine how quickly RPC calls (and thus critical operations such as lease acquisitions) will be retried against a different node. Reducing them therefore improves recovery time during infrastructure outages.

An environment variable `COCKROACH_NETWORK_TIMEOUT` has been introduced to tweak this timeout if needed.

Touches #79494.
Epic: None.

Release note (ops change): The network timeout for RPC connections between cluster nodes has been reduced from 3 seconds to 2 seconds, with a connection timeout of 4 seconds, in order to reduce unavailability and tail latencies during infrastructure outages. This can now be changed via the environment variable `COCKROACH_NETWORK_TIMEOUT` which defaults to `2s`.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Dec 4, 2022
2 parents 112bcf4 + 65a2bc3 commit 8a5cb51
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 54 deletions.
5 changes: 2 additions & 3 deletions pkg/acceptance/cluster/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ package cluster
import (
"crypto/tls"
"net/http"

"github.com/cockroachdb/cockroach/pkg/base"
"time"
)

// HTTPClient is an http.Client configured for querying a cluster. We need to
// run with "InsecureSkipVerify" (at least on Docker) due to the fact that we
// cannot use a fixed hostname to reach the cluster. This in turn means that we
// do not have a verified server name in the certs.
var HTTPClient = http.Client{
Timeout: base.NetworkTimeout,
Timeout: 3 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
Expand Down
59 changes: 44 additions & 15 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ const (
defaultSQLAddr = ":" + DefaultPort
defaultHTTPAddr = ":" + DefaultHTTPPort

// NetworkTimeout is the timeout used for network operations.
NetworkTimeout = 3 * time.Second

// defaultRaftTickInterval is the default resolution of the Raft timer.
defaultRaftTickInterval = 200 * time.Millisecond

Expand All @@ -66,10 +63,6 @@ const (
// each heartbeat.
defaultRaftHeartbeatIntervalTicks = 5

// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatIntervalAndHalfTimeout
// used by the rpc context.
defaultRPCHeartbeatInterval = 3 * time.Second

// defaultRangeLeaseRenewalFraction specifies what fraction the range lease
// renewal duration should be of the range lease active time. For example,
// with a value of 0.2 and a lease duration of 10 seconds, leases would be
Expand Down Expand Up @@ -118,6 +111,44 @@ func DefaultHistogramWindowInterval() time.Duration {
}

var (
// NetworkTimeout is the timeout used for network operations that require a
// single network round trip. It is conservatively defined as one maximum
// network round trip time (RTT) plus one TCP packet retransmit (RTO), then
// multiplied by 2 as a safety margin.
//
// The maximum RTT between cloud regions is roughly 350ms both in GCP
// (asia-south2 to southamerica-west1) and AWS (af-south-1 to sa-east-1). It
// can occasionally be up to 500ms, but 400ms is a reasonable upper bound
// under nominal conditions.
// https://datastudio.google.com/reporting/fc733b10-9744-4a72-a502-92290f608571/page/70YCB
// https://www.cloudping.co/grid/p_99/timeframe/1W
//
// Linux has an RTT-dependant retransmission timeout (RTO) which we can
// approximate as 1.5x RTT (smoothed RTT + 4x RTT variance), with a lower
// bound of 200ms. Under nominal conditions, this is approximately 600ms.
//
// The maximum p99 RPC heartbeat latency in any Cockroach Cloud cluster over a
// 90-day period was 557ms. This was a single-region US cluster, where the
// high latency appeared to be due to CPU overload or throttling: the cluster
// had 2 vCPU nodes running at 100%.
//
// The NetworkTimeout is thus set to 2 * (400ms + 600ms) = 2s.
//
// TODO(erikgrinaker): Consider reducing this to 1 second, which should be
// sufficient but may be fragile under latency fluctuations.
NetworkTimeout = envutil.EnvOrDefaultDuration("COCKROACH_NETWORK_TIMEOUT", 2*time.Second)

// DialTimeout is the timeout used when dialing a node. gRPC connections take
// up to 3 roundtrips for the TCP + TLS handshakes. Because NetworkTimeout
// allows for both a network roundtrip (RTT) and a TCP retransmit (RTO), with
// the RTO being greater than the RTT, and we don't need to tolerate more than
// 1 retransmit per connection attempt, 2 * NetworkTimeout is sufficient.
DialTimeout = 2 * NetworkTimeout

// defaultRPCHeartbeatIntervalAndTimeout is the default value of
// RPCHeartbeatIntervalAndTimeout used by the RPC context.
defaultRPCHeartbeatIntervalAndTimeout = NetworkTimeout

// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
// invocations that must pass between elections.
defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -221,13 +252,11 @@ type Config struct {
// This is computed from HTTPAddr if specified otherwise Addr.
HTTPAdvertiseAddr string

// RPCHeartbeatIntervalAndHalfTimeout controls how often a Ping request is
// sent on peer connections to determine connection health and update the
// local view of remote clocks.
//
// Twice this value is used as a timeout for heartbeats, so don't set this too
// low.
RPCHeartbeatIntervalAndHalfTimeout time.Duration
// RPCHeartbeatIntervalAndTimeout controls how often a Ping request is sent on
// peer connections to determine connection health and update the local view
// of remote clocks. This is also used as a timeout for heartbeats, so don't
// set this too low.
RPCHeartbeatIntervalAndTimeout time.Duration

// SecondaryTenantPortOffset is the increment to add to the various
// addresses to generate the network configuration for the in-memory
Expand Down Expand Up @@ -285,7 +314,7 @@ func (cfg *Config) InitDefaults() {
cfg.SQLAdvertiseAddr = cfg.SQLAddr
cfg.SocketFile = ""
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.RPCHeartbeatIntervalAndHalfTimeout = defaultRPCHeartbeatInterval
cfg.RPCHeartbeatIntervalAndTimeout = defaultRPCHeartbeatIntervalAndTimeout
cfg.ClusterName = ""
cfg.DisableClusterNameVerification = false
cfg.ClockDevicePath = ""
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
Expand Down Expand Up @@ -1426,9 +1425,10 @@ func (rq *replicateQueue) findRemoveVoter(
MaxBackoff: 200 * time.Millisecond,
Multiplier: 2,
}
timeout := 5 * time.Second

var candidates []roachpb.ReplicaDescriptor
deadline := timeutil.Now().Add(2 * base.NetworkTimeout)
deadline := timeutil.Now().Add(timeout)
for r := retry.StartWithCtx(ctx, retryOpts); r.Next() && timeutil.Now().Before(deadline); {
lastReplAdded, lastAddedTime := repl.LastReplicaAdded()
if timeutil.Since(lastAddedTime) > newReplicaGracePeriod {
Expand Down
3 changes: 2 additions & 1 deletion pkg/rpc/addjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
Expand All @@ -38,7 +39,7 @@ func GetAddJoinDialOptions(certPool *x509.CertPool) []grpc.DialOption {
backoffConfig.MaxDelay = time.Second
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
MinConnectTimeout: base.DialTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
Expand Down
22 changes: 9 additions & 13 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ var (
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */)
)

// GRPC Dialer connection timeout.
var minConnectionTimeout = 5 * time.Second

// errDialRejected is returned from client interceptors when the server's
// stopper is quiescing. The error is constructed to return true in
// `grpcutil.IsConnectionRejected` which prevents infinite retry loops during
Expand Down Expand Up @@ -574,15 +571,15 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
rpcCompression: enableRPCCompression,
MasterCtx: masterCtx,
metrics: makeMetrics(),
heartbeatTimeout: 2 * opts.Config.RPCHeartbeatIntervalAndHalfTimeout,
heartbeatTimeout: opts.Config.RPCHeartbeatIntervalAndTimeout,
logClosingConnEvery: log.Every(time.Second),
}

// We only monitor remote clocks in server-to-server connections.
// CLI commands are exempted.
if !opts.ClientOnly {
rpcCtx.RemoteClocks = newRemoteClockMonitor(
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatIntervalAndHalfTimeout, opts.Config.HistogramWindowInterval())
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatIntervalAndTimeout, opts.Config.HistogramWindowInterval())
}

if id := opts.Knobs.StorageClusterID; id != nil {
Expand Down Expand Up @@ -1751,18 +1748,17 @@ func (rpcCtx *Context) grpcDialRaw(
// our setup with onlyOnceDialer below. So note that our choice here is
// inconsequential assuming all works as designed.
backoff := time.Second
if backoff > minConnectionTimeout {
// This is for testing where we set a small minConnectionTimeout.
// gRPC will internally round up the min connection timeout to the max
// backoff. This can be unintuitive and so we opt out of it by lowering the
// max backoff.
backoff = minConnectionTimeout
if backoff > base.DialTimeout {
// This is for testing where we set a small DialTimeout. gRPC will
// internally round up the min connection timeout to the max backoff. This
// can be unintuitive and so we opt out of it by lowering the max backoff.
backoff = base.DialTimeout
}
backoffConfig.BaseDelay = backoff
backoffConfig.MaxDelay = backoff
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
MinConnectTimeout: base.DialTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize))
if class == RangefeedClass {
Expand Down Expand Up @@ -2197,7 +2193,7 @@ func (rpcCtx *Context) runHeartbeat(
})
}

heartbeatTimer.Reset(rpcCtx.Config.RPCHeartbeatIntervalAndHalfTimeout)
heartbeatTimer.Reset(rpcCtx.Config.RPCHeartbeatIntervalAndTimeout)
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func testClockOffsetInPingRequestInternal(t *testing.T, clientOnly bool) {
clientOpts := opts
clientOpts.Config = testutils.NewNodeTestBaseContext()
// Experimentally, values below 50ms seem to incur flakiness.
clientOpts.Config.RPCHeartbeatIntervalAndHalfTimeout = 100 * time.Millisecond
clientOpts.Config.RPCHeartbeatIntervalAndTimeout = 100 * time.Millisecond
clientOpts.ClientOnly = clientOnly
clientOpts.OnOutgoingPing = func(ctx context.Context, req *PingRequest) error {
select {
Expand Down Expand Up @@ -903,7 +903,7 @@ func TestHeartbeatHealth(t *testing.T) {
clientCtx.Config.AdvertiseAddr = lisLocalServer.Addr().String()

// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond

m := clientCtx.Metrics()

Expand Down Expand Up @@ -1148,7 +1148,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {

clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1259,7 +1259,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
})

// Should stay unhealthy despite reconnection attempts.
for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout; {
for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatIntervalAndTimeout; {
err := clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
if !isUnhealthy(err) {
t.Fatal(err)
Expand Down Expand Up @@ -1303,7 +1303,7 @@ func TestOffsetMeasurement(t *testing.T) {
clientMaxOffset := time.Duration(0)
clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper)
// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond
clientCtx.RemoteClocks.offsetTTL = 5 * clientClock.getAdvancementInterval()
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
clock := timeutil.NewManualTime(timeutil.Unix(0, start.Add(nodeCtxs[i].offset).UnixNano()))
nodeCtxs[i].errChan = make(chan error, 1)
nodeCtxs[i].ctx = newTestContext(clusterID, clock, maxOffset, stopper)
nodeCtxs[i].ctx.Config.RPCHeartbeatIntervalAndHalfTimeout = maxOffset
nodeCtxs[i].ctx.Config.RPCHeartbeatIntervalAndTimeout = maxOffset
nodeCtxs[i].ctx.NodeID.Set(context.Background(), roachpb.NodeID(i+1))

s := newTestServer(t, nodeCtxs[i].ctx)
Expand Down Expand Up @@ -1681,7 +1681,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
log.Infof(ctx, "setting up client")
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
// Disable automatic heartbeats. We'll send them by hand.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = math.MaxInt64
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = math.MaxInt64

var firstConn int32 = 1

Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/down_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -33,9 +34,9 @@ func TestConnectingToDownNode(t *testing.T) {

{
defer func(prev time.Duration) {
minConnectionTimeout = prev
}(minConnectionTimeout)
minConnectionTimeout = time.Millisecond
base.DialTimeout = prev
}(base.DialTimeout)
base.DialTimeout = time.Millisecond
}

testutils.RunTrueAndFalse(t, "refused", func(t *testing.T, refused bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func newTestContext(
) *rpc.Context {
cfg := testutils.NewNodeTestBaseContext()
cfg.Insecure = true
cfg.RPCHeartbeatIntervalAndHalfTimeout = 100 * time.Millisecond
cfg.RPCHeartbeatIntervalAndTimeout = 100 * time.Millisecond
ctx := context.Background()
rctx := rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Expand Down
3 changes: 0 additions & 3 deletions pkg/security/tls_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ var ocspMode = settings.RegisterEnumSetting(
"and in lax mode all certificates will be accepted.",
"off", map[int64]string{ocspOff: "off", ocspLax: "lax", ocspStrict: "strict"}).WithPublic()

// TODO(bdarnell): 3 seconds is the same as base.NetworkTimeout, but
// we can't use it here due to import cycles. We need a real
// no-dependencies base package for constants like this.
var ocspTimeout = settings.RegisterDurationSetting(
settings.TenantWritable, "security.ocsp.timeout",
"timeout before considering the OCSP server unreachable",
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ func (s *adminServer) statsForSpan(
func(ctx context.Context) {
// Set a generous timeout on the context for each individual query.
var spanResponse *serverpb.SpanStatsResponse
err := contextutil.RunWithTimeout(ctx, "request remote stats", 5*base.NetworkTimeout,
err := contextutil.RunWithTimeout(ctx, "request remote stats", 20*time.Second,
func(ctx context.Context) error {
client, err := s.server.status.dialNode(ctx, nodeID)
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (r *rpcNodePaginator) queryNode(ctx context.Context, nodeID roachpb.NodeID,
r.mu.currentIdx++
r.mu.turnCond.Broadcast()
}
if err := contextutil.RunWithTimeout(ctx, "dial node", base.NetworkTimeout, func(ctx context.Context) error {
if err := contextutil.RunWithTimeout(ctx, "dial node", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = r.dialFn(ctx, nodeID)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func makeInMemoryTenantServerConfig(
baseCfg.Config.User = kvServerCfg.Config.User
baseCfg.Config.DisableTLSForHTTP = kvServerCfg.Config.DisableTLSForHTTP
baseCfg.Config.AcceptSQLWithoutTLS = kvServerCfg.Config.AcceptSQLWithoutTLS
baseCfg.Config.RPCHeartbeatIntervalAndHalfTimeout = kvServerCfg.Config.RPCHeartbeatIntervalAndHalfTimeout
baseCfg.Config.RPCHeartbeatIntervalAndTimeout = kvServerCfg.Config.RPCHeartbeatIntervalAndTimeout
baseCfg.Config.ClockDevicePath = kvServerCfg.Config.ClockDevicePath
baseCfg.Config.ClusterName = kvServerCfg.Config.ClusterName
baseCfg.Config.DisableClusterNameVerification = kvServerCfg.Config.DisableClusterNameVerification
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (s *statusServer) AllocatorRange(
ctx,
"server.statusServer: requesting remote Allocator simulation",
func(ctx context.Context) {
_ = contextutil.RunWithTimeout(ctx, "allocator range", base.NetworkTimeout, func(ctx context.Context) error {
_ = contextutil.RunWithTimeout(ctx, "allocator range", 3*time.Second, func(ctx context.Context) error {
status, err := s.dialNode(ctx, nodeID)
var allocatorResponse *serverpb.AllocatorResponse
if err == nil {
Expand Down Expand Up @@ -2664,7 +2664,7 @@ func (s *statusServer) iterateNodes(

nodeQuery := func(ctx context.Context, nodeID roachpb.NodeID) {
var client interface{}
err := contextutil.RunWithTimeout(ctx, "dial node", base.NetworkTimeout, func(ctx context.Context) error {
err := contextutil.RunWithTimeout(ctx, "dial node", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = dialFn(ctx, nodeID)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func (t *tenantStatusServer) iteratePods(

instanceQuery := func(ctx context.Context, instance sqlinstance.InstanceInfo) {
var client interface{}
err := contextutil.RunWithTimeout(ctx, "dial instance", base.NetworkTimeout, func(ctx context.Context) error {
err := contextutil.RunWithTimeout(ctx, "dial instance", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = dialFn(ctx, instance.InstanceID, instance.InstanceAddr)
return err
Expand Down

0 comments on commit 8a5cb51

Please sign in to comment.