
release-21.2: flowinfra: make max_running_flows default depend on the number of CPUs #75509

Merged · 1 commit · Jan 25, 2022
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -121,7 +121,7 @@ sql.defaults.transaction_rows_written_err integer 0 the limit for the number of
 sql.defaults.transaction_rows_written_log integer 0 the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable
 sql.defaults.vectorize enumeration on default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4]
 sql.defaults.zigzag_join.enabled boolean true default value for enable_zigzag_join session setting; allows use of zig-zag join by default
-sql.distsql.max_running_flows integer 500 maximum number of concurrent flows that can be run on a node
+sql.distsql.max_running_flows integer -128 the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node
 sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage
 sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable
 sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -126,7 +126,7 @@
 <tr><td><code>sql.defaults.transaction_rows_written_log</code></td><td>integer</td><td><code>0</code></td><td>the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable</td></tr>
 <tr><td><code>sql.defaults.vectorize</code></td><td>enumeration</td><td><code>on</code></td><td>default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4]</td></tr>
 <tr><td><code>sql.defaults.zigzag_join.enabled</code></td><td>boolean</td><td><code>true</code></td><td>default value for enable_zigzag_join session setting; allows use of zig-zag join by default</td></tr>
-<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>500</code></td><td>maximum number of concurrent flows that can be run on a node</td></tr>
+<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>-128</code></td><td>the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node</td></tr>
 <tr><td><code>sql.distsql.temp_storage.workmem</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum amount of memory in bytes a processor can use before falling back to temp storage</td></tr>
 <tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
 <tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
32 changes: 28 additions & 4 deletions pkg/sql/flowinfra/flow_scheduler.go
@@ -13,6 +13,7 @@ package flowinfra
 import (
 	"container/list"
 	"context"
+	"runtime"
 	"sync/atomic"
 	"time"
 
@@ -31,12 +32,35 @@ import (
 
 const flowDoneChanSize = 8
 
+// We think that it makes sense to scale the default value for
+// max_running_flows based on how beefy the machines are, so we make it a
+// multiple of the number of available CPU cores.
+//
+// The choice of 128 as the default multiple is driven by the old default value
+// of 500 and is such that if we have 4 CPUs, then we'll get the value of 512,
+// pretty close to the old default.
+// TODO(yuzefovich): we probably want to remove / disable this limit completely
+// when we enable the admission control.
 var settingMaxRunningFlows = settings.RegisterIntSetting(
 	"sql.distsql.max_running_flows",
-	"maximum number of concurrent flows that can be run on a node",
-	500,
+	"the value - when positive - used as is, or the value - when negative - "+
+		"multiplied by the number of CPUs on a node, to determine the "+
+		"maximum number of concurrent remote flows that can be run on the node",
+	-128,
 ).WithPublic()
+
+// getMaxRunningFlows returns an absolute value that determines the maximum
+// number of concurrent remote flows on this node.
+func getMaxRunningFlows(settings *cluster.Settings) int64 {
+	maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV)
+	if maxRunningFlows < 0 {
+		// We use GOMAXPROCS instead of NumCPU because the former could be
+		// adjusted based on cgroup limits (see cgroups.AdjustMaxProcs).
+		return -maxRunningFlows * int64(runtime.GOMAXPROCS(0))
+	}
+	return maxRunningFlows
+}
 
 // FlowScheduler manages running flows and decides when to queue and when to
 // start flows. The main interface it presents is ScheduleFlows, which passes a
 // flow to be run.
@@ -94,11 +118,11 @@ func NewFlowScheduler(
 		flowDoneCh: make(chan Flow, flowDoneChanSize),
 	}
 	fs.mu.queue = list.New()
-	maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV)
+	maxRunningFlows := getMaxRunningFlows(settings)
 	fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows)
 	fs.atomics.maxRunningFlows = int32(maxRunningFlows)
 	settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) {
-		atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(settingMaxRunningFlows.Get(&settings.SV)))
+		atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings)))
 	})
 	return fs
 }
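The sign convention introduced by this change can be sketched in isolation. In the sketch below, `effectiveMaxRunningFlows` is a hypothetical standalone helper with the CPU count passed as a parameter; the actual change implements this logic in `getMaxRunningFlows`, reading the value from cluster settings and the CPU count from `runtime.GOMAXPROCS`:

```go
package main

import "fmt"

// effectiveMaxRunningFlows mirrors the new setting's sign convention:
// a positive value is used as is, while a negative value is treated as
// a per-CPU multiplier. (Hypothetical helper for illustration only.)
func effectiveMaxRunningFlows(setting, numCPUs int64) int64 {
	if setting < 0 {
		return -setting * numCPUs
	}
	return setting
}

func main() {
	// The new default of -128 on a 4-CPU node yields 512, close to the
	// old fixed default of 500.
	fmt.Println(effectiveMaxRunningFlows(-128, 4)) // 512
	// A positive override disables the CPU-based scaling entirely.
	fmt.Println(effectiveMaxRunningFlows(500, 4)) // 500
}
```

With this scheme a single default scales sensibly from small nodes (e.g. -128 × 2 CPUs = 256) to large ones (-128 × 32 CPUs = 4096) without operators having to tune the setting per machine size.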