diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 85fae2a501f7..fca63a68eb56 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3436,6 +3436,7 @@ func TestChangefeedJobRetryOnNoInboundStream(t *testing.T) { // force fast "no inbound stream" error var oldMaxRunningFlows int var oldTimeout string + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.max_running_flows").Scan(&oldMaxRunningFlows) sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.flow_stream_timeout").Scan(&oldTimeout) serverutils.SetClusterSetting(t, cluster, "sql.distsql.max_running_flows", 0) diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index d5aa68e6f85d..fff4059d9ade 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -566,6 +566,9 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) { ), ) + // Enable the queueing mechanism of the flow scheduler. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) tableID := sqlutils.QueryTableID(t, sqlDB.DB, "test", "public", "foo") tableKey.Store(execCfg.Codec.TablePrefix(tableID)) diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index a90bf3efff03..6b965909a8c9 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -586,6 +586,9 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) { ), ) + // Enable the queueing mechanism of the flow scheduler. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + // Disable the execution of all remote flows and shorten the timeout. const maxRunningFlows = 0 const flowStreamTimeout = 1 // in seconds diff --git a/pkg/sql/flowinfra/cluster_test.go b/pkg/sql/flowinfra/cluster_test.go index 14d7d4cb1110..7708024bbd17 100644 --- a/pkg/sql/flowinfra/cluster_test.go +++ b/pkg/sql/flowinfra/cluster_test.go @@ -86,6 +86,10 @@ func TestClusterFlow(t *testing.T) { return span } + // Enable the queueing mechanism of the flow scheduler. + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") + // successful indicates whether the flow execution is successful. for _, successful := range []bool{true, false} { // Set up table readers on three hosts feeding data into a join reader on @@ -378,15 +382,9 @@ func TestTenantClusterFlow(t *testing.T) { return span } + // Enable the queueing mechanism of the flow scheduler. sqlDB := sqlutils.MakeSQLRunner(podConns[0]) - rows := sqlDB.Query(t, "SELECT num FROM test.t") - defer rows.Close() - for rows.Next() { - var key int - if err := rows.Scan(&key); err != nil { - t.Fatal(err) - } - } + sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true") // successful indicates whether the flow execution is successful. for _, successful := range []bool{true, false} { diff --git a/pkg/sql/flowinfra/flow_scheduler.go b/pkg/sql/flowinfra/flow_scheduler.go index bd8ac49e097d..57ec595c91bf 100644 --- a/pkg/sql/flowinfra/flow_scheduler.go +++ b/pkg/sql/flowinfra/flow_scheduler.go @@ -71,6 +71,7 @@ type FlowScheduler struct { stopper *stop.Stopper flowDoneCh chan Flow metrics *execinfra.DistSQLMetrics + sv *settings.Values mu struct { syncutil.Mutex @@ -126,12 +127,13 @@ func NewFlowScheduler( AmbientContext: ambient, stopper: stopper, flowDoneCh: make(chan Flow, flowDoneChanSize), + sv: &settings.SV, } fs.mu.queue = list.New() maxRunningFlows := getMaxRunningFlows(settings) fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows) fs.atomics.maxRunningFlows = int32(maxRunningFlows) - settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) { + settingMaxRunningFlows.SetOnChange(fs.sv, func(ctx context.Context) { atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings))) }) return fs @@ -142,13 +144,27 @@ func (fs *FlowScheduler) Init(metrics *execinfra.DistSQLMetrics) { fs.metrics = metrics } +var flowSchedulerQueueingEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.distsql.flow_scheduler_queueing.enabled", + "determines whether the flow scheduler imposes the limit on the maximum "+ + "number of concurrent remote DistSQL flows that a single node can have "+ + "(the limit is determined by the sql.distsql.max_running_flows setting)", + false, +) + // canRunFlow returns whether the FlowScheduler can run the flow. If true is // returned, numRunning is also incremented. // TODO(radu): we will have more complex resource accounting (like memory). // For now we just limit the number of concurrent flows. -func (fs *FlowScheduler) canRunFlow(_ Flow) bool { +func (fs *FlowScheduler) canRunFlow() bool { // Optimistically increase numRunning to account for this new flow. newNumRunning := atomic.AddInt32(&fs.atomics.numRunning, 1) + if !flowSchedulerQueueingEnabled.Get(fs.sv) { + // The queueing behavior of the flow scheduler is disabled, so we can + // run this flow without checking against the maxRunningFlows counter). + return true + } if newNumRunning <= atomic.LoadInt32(&fs.atomics.maxRunningFlows) { // Happy case. This flow did not bring us over the limit, so return that the // flow can be run and is accounted for in numRunning. @@ -204,7 +220,7 @@ func (fs *FlowScheduler) ScheduleFlow(ctx context.Context, f Flow) error { ctx, "flowinfra.FlowScheduler: scheduling flow", func(ctx context.Context) error { fs.metrics.FlowsScheduled.Inc(1) telemetry.Inc(sqltelemetry.DistSQLFlowsScheduled) - if fs.canRunFlow(f) { + if fs.canRunFlow() { return fs.runFlowNow(ctx, f, false /* locked */) } fs.mu.Lock() diff --git a/pkg/sql/flowinfra/flow_scheduler_test.go b/pkg/sql/flowinfra/flow_scheduler_test.go index a78393855a5b..7b845d857a60 100644 --- a/pkg/sql/flowinfra/flow_scheduler_test.go +++ b/pkg/sql/flowinfra/flow_scheduler_test.go @@ -133,6 +133,8 @@ func TestFlowScheduler(t *testing.T) { ) defer stopper.Stop(ctx) + // Enable the queueing mechanism of the flow scheduler. + flowSchedulerQueueingEnabled.Override(ctx, &settings.SV, true) scheduler := NewFlowScheduler(log.MakeTestingAmbientCtxWithNewTracer(), stopper, settings) scheduler.Init(&metrics) scheduler.Start()