Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

base: reduce network timeouts #92542

Merged
merged 4 commits into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/acceptance/cluster/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ package cluster
import (
"crypto/tls"
"net/http"

"github.com/cockroachdb/cockroach/pkg/base"
"time"
)

// HTTPClient is an http.Client configured for querying a cluster. We need to
// run with "InsecureSkipVerify" (at least on Docker) due to the fact that we
// cannot use a fixed hostname to reach the cluster. This in turn means that we
// do not have a verified server name in the certs.
var HTTPClient = http.Client{
Timeout: base.NetworkTimeout,
Timeout: 3 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
Expand Down
59 changes: 44 additions & 15 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ const (
defaultSQLAddr = ":" + DefaultPort
defaultHTTPAddr = ":" + DefaultHTTPPort

// NetworkTimeout is the timeout used for network operations.
NetworkTimeout = 3 * time.Second

// defaultRaftTickInterval is the default resolution of the Raft timer.
defaultRaftTickInterval = 200 * time.Millisecond

Expand All @@ -66,10 +63,6 @@ const (
// each heartbeat.
defaultRaftHeartbeatIntervalTicks = 5

// defaultRPCHeartbeatInterval is the default value of RPCHeartbeatIntervalAndHalfTimeout
// used by the rpc context.
defaultRPCHeartbeatInterval = 3 * time.Second

// defaultRangeLeaseRenewalFraction specifies what fraction the range lease
// renewal duration should be of the range lease active time. For example,
// with a value of 0.2 and a lease duration of 10 seconds, leases would be
Expand Down Expand Up @@ -118,6 +111,44 @@ func DefaultHistogramWindowInterval() time.Duration {
}

var (
// NetworkTimeout is the timeout used for network operations that require a
// single network round trip. It is conservatively defined as one maximum
// network round trip time (RTT) plus one TCP packet retransmit (RTO), then
// multiplied by 2 as a safety margin.
//
// The maximum RTT between cloud regions is roughly 350ms both in GCP
// (asia-south2 to southamerica-west1) and AWS (af-south-1 to sa-east-1). It
// can occasionally be up to 500ms, but 400ms is a reasonable upper bound
// under nominal conditions.
// https://datastudio.google.com/reporting/fc733b10-9744-4a72-a502-92290f608571/page/70YCB
// https://www.cloudping.co/grid/p_99/timeframe/1W
//
// Linux has an RTT-dependant retransmission timeout (RTO) which we can
// approximate as 1.5x RTT (smoothed RTT + 4x RTT variance), with a lower
// bound of 200ms. Under nominal conditions, this is approximately 600ms.
//
// The maximum p99 RPC heartbeat latency in any Cockroach Cloud cluster over a
// 90-day period was 557ms. This was a single-region US cluster, where the
// high latency appeared to be due to CPU overload or throttling: the cluster
// had 2 vCPU nodes running at 100%.
//
// The NetworkTimeout is thus set to 2 * (400ms + 600ms) = 2s.
//
// TODO(erikgrinaker): Consider reducing this to 1 second, which should be
// sufficient but may be fragile under latency fluctuations.
NetworkTimeout = envutil.EnvOrDefaultDuration("COCKROACH_NETWORK_TIMEOUT", 2*time.Second)

// DialTimeout is the timeout used when dialing a node. gRPC connections take
// up to 3 roundtrips for the TCP + TLS handshakes. Because NetworkTimeout
// allows for both a network roundtrip (RTT) and a TCP retransmit (RTO), with
// the RTO being greater than the RTT, and we don't need to tolerate more than
// 1 retransmit per connection attempt, 2 * NetworkTimeout is sufficient.
DialTimeout = 2 * NetworkTimeout

// defaultRPCHeartbeatIntervalAndTimeout is the default value of
// RPCHeartbeatIntervalAndTimeout used by the RPC context.
defaultRPCHeartbeatIntervalAndTimeout = NetworkTimeout

// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
// invocations that must pass between elections.
defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -221,13 +252,11 @@ type Config struct {
// This is computed from HTTPAddr if specified otherwise Addr.
HTTPAdvertiseAddr string

// RPCHeartbeatIntervalAndHalfTimeout controls how often a Ping request is
// sent on peer connections to determine connection health and update the
// local view of remote clocks.
//
// Twice this value is used as a timeout for heartbeats, so don't set this too
// low.
RPCHeartbeatIntervalAndHalfTimeout time.Duration
// RPCHeartbeatIntervalAndTimeout controls how often a Ping request is sent on
// peer connections to determine connection health and update the local view
// of remote clocks. This is also used as a timeout for heartbeats, so don't
// set this too low.
RPCHeartbeatIntervalAndTimeout time.Duration

// SecondaryTenantPortOffset is the increment to add to the various
// addresses to generate the network configuration for the in-memory
Expand Down Expand Up @@ -285,7 +314,7 @@ func (cfg *Config) InitDefaults() {
cfg.SQLAdvertiseAddr = cfg.SQLAddr
cfg.SocketFile = ""
cfg.SSLCertsDir = DefaultCertsDirectory
cfg.RPCHeartbeatIntervalAndHalfTimeout = defaultRPCHeartbeatInterval
cfg.RPCHeartbeatIntervalAndTimeout = defaultRPCHeartbeatIntervalAndTimeout
cfg.ClusterName = ""
cfg.DisableClusterNameVerification = false
cfg.ClockDevicePath = ""
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
Expand Down Expand Up @@ -1426,9 +1425,10 @@ func (rq *replicateQueue) findRemoveVoter(
MaxBackoff: 200 * time.Millisecond,
Multiplier: 2,
}
timeout := 5 * time.Second

var candidates []roachpb.ReplicaDescriptor
deadline := timeutil.Now().Add(2 * base.NetworkTimeout)
deadline := timeutil.Now().Add(timeout)
for r := retry.StartWithCtx(ctx, retryOpts); r.Next() && timeutil.Now().Before(deadline); {
lastReplAdded, lastAddedTime := repl.LastReplicaAdded()
if timeutil.Since(lastAddedTime) > newReplicaGracePeriod {
Expand Down
3 changes: 2 additions & 1 deletion pkg/rpc/addjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
Expand All @@ -38,7 +39,7 @@ func GetAddJoinDialOptions(certPool *x509.CertPool) []grpc.DialOption {
backoffConfig.MaxDelay = time.Second
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
MinConnectTimeout: base.DialTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
Expand Down
22 changes: 9 additions & 13 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ var (
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */)
)

// GRPC Dialer connection timeout.
var minConnectionTimeout = 5 * time.Second

// errDialRejected is returned from client interceptors when the server's
// stopper is quiescing. The error is constructed to return true in
// `grpcutil.IsConnectionRejected` which prevents infinite retry loops during
Expand Down Expand Up @@ -574,15 +571,15 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
rpcCompression: enableRPCCompression,
MasterCtx: masterCtx,
metrics: makeMetrics(),
heartbeatTimeout: 2 * opts.Config.RPCHeartbeatIntervalAndHalfTimeout,
heartbeatTimeout: opts.Config.RPCHeartbeatIntervalAndTimeout,
logClosingConnEvery: log.Every(time.Second),
}

// We only monitor remote clocks in server-to-server connections.
// CLI commands are exempted.
if !opts.ClientOnly {
rpcCtx.RemoteClocks = newRemoteClockMonitor(
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatIntervalAndHalfTimeout, opts.Config.HistogramWindowInterval())
opts.Clock, opts.MaxOffset, 10*opts.Config.RPCHeartbeatIntervalAndTimeout, opts.Config.HistogramWindowInterval())
}

if id := opts.Knobs.StorageClusterID; id != nil {
Expand Down Expand Up @@ -1751,18 +1748,17 @@ func (rpcCtx *Context) grpcDialRaw(
// our setup with onlyOnceDialer below. So note that our choice here is
// inconsequential assuming all works as designed.
backoff := time.Second
if backoff > minConnectionTimeout {
// This is for testing where we set a small minConnectionTimeout.
// gRPC will internally round up the min connection timeout to the max
// backoff. This can be unintuitive and so we opt out of it by lowering the
// max backoff.
backoff = minConnectionTimeout
if backoff > base.DialTimeout {
// This is for testing where we set a small DialTimeout. gRPC will
// internally round up the min connection timeout to the max backoff. This
// can be unintuitive and so we opt out of it by lowering the max backoff.
backoff = base.DialTimeout
}
backoffConfig.BaseDelay = backoff
backoffConfig.MaxDelay = backoff
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
MinConnectTimeout: base.DialTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize))
if class == RangefeedClass {
Expand Down Expand Up @@ -2197,7 +2193,7 @@ func (rpcCtx *Context) runHeartbeat(
})
}

heartbeatTimer.Reset(rpcCtx.Config.RPCHeartbeatIntervalAndHalfTimeout)
heartbeatTimer.Reset(rpcCtx.Config.RPCHeartbeatIntervalAndTimeout)
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func testClockOffsetInPingRequestInternal(t *testing.T, clientOnly bool) {
clientOpts := opts
clientOpts.Config = testutils.NewNodeTestBaseContext()
// Experimentally, values below 50ms seem to incur flakiness.
clientOpts.Config.RPCHeartbeatIntervalAndHalfTimeout = 100 * time.Millisecond
clientOpts.Config.RPCHeartbeatIntervalAndTimeout = 100 * time.Millisecond
clientOpts.ClientOnly = clientOnly
clientOpts.OnOutgoingPing = func(ctx context.Context, req *PingRequest) error {
select {
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestHeartbeatHealth(t *testing.T) {
clientCtx.Config.AdvertiseAddr = lisLocalServer.Addr().String()

// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond

m := clientCtx.Metrics()

Expand Down Expand Up @@ -1142,7 +1142,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {

clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(context.Background()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1253,7 +1253,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
})

// Should stay unhealthy despite reconnection attempts.
for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout; {
for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatIntervalAndTimeout; {
err := clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
if !isUnhealthy(err) {
t.Fatal(err)
Expand Down Expand Up @@ -1297,7 +1297,7 @@ func TestOffsetMeasurement(t *testing.T) {
clientMaxOffset := time.Duration(0)
clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper)
// Make the interval shorter to speed up the test.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = 1 * time.Millisecond
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = 1 * time.Millisecond
clientCtx.RemoteClocks.offsetTTL = 5 * clientClock.getAdvancementInterval()
if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1459,7 +1459,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
clock := timeutil.NewManualTime(timeutil.Unix(0, start.Add(nodeCtxs[i].offset).UnixNano()))
nodeCtxs[i].errChan = make(chan error, 1)
nodeCtxs[i].ctx = newTestContext(clusterID, clock, maxOffset, stopper)
nodeCtxs[i].ctx.Config.RPCHeartbeatIntervalAndHalfTimeout = maxOffset
nodeCtxs[i].ctx.Config.RPCHeartbeatIntervalAndTimeout = maxOffset
nodeCtxs[i].ctx.NodeID.Set(context.Background(), roachpb.NodeID(i+1))

s := newTestServer(t, nodeCtxs[i].ctx)
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase)
log.Infof(ctx, "setting up client")
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
// Disable automatic heartbeats. We'll send them by hand.
clientCtx.Config.RPCHeartbeatIntervalAndHalfTimeout = math.MaxInt64
clientCtx.Config.RPCHeartbeatIntervalAndTimeout = math.MaxInt64

var firstConn int32 = 1

Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/down_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -33,9 +34,9 @@ func TestConnectingToDownNode(t *testing.T) {

{
defer func(prev time.Duration) {
minConnectionTimeout = prev
}(minConnectionTimeout)
minConnectionTimeout = time.Millisecond
base.DialTimeout = prev
}(base.DialTimeout)
base.DialTimeout = time.Millisecond
}

testutils.RunTrueAndFalse(t, "refused", func(t *testing.T, refused bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/nodedialer/nodedialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func newTestContext(
) *rpc.Context {
cfg := testutils.NewNodeTestBaseContext()
cfg.Insecure = true
cfg.RPCHeartbeatIntervalAndHalfTimeout = 100 * time.Millisecond
cfg.RPCHeartbeatIntervalAndTimeout = 100 * time.Millisecond
ctx := context.Background()
rctx := rpc.NewContext(ctx, rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Expand Down
3 changes: 0 additions & 3 deletions pkg/security/tls_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ var ocspMode = settings.RegisterEnumSetting(
"and in lax mode all certificates will be accepted.",
"off", map[int64]string{ocspOff: "off", ocspLax: "lax", ocspStrict: "strict"}).WithPublic()

// TODO(bdarnell): 3 seconds is the same as base.NetworkTimeout, but
// we can't use it here due to import cycles. We need a real
// no-dependencies base package for constants like this.
var ocspTimeout = settings.RegisterDurationSetting(
settings.TenantWritable, "security.ocsp.timeout",
"timeout before considering the OCSP server unreachable",
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ func (s *adminServer) statsForSpan(
func(ctx context.Context) {
// Set a generous timeout on the context for each individual query.
var spanResponse *serverpb.SpanStatsResponse
err := contextutil.RunWithTimeout(ctx, "request remote stats", 5*base.NetworkTimeout,
err := contextutil.RunWithTimeout(ctx, "request remote stats", 20*time.Second,
func(ctx context.Context) error {
client, err := s.server.status.dialNode(ctx, nodeID)
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (r *rpcNodePaginator) queryNode(ctx context.Context, nodeID roachpb.NodeID,
r.mu.currentIdx++
r.mu.turnCond.Broadcast()
}
if err := contextutil.RunWithTimeout(ctx, "dial node", base.NetworkTimeout, func(ctx context.Context) error {
if err := contextutil.RunWithTimeout(ctx, "dial node", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = r.dialFn(ctx, nodeID)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func makeInMemoryTenantServerConfig(
baseCfg.Config.User = kvServerCfg.Config.User
baseCfg.Config.DisableTLSForHTTP = kvServerCfg.Config.DisableTLSForHTTP
baseCfg.Config.AcceptSQLWithoutTLS = kvServerCfg.Config.AcceptSQLWithoutTLS
baseCfg.Config.RPCHeartbeatIntervalAndHalfTimeout = kvServerCfg.Config.RPCHeartbeatIntervalAndHalfTimeout
baseCfg.Config.RPCHeartbeatIntervalAndTimeout = kvServerCfg.Config.RPCHeartbeatIntervalAndTimeout
baseCfg.Config.ClockDevicePath = kvServerCfg.Config.ClockDevicePath
baseCfg.Config.ClusterName = kvServerCfg.Config.ClusterName
baseCfg.Config.DisableClusterNameVerification = kvServerCfg.Config.DisableClusterNameVerification
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func (s *statusServer) AllocatorRange(
ctx,
"server.statusServer: requesting remote Allocator simulation",
func(ctx context.Context) {
_ = contextutil.RunWithTimeout(ctx, "allocator range", base.NetworkTimeout, func(ctx context.Context) error {
_ = contextutil.RunWithTimeout(ctx, "allocator range", 3*time.Second, func(ctx context.Context) error {
status, err := s.dialNode(ctx, nodeID)
var allocatorResponse *serverpb.AllocatorResponse
if err == nil {
Expand Down Expand Up @@ -2681,7 +2681,7 @@ func (s *statusServer) iterateNodes(

nodeQuery := func(ctx context.Context, nodeID roachpb.NodeID) {
var client interface{}
err := contextutil.RunWithTimeout(ctx, "dial node", base.NetworkTimeout, func(ctx context.Context) error {
err := contextutil.RunWithTimeout(ctx, "dial node", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = dialFn(ctx, nodeID)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func (t *tenantStatusServer) iteratePods(

instanceQuery := func(ctx context.Context, instance sqlinstance.InstanceInfo) {
var client interface{}
err := contextutil.RunWithTimeout(ctx, "dial instance", base.NetworkTimeout, func(ctx context.Context) error {
err := contextutil.RunWithTimeout(ctx, "dial instance", base.DialTimeout, func(ctx context.Context) error {
var err error
client, err = dialFn(ctx, instance.InstanceID, instance.InstanceAddr)
return err
Expand Down