Skip to content

Commit

Permalink
Merge #94692
Browse files Browse the repository at this point in the history
94692: kvserver: use MaxInflightBytes raft config option r=erikgrinaker,tbg a=pavelkalinnikov

This change starts using the raft's `MaxInflightBytes` option which limits the inflight total byte size of the log entries sent via `MsgApp`. The default limit is set to a conservatively large value, to not change the current behaviour for now.

Part of #90314
Epic: none
Release note (ops change): Added COCKROACH_RAFT_MAX_INFLIGHT_BYTES env variable which helps strictly limiting inflight traffic from a Raft leader to followers, particularly in situations when many large messages are sent and significantly exceed COCKROACH_RAFT_MAX_SIZE_PER_MSG * COCKROACH_RAFT_MAX_INFLIGHT_MSGS which is a softer limit.

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Jan 16, 2023
2 parents d20be50 + 1c41f8d commit 4ed157d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 7 deletions.
78 changes: 71 additions & 7 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package base

import (
"context"
"math"
"math/big"
"net/url"
"os"
"time"
Expand Down Expand Up @@ -252,6 +254,24 @@ var (
// will send to a given follower without hearing a response.
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 128)

// defaultRaftMaxInflightBytes specifies the maximum aggregate byte size of
// Raft log entries that a leader will send to a follower without hearing
// responses.
//
// Previously it was assumed that RaftMaxInflightMsgs * RaftMaxSizePerMsg is a
// proxy for the actual max inflight traffic. However, RaftMaxSizePerMsg is
// not a hard limit, it's rather a "target" size for the message, so the
// actual inflight bytes could exceed this product by a large factor.
// RaftMaxInflightBytes is a more accurate limit, and should be used in
// conjunction with the two.
//
// TODO(#90314): lower this limit to something close to max rates observed in
// healthy clusters. Currently, this is a conservatively large multiple of
// defaultRaftMaxInflightMsgs * defaultRaftMaxSizePerMsg, so that we don't
// abruptly break the previous assumption and cut off traffic.
defaultRaftMaxInflightBytes = envutil.EnvOrDefaultBytes(
"COCKROACH_RAFT_MAX_INFLIGHT_BYTES", 256<<20 /* 256 MB */)
)

// Config is embedded by server.Config. A base config is not meant to be used
Expand Down Expand Up @@ -469,25 +489,45 @@ type RaftConfig struct {
// value lowers the raft recovery cost (during initial probing and after
// message loss during normal operation). On the other hand, it limits the
// throughput during normal replication.
//
// Used in combination with RaftMaxInflightMsgs and RaftMaxInflightBytes.
RaftMaxSizePerMsg uint64

// RaftMaxCommittedSizePerReady controls the maximum aggregate byte size of
// committed Raft log entries a replica will receive in a single Ready.
RaftMaxCommittedSizePerReady uint64

// RaftMaxInflightMsgs controls how many "inflight" MsgApps Raft will send
// to a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and RaftMaxSizePerMsg. The
// current default settings provide for up to 4 MB of raft log to be sent
// without acknowledgement. With an average entry size of 1 KB that
// translates to ~4096 commands that might be executed in the handling of a
// single raft.Ready operation.
// RaftMaxInflightMsgs controls how many "inflight" MsgApps Raft will send to
// a follower without hearing a response. The total size of inflight Raft log
// entries is thus roughly limited by RaftMaxInflightMsgs * RaftMaxSizePerMsg,
// but also by RaftMaxInflightBytes. The current default settings provide for
// up to 4 MB of Raft log to be sent without acknowledgement. With an average
// entry size of 1 KB that translates to ~4096 commands that might be executed
// in the handling of a single raft.Ready operation.
//
// This setting is used both by sending and receiving end of Raft messages. To
// minimize dropped messages on the receiver, its size should at least match
// the sender's (being it the default size, or taken from the env variables).
RaftMaxInflightMsgs int

// RaftMaxInflightBytes controls the maximum aggregate byte size of Raft log
// entries that a leader will send to a follower without hearing responses.
//
// Normally RaftMaxSizePerMsg * RaftMaxInflightMsgs is the actual limit. But
// the RaftMaxSizePerMsg is soft, and Raft may send individual messages
// arbitrarily larger than that (e.g. with a large write, or AddSST command),
// so it's possible that the overall limit is exceeded by a large multiple.
// RaftMaxInflightBytes is a stricter limit which can only be slightly
// exceeded (by a single message).
//
// This effectively bounds the bandwidth-delay product. Note that especially
// in high-latency deployments setting this too low can lead to a dramatic
// reduction in throughput. For example, with a peer that has a round-trip
// latency of 100ms to the leader and this setting is set to 1 MB, there is a
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
RaftMaxInflightBytes uint64

// Splitting a range which has a replica needing a snapshot results in two
// ranges in that state. The delay configured here slows down splits when in
// that situation (limiting to those splits not run through the split
Expand Down Expand Up @@ -545,6 +585,17 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}

if cfg.RaftMaxInflightBytes == 0 {
cfg.RaftMaxInflightBytes = uint64(defaultRaftMaxInflightBytes)
}
// Fixup RaftMaxInflightBytes if it is lower than reasonable.
if other := maxInflightBytesFrom(
cfg.RaftMaxInflightMsgs, cfg.RaftMaxSizePerMsg,
); cfg.RaftMaxInflightBytes < other {
cfg.RaftMaxInflightBytes = other
}

if cfg.RaftDelaySplitToSuppressSnapshotTicks == 0 {
// The Raft Ticks interval defaults to 200ms, and an election is 10
// ticks. Add a generous amount of ticks to make sure even a backed up
Expand Down Expand Up @@ -641,6 +692,19 @@ func DefaultRetryOptions() retry.Options {
}
}

// maxInflightBytesFrom returns the minimal value for RaftMaxInflightBytes
// config option based on RaftMaxInflightMsgs and RaftMaxSizePerMsg.
func maxInflightBytesFrom(maxInflightMsgs int, maxSizePerMsg uint64) uint64 {
// Compute min(maxInflightMsgs * maxSizePerMsg, MaxUint64) safely.
if mul := new(big.Int).Mul(
big.NewInt(int64(maxInflightMsgs)),
new(big.Int).SetUint64(maxSizePerMsg),
); mul.IsUint64() {
return mul.Uint64()
}
return math.MaxUint64
}

// StorageConfig contains storage configs for all storage engine.
type StorageConfig struct {
Attrs roachpb.Attributes
Expand Down
35 changes: 35 additions & 0 deletions pkg/base/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package base_test

import (
"fmt"
"math"
"testing"
"time"

Expand All @@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/require"
)

func TestDefaultRaftConfig(t *testing.T) {
Expand Down Expand Up @@ -122,3 +124,36 @@ func TestDefaultRaftConfig(t *testing.T) {

echotest.Require(t, s, datapathutils.TestDataPath(t, "raft_config_recovery"))
}

func TestRaftMaxInflightBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
for i, tc := range []struct {
msgSize uint64
maxMsgs int
maxInfl uint64
want uint64
}{
// If any of these tests fail, sync the corresponding default values with
// config.go, and update the comments that reason about default values.
{want: 256 << 20}, // assert 255 MB is still default
{maxMsgs: 128, want: 256 << 20}, // assert 128 is still default
{msgSize: 32 << 10, want: 256 << 20}, // assert 32 KB is still default

{maxMsgs: 1 << 30, want: 1 << 45}, // large maxMsgs
{msgSize: 1 << 50, want: 1 << 57}, // large msgSize

{msgSize: 100, maxMsgs: 10, maxInfl: 1000000, want: 1000000}, // reasonable
{msgSize: 100, maxMsgs: 10, maxInfl: 5, want: 1000}, // fixup applied
{msgSize: 1 << 50, maxMsgs: 1 << 20, want: math.MaxUint64}, // overflow
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
cfg := base.RaftConfig{
RaftMaxInflightMsgs: tc.maxMsgs,
RaftMaxSizePerMsg: tc.msgSize,
RaftMaxInflightBytes: tc.maxInfl,
}
cfg.SetDefaults()
require.Equal(t, tc.want, cfg.RaftMaxInflightBytes)
})
}
}
1 change: 1 addition & 0 deletions pkg/base/testdata/raft_config
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ echo
RaftMaxSizePerMsg: (uint64) 32768,
RaftMaxCommittedSizePerReady: (uint64) 67108864,
RaftMaxInflightMsgs: (int) 128,
RaftMaxInflightBytes: (uint64) 268435456,
RaftDelaySplitToSuppressSnapshotTicks: (int) 230
}
RaftHeartbeatInterval: 1s
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func newRaftConfig(
MaxCommittedSizePerReady: storeCfg.RaftMaxCommittedSizePerReady,
MaxSizePerMsg: storeCfg.RaftMaxSizePerMsg,
MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs,
MaxInflightBytes: storeCfg.RaftMaxInflightBytes,
Storage: strg,
Logger: logger,

Expand Down

0 comments on commit 4ed157d

Please sign in to comment.