Skip to content

Commit

Permalink
flowinfra: disable queueing mechanism of the flow scheduler by default
Browse files Browse the repository at this point in the history
This commit disables the queueing mechanism of the flow scheduler as
part of the effort to remove that queueing altogether during 23.1
release cycle. To get there though we choose a conservative approach of
introducing a cluster setting that determines whether the queueing is
enabled or not, and if it is disabled, then we effectively a treating
`sql.distsql.max_running_flows` limit as infinite. By default, the
queueing is now disabled since recent experiments have shown that the
admission control does a good job of protecting the nodes from the
influx of remote flows.

Release note: None
  • Loading branch information
yuzefovich committed Jul 26, 2022
1 parent d7b901d commit 77c4673
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3436,6 +3436,7 @@ func TestChangefeedJobRetryOnNoInboundStream(t *testing.T) {
// force fast "no inbound stream" error
var oldMaxRunningFlows int
var oldTimeout string
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")
sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.max_running_flows").Scan(&oldMaxRunningFlows)
sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.flow_stream_timeout").Scan(&oldTimeout)
serverutils.SetClusterSetting(t, cluster, "sql.distsql.max_running_flows", 0)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
),
)

// Enable the queueing mechanism of the flow scheduler.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "test", "public", "foo")
tableKey.Store(execCfg.Codec.TablePrefix(tableID))
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) {
),
)

// Enable the queueing mechanism of the flow scheduler.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// Disable the execution of all remote flows and shorten the timeout.
const maxRunningFlows = 0
const flowStreamTimeout = 1 // in seconds
Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/flowinfra/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func TestClusterFlow(t *testing.T) {
return span
}

// Enable the queueing mechanism of the flow scheduler.
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// successful indicates whether the flow execution is successful.
for _, successful := range []bool{true, false} {
// Set up table readers on three hosts feeding data into a join reader on
Expand Down Expand Up @@ -378,15 +382,9 @@ func TestTenantClusterFlow(t *testing.T) {
return span
}

// Enable the queueing mechanism of the flow scheduler.
sqlDB := sqlutils.MakeSQLRunner(podConns[0])
rows := sqlDB.Query(t, "SELECT num FROM test.t")
defer rows.Close()
for rows.Next() {
var key int
if err := rows.Scan(&key); err != nil {
t.Fatal(err)
}
}
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// successful indicates whether the flow execution is successful.
for _, successful := range []bool{true, false} {
Expand Down
22 changes: 19 additions & 3 deletions pkg/sql/flowinfra/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type FlowScheduler struct {
stopper *stop.Stopper
flowDoneCh chan Flow
metrics *execinfra.DistSQLMetrics
sv *settings.Values

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -126,12 +127,13 @@ func NewFlowScheduler(
AmbientContext: ambient,
stopper: stopper,
flowDoneCh: make(chan Flow, flowDoneChanSize),
sv: &settings.SV,
}
fs.mu.queue = list.New()
maxRunningFlows := getMaxRunningFlows(settings)
fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows)
fs.atomics.maxRunningFlows = int32(maxRunningFlows)
settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) {
settingMaxRunningFlows.SetOnChange(fs.sv, func(ctx context.Context) {
atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings)))
})
return fs
Expand All @@ -142,13 +144,27 @@ func (fs *FlowScheduler) Init(metrics *execinfra.DistSQLMetrics) {
fs.metrics = metrics
}

var flowSchedulerQueueingEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.distsql.flow_scheduler_queueing.enabled",
"determines whether the flow scheduler imposes the limit on the maximum "+
"number of concurrent remote DistSQL flows that a single node can have "+
"(the limit is determined by the sql.distsql.max_running_flows setting)",
false,
)

// canRunFlow returns whether the FlowScheduler can run the flow. If true is
// returned, numRunning is also incremented.
// TODO(radu): we will have more complex resource accounting (like memory).
// For now we just limit the number of concurrent flows.
func (fs *FlowScheduler) canRunFlow(_ Flow) bool {
func (fs *FlowScheduler) canRunFlow() bool {
// Optimistically increase numRunning to account for this new flow.
newNumRunning := atomic.AddInt32(&fs.atomics.numRunning, 1)
if !flowSchedulerQueueingEnabled.Get(fs.sv) {
// The queueing behavior of the flow scheduler is disabled, so we can
// run this flow without checking against the maxRunningFlows counter).
return true
}
if newNumRunning <= atomic.LoadInt32(&fs.atomics.maxRunningFlows) {
// Happy case. This flow did not bring us over the limit, so return that the
// flow can be run and is accounted for in numRunning.
Expand Down Expand Up @@ -204,7 +220,7 @@ func (fs *FlowScheduler) ScheduleFlow(ctx context.Context, f Flow) error {
ctx, "flowinfra.FlowScheduler: scheduling flow", func(ctx context.Context) error {
fs.metrics.FlowsScheduled.Inc(1)
telemetry.Inc(sqltelemetry.DistSQLFlowsScheduled)
if fs.canRunFlow(f) {
if fs.canRunFlow() {
return fs.runFlowNow(ctx, f, false /* locked */)
}
fs.mu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/flowinfra/flow_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func TestFlowScheduler(t *testing.T) {
)
defer stopper.Stop(ctx)

// Enable the queueing mechanism of the flow scheduler.
flowSchedulerQueueingEnabled.Override(ctx, &settings.SV, true)
scheduler := NewFlowScheduler(log.MakeTestingAmbientCtxWithNewTracer(), stopper, settings)
scheduler.Init(&metrics)
scheduler.Start()
Expand Down

0 comments on commit 77c4673

Please sign in to comment.