diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 8e121bf2e96a..82ade7ab9636 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -121,7 +121,7 @@ sql.defaults.transaction_rows_written_err integer 0 the limit for the number of
sql.defaults.transaction_rows_written_log integer 0 the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable
sql.defaults.vectorize enumeration on default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4]
sql.defaults.zigzag_join.enabled boolean true default value for enable_zigzag_join session setting; allows use of zig-zag join by default
-sql.distsql.max_running_flows integer 500 maximum number of concurrent flows that can be run on a node
+sql.distsql.max_running_flows integer -128 the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node
sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage
sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable
sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index cb5d818d2e16..12ae9609c299 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -126,7 +126,7 @@
sql.defaults.transaction_rows_written_log | integer | 0 | the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable |
sql.defaults.vectorize | enumeration | on | default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4] |
sql.defaults.zigzag_join.enabled | boolean | true | default value for enable_zigzag_join session setting; allows use of zig-zag join by default |
-sql.distsql.max_running_flows | integer | 500 | maximum number of concurrent flows that can be run on a node |
+sql.distsql.max_running_flows | integer | -128 | the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node |
sql.distsql.temp_storage.workmem | byte size | 64 MiB | maximum amount of memory in bytes a processor can use before falling back to temp storage |
sql.guardrails.max_row_size_err | byte size | 512 MiB | maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable |
sql.guardrails.max_row_size_log | byte size | 64 MiB | maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable |
diff --git a/pkg/sql/flowinfra/flow_scheduler.go b/pkg/sql/flowinfra/flow_scheduler.go
index b9c8540809fc..089a2c2470f5 100644
--- a/pkg/sql/flowinfra/flow_scheduler.go
+++ b/pkg/sql/flowinfra/flow_scheduler.go
@@ -13,6 +13,7 @@ package flowinfra
import (
"container/list"
"context"
+ "runtime"
"sync/atomic"
"time"
@@ -31,12 +32,35 @@ import (
const flowDoneChanSize = 8
+// We think that it makes sense to scale the default value for
+// max_running_flows based on how beefy the machines are, so we make it a
+// multiple of the number of available CPU cores.
+//
+// The choice of 128 as the default multiple is driven by the old default value
+// of 500 and is such that if we have 4 CPUs, then we'll get the value of 512,
+// pretty close to the old default.
+// TODO(yuzefovich): we probably want to remove / disable this limit completely
+// when we enable the admission control.
var settingMaxRunningFlows = settings.RegisterIntSetting(
"sql.distsql.max_running_flows",
- "maximum number of concurrent flows that can be run on a node",
- 500,
+ "the value - when positive - used as is, or the value - when negative - "+
+ "multiplied by the number of CPUs on a node, to determine the "+
+ "maximum number of concurrent remote flows that can be run on the node",
+ -128,
).WithPublic()
+// getMaxRunningFlows returns an absolute value that determines the maximum
+// number of concurrent remote flows on this node.
+func getMaxRunningFlows(settings *cluster.Settings) int64 {
+ maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV)
+ if maxRunningFlows < 0 {
+ // We use GOMAXPROCS instead of NumCPU because the former could be
+ // adjusted based on cgroup limits (see cgroups.AdjustMaxProcs).
+ return -maxRunningFlows * int64(runtime.GOMAXPROCS(0))
+ }
+ return maxRunningFlows
+}
+
// FlowScheduler manages running flows and decides when to queue and when to
// start flows. The main interface it presents is ScheduleFlows, which passes a
// flow to be run.
@@ -94,11 +118,11 @@ func NewFlowScheduler(
flowDoneCh: make(chan Flow, flowDoneChanSize),
}
fs.mu.queue = list.New()
- maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV)
+ maxRunningFlows := getMaxRunningFlows(settings)
fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows)
fs.atomics.maxRunningFlows = int32(maxRunningFlows)
settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) {
- atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(settingMaxRunningFlows.Get(&settings.SV)))
+ atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings)))
})
return fs
}