From 80a036670337f780858261b2d410bf810e9ea362 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 21 Oct 2021 11:14:28 -0700 Subject: [PATCH] flowinfra: make max_running_flows default depend on the number of CPUs We think that it makes sense to scale the default value for `max_running_flows` based on how beefy the machines are, so we make it a multiple of the number of available CPU cores. We do so in a backwards-compatible fashion by treating the positive values of `sql.distsql.max_running_flows` as absolute values (the previous meaning) and the negative values as multiples of the number of CPUs. The choice of 128 as the default multiple is driven by the old default value of 500 and is such that if we have 4 CPUs, then we'll get the value of 512, pretty close to the old default. Release note (ops change): The meaning of the `sql.distsql.max_running_flows` cluster setting has been extended so that when the value is negative, it is multiplied by the number of CPUs on the node to get the maximum number of concurrent remote flows on the node. The default value is -128, meaning that on a 4 CPU machine we will have up to 512 concurrent remote DistSQL flows, but on an 8 CPU machine up to 1024. The previous default was 500. 
--- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/sql/flowinfra/flow_scheduler.go | 32 ++++++++++++++++--- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 8e121bf2e96a..82ade7ab9636 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -121,7 +121,7 @@ sql.defaults.transaction_rows_written_err integer 0 the limit for the number of sql.defaults.transaction_rows_written_log integer 0 the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable sql.defaults.vectorize enumeration on default vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4] sql.defaults.zigzag_join.enabled boolean true default value for enable_zigzag_join session setting; allows use of zig-zag join by default -sql.distsql.max_running_flows integer 500 maximum number of concurrent flows that can be run on a node +sql.distsql.max_running_flows integer -128 the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or 
SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index cb5d818d2e16..12ae9609c299 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -126,7 +126,7 @@ sql.defaults.transaction_rows_written_loginteger0the threshold for the number of rows written by a SQL transaction which - once exceeded - will trigger a logging event to SQL_PERF (or SQL_INTERNAL_PERF for internal transactions); use 0 to disable sql.defaults.vectorizeenumerationondefault vectorize mode [on = 0, on = 2, experimental_always = 3, off = 4] sql.defaults.zigzag_join.enabledbooleantruedefault value for enable_zigzag_join session setting; allows use of zig-zag join by default -sql.distsql.max_running_flowsinteger500maximum number of concurrent flows that can be run on a node +sql.distsql.max_running_flowsinteger-128the value - when positive - used as is, or the value - when negative - multiplied by the number of CPUs on a node, to determine the maximum number of concurrent remote flows that can be run on the node sql.distsql.temp_storage.workmembyte size64 MiBmaximum amount of memory in bytes a processor can use before falling back to temp storage sql.guardrails.max_row_size_errbyte size512 MiBmaximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable sql.guardrails.max_row_size_logbyte size64 MiBmaximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable diff --git a/pkg/sql/flowinfra/flow_scheduler.go b/pkg/sql/flowinfra/flow_scheduler.go index b9c8540809fc..089a2c2470f5 100644 --- a/pkg/sql/flowinfra/flow_scheduler.go +++ b/pkg/sql/flowinfra/flow_scheduler.go @@ 
-13,6 +13,7 @@ package flowinfra import ( "container/list" "context" + "runtime" "sync/atomic" "time" @@ -31,12 +32,35 @@ import ( const flowDoneChanSize = 8 +// We think that it makes sense to scale the default value for +// max_running_flows based on how beefy the machines are, so we make it a +// multiple of the number of available CPU cores. +// +// The choice of 128 as the default multiple is driven by the old default value +// of 500 and is such that if we have 4 CPUs, then we'll get the value of 512, +// pretty close to the old default. +// TODO(yuzefovich): we probably want to remove / disable this limit completely +// when we enable the admission control. var settingMaxRunningFlows = settings.RegisterIntSetting( "sql.distsql.max_running_flows", - "maximum number of concurrent flows that can be run on a node", - 500, + "the value - when positive - used as is, or the value - when negative - "+ + "multiplied by the number of CPUs on a node, to determine the "+ + "maximum number of concurrent remote flows that can be run on the node", + -128, ).WithPublic() +// getMaxRunningFlows returns an absolute value that determines the maximum +// number of concurrent remote flows on this node. +func getMaxRunningFlows(settings *cluster.Settings) int64 { + maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV) + if maxRunningFlows < 0 { + // We use GOMAXPROCS instead of NumCPU because the former could be + // adjusted based on cgroup limits (see cgroups.AdjustMaxProcs). + return -maxRunningFlows * int64(runtime.GOMAXPROCS(0)) + } + return maxRunningFlows +} + // FlowScheduler manages running flows and decides when to queue and when to // start flows. The main interface it presents is ScheduleFlows, which passes a // flow to be run. 
@@ -94,11 +118,11 @@ func NewFlowScheduler( flowDoneCh: make(chan Flow, flowDoneChanSize), } fs.mu.queue = list.New() - maxRunningFlows := settingMaxRunningFlows.Get(&settings.SV) + maxRunningFlows := getMaxRunningFlows(settings) fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows) fs.atomics.maxRunningFlows = int32(maxRunningFlows) settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) { - atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(settingMaxRunningFlows.Get(&settings.SV))) + atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings))) }) return fs }