Skip to content

Commit

Permalink
kvserver: distribute COCKROACH_SCHEDULER_CONCURRENCY across stores
Browse files Browse the repository at this point in the history
`COCKROACH_SCHEDULER_CONCURRENCY` defaults to 8 per CPU core, capped at
96, to avoid putting excessive pressure on the Go scheduler. However, it
was applied individually to each store, which means that nodes with e.g.
10 stores would run up to 960 workers. This can lead to scheduler
thrashing, as well as excessive memory usage, since the worker count also
bounds the amount of data pulled into memory for concurrent Raft ready
processing.

This patch instead divides the worker count across stores.

Epic: none
Release note (ops change): the default Raft scheduler concurrency,
controlled by `COCKROACH_SCHEDULER_CONCURRENCY` and defaulting to 8 per
CPU core capped at 96, is now divided evenly across stores instead of
applying individually per store. This avoids excessive Go scheduler
pressure and memory usage on nodes with many stores. The common case of
1 store per node is not affected.
  • Loading branch information
erikgrinaker committed May 7, 2023
1 parent e514a07 commit a9ed7be
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const (
)

// defaultRaftSchedulerConcurrency specifies the default number of Raft
// scheduler worker goroutines.
// scheduler worker goroutines. These are evenly distributed across stores.
//
// For small machines, we scale the scheduler concurrency by the number of
// CPUs. 8*NumCPU was determined in 9a68241 (April 2017) as the optimal
Expand Down Expand Up @@ -277,7 +277,7 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig {
sc.RaftElectionTimeoutTicks = 3
sc.RaftReproposalTimeoutTicks = 5
sc.RaftTickInterval = 100 * time.Millisecond
sc.SetDefaults()
sc.SetDefaults(1 /* numStores */)
return sc
}

Expand Down Expand Up @@ -1180,14 +1180,20 @@ func (sc *StoreConfig) Valid() bool {
// SetDefaults initializes unset fields in StoreConfig to values
// suitable for use on a local network.
// TODO(tschottdorf): see if this ought to be configurable via flags.
func (sc *StoreConfig) SetDefaults() {
func (sc *StoreConfig) SetDefaults(numStores int) {
sc.RaftConfig.SetDefaults()

if sc.CoalescedHeartbeatsInterval == 0 {
sc.CoalescedHeartbeatsInterval = sc.RaftTickInterval / 2
}
if sc.RaftSchedulerConcurrency == 0 {
sc.RaftSchedulerConcurrency = defaultRaftSchedulerConcurrency
// If we have more than one store, evenly divide the default workers across
// stores, since the default value is a function of CPU count and should not
// scale with the number of stores.
if numStores > 1 && sc.RaftSchedulerConcurrency > 1 {
sc.RaftSchedulerConcurrency = (sc.RaftSchedulerConcurrency-1)/numStores + 1 // ceil division
}
}
if sc.RaftSchedulerShardSize == 0 {
sc.RaftSchedulerShardSize = defaultRaftSchedulerShardSize
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestStoreGossipDeltaTrigger(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
cfg := &StoreConfig{}
cfg.SetDefaults()
cfg.SetDefaults(1 /* numStores */)
sg := NewStoreGossip(nil, nil, cfg.TestingKnobs.GossipTestingKnobs)
sg.cachedCapacity.cached = tc.cached
sg.cachedCapacity.lastGossiped = tc.lastGossiped
Expand Down
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,40 @@ func TestIterateIDPrefixKeys(t *testing.T) {
}
}

// TestStoreConfigSetDefaultsNumStores checks how StoreConfig.SetDefaults()
// fills in RaftSchedulerConcurrency for different combinations of an
// explicitly configured value, the package-level default, and numStores. Each
// case name states the expectation being verified.
func TestStoreConfigSetDefaultsNumStores(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	type testCase struct {
		conc        int // RaftSchedulerConcurrency set on the config before SetDefaults
		defaultConc int // value installed as defaultRaftSchedulerConcurrency
		numStores   int // argument passed to SetDefaults
		expectConc  int // RaftSchedulerConcurrency expected after SetDefaults
	}

	testcases := map[string]testCase{
		"zero default is retained":         {defaultConc: 0, numStores: 4, expectConc: 0},
		"negative default is retained":     {defaultConc: -1, numStores: 4, expectConc: -1},
		"zero stores retains default":      {defaultConc: 4, expectConc: 4},
		"explicit value not distributed":   {conc: 4, numStores: 2, expectConc: 4},
		"explicit value overrides default": {conc: 4, defaultConc: 16, numStores: 2, expectConc: 4},
		"default value is distributed":     {defaultConc: 16, numStores: 4, expectConc: 4},
		"default value uses ceil division": {defaultConc: 16, numStores: 5, expectConc: 4},
		"all stores have at least 1":       {defaultConc: 4, numStores: 10, expectConc: 1},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			// Override the global default for this subtest only, and put the
			// previous value back when the subtest returns.
			prevDefault := defaultRaftSchedulerConcurrency
			defer func() {
				defaultRaftSchedulerConcurrency = prevDefault
			}()
			defaultRaftSchedulerConcurrency = tc.defaultConc

			cfg := StoreConfig{RaftSchedulerConcurrency: tc.conc}
			cfg.SetDefaults(tc.numStores)

			require.Equal(t, tc.expectConc, cfg.RaftSchedulerConcurrency)
		})
	}
}

// TestStoreInitAndBootstrap verifies store initialization and bootstrap.
func TestStoreInitAndBootstrap(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil {
storeCfg.TestingKnobs = *storeTestingKnobs.(*kvserver.StoreTestingKnobs)
}
storeCfg.SetDefaults()
storeCfg.SetDefaults(len(engines))

systemTenantNameContainer := roachpb.NewTenantNameContainer(catconstants.SystemTenantName)

Expand Down

0 comments on commit a9ed7be

Please sign in to comment.