From 77c467389668d526fd483cab459cf6bc3c49e25e Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 26 Jul 2022 14:07:52 -0700 Subject: [PATCH] flowinfra: disable queueing mechanism of the flow scheduler by default This commit disables the queueing mechanism of the flow scheduler as part of the effort to remove that queueing altogether during 23.1 release cycle. To get there though we choose a conservative approach of introducing a cluster setting that determines whether the queueing is enabled or not, and if it is disabled, then we effectively a treating `sql.distsql.max_running_flows` limit as infinite. By default, the queueing is now disabled since recent experiments have shown that the admission control does a good job of protecting the nodes from the influx of remote flows. Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 1 + pkg/sql/crdb_internal_test.go | 3 +++ pkg/sql/distsql_running_test.go | 3 +++ pkg/sql/flowinfra/cluster_test.go | 14 ++++++-------- pkg/sql/flowinfra/flow_scheduler.go | 22 +++++++++++++++++++--- pkg/sql/flowinfra/flow_scheduler_test.go | 2 ++ 6 files changed, 34 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 85fae2a501f7..fca63a68eb56 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3436,6 +3436,7 @@ func TestChangefeedJobRetryOnNoInboundStream(t *testing.T) { // force fast "no inbound stream" error var oldMaxRunningFlows int var oldTimeout string + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.max_running_flows").Scan(&oldMaxRunningFlows) sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.flow_stream_timeout").Scan(&oldTimeout) serverutils.SetClusterSetting(t, cluster, "sql.distsql.max_running_flows", 0) diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index d5aa68e6f85d..fff4059d9ade 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -566,6 +566,9 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) { ), ) + // Enable the queueing mechanism of the flow scheduler. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) tableID := sqlutils.QueryTableID(t, sqlDB.DB, "test", "public", "foo") tableKey.Store(execCfg.Codec.TablePrefix(tableID)) diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index a90bf3efff03..6b965909a8c9 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -586,6 +586,9 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) { ), ) + // Enable the queueing mechanism of the flow scheduler. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + // Disable the execution of all remote flows and shorten the timeout. const maxRunningFlows = 0 const flowStreamTimeout = 1 // in seconds diff --git a/pkg/sql/flowinfra/cluster_test.go b/pkg/sql/flowinfra/cluster_test.go index 14d7d4cb1110..7708024bbd17 100644 --- a/pkg/sql/flowinfra/cluster_test.go +++ b/pkg/sql/flowinfra/cluster_test.go @@ -86,6 +86,10 @@ func TestClusterFlow(t *testing.T) { return span } + // Enable the queueing mechanism of the flow scheduler. + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + // successful indicates whether the flow execution is successful. for _, successful := range []bool{true, false} { // Set up table readers on three hosts feeding data into a join reader on @@ -378,15 +382,9 @@ func TestTenantClusterFlow(t *testing.T) { return span } + // Enable the queueing mechanism of the flow scheduler. sqlDB := sqlutils.MakeSQLRunner(podConns[0]) - rows := sqlDB.Query(t, "SELECT num FROM test.t") - defer rows.Close() - for rows.Next() { - var key int - if err := rows.Scan(&key); err != nil { - t.Fatal(err) - } - } + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") // successful indicates whether the flow execution is successful. for _, successful := range []bool{true, false} { diff --git a/pkg/sql/flowinfra/flow_scheduler.go b/pkg/sql/flowinfra/flow_scheduler.go index bd8ac49e097d..57ec595c91bf 100644 --- a/pkg/sql/flowinfra/flow_scheduler.go +++ b/pkg/sql/flowinfra/flow_scheduler.go @@ -71,6 +71,7 @@ type FlowScheduler struct { stopper *stop.Stopper flowDoneCh chan Flow metrics *execinfra.DistSQLMetrics + sv *settings.Values mu struct { syncutil.Mutex @@ -126,12 +127,13 @@ func NewFlowScheduler( AmbientContext: ambient, stopper: stopper, flowDoneCh: make(chan Flow, flowDoneChanSize), + sv: &settings.SV, } fs.mu.queue = list.New() maxRunningFlows := getMaxRunningFlows(settings) fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows) fs.atomics.maxRunningFlows = int32(maxRunningFlows) - settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) { + settingMaxRunningFlows.SetOnChange(fs.sv, func(ctx context.Context) { atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings))) }) return fs @@ -142,13 +144,27 @@ func (fs *FlowScheduler) Init(metrics *execinfra.DistSQLMetrics) { fs.metrics = metrics } +var flowSchedulerQueueingEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.flow_scheduler_queueing.enabled", + "determines whether the flow scheduler imposes the limit on the maximum "+ + "number of concurrent remote DistSQL flows that a single node can have "+ + "(the limit is determined by the sql.distsql.max_running_flows setting)", + false, +) + // canRunFlow returns whether the FlowScheduler can run the flow. If true is // returned, numRunning is also incremented. // TODO(radu): we will have more complex resource accounting (like memory). // For now we just limit the number of concurrent flows. -func (fs *FlowScheduler) canRunFlow(_ Flow) bool { +func (fs *FlowScheduler) canRunFlow() bool { // Optimistically increase numRunning to account for this new flow. newNumRunning := atomic.AddInt32(&fs.atomics.numRunning, 1) + if !flowSchedulerQueueingEnabled.Get(fs.sv) { + // The queueing behavior of the flow scheduler is disabled, so we can + // run this flow without checking against the maxRunningFlows counter). + return true + } if newNumRunning <= atomic.LoadInt32(&fs.atomics.maxRunningFlows) { // Happy case. This flow did not bring us over the limit, so return that the // flow can be run and is accounted for in numRunning. @@ -204,7 +220,7 @@ func (fs *FlowScheduler) ScheduleFlow(ctx context.Context, f Flow) error { ctx, "flowinfra.FlowScheduler: scheduling flow", func(ctx context.Context) error { fs.metrics.FlowsScheduled.Inc(1) telemetry.Inc(sqltelemetry.DistSQLFlowsScheduled) - if fs.canRunFlow(f) { + if fs.canRunFlow() { return fs.runFlowNow(ctx, f, false /* locked */) } fs.mu.Lock() diff --git a/pkg/sql/flowinfra/flow_scheduler_test.go b/pkg/sql/flowinfra/flow_scheduler_test.go index a78393855a5b..7b845d857a60 100644 --- a/pkg/sql/flowinfra/flow_scheduler_test.go +++ b/pkg/sql/flowinfra/flow_scheduler_test.go @@ -133,6 +133,8 @@ func TestFlowScheduler(t *testing.T) { ) defer stopper.Stop(ctx) + // Enable the queueing mechanism of the flow scheduler. + flowSchedulerQueueingEnabled.Override(ctx, &settings.SV, true) scheduler := NewFlowScheduler(log.MakeTestingAmbientCtxWithNewTracer(), stopper, settings) scheduler.Init(&metrics) scheduler.Start()