Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flowinfra: disable queueing mechanism of the flow scheduler by default #85091

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3436,6 +3436,7 @@ func TestChangefeedJobRetryOnNoInboundStream(t *testing.T) {
// force fast "no inbound stream" error
var oldMaxRunningFlows int
var oldTimeout string
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")
sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.max_running_flows").Scan(&oldMaxRunningFlows)
sqlDB.QueryRow(t, "SHOW CLUSTER SETTING sql.distsql.flow_stream_timeout").Scan(&oldTimeout)
serverutils.SetClusterSetting(t, cluster, "sql.distsql.max_running_flows", 0)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
),
)

// Enable the queueing mechanism of the flow scheduler.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
tableID := sqlutils.QueryTableID(t, sqlDB.DB, "test", "public", "foo")
tableKey.Store(execCfg.Codec.TablePrefix(tableID))
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) {
),
)

// Enable the queueing mechanism of the flow scheduler.
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// Disable the execution of all remote flows and shorten the timeout.
const maxRunningFlows = 0
const flowStreamTimeout = 1 // in seconds
Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/flowinfra/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func TestClusterFlow(t *testing.T) {
return span
}

// Enable the queueing mechanism of the flow scheduler.
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// successful indicates whether the flow execution is successful.
for _, successful := range []bool{true, false} {
// Set up table readers on three hosts feeding data into a join reader on
Expand Down Expand Up @@ -378,15 +382,9 @@ func TestTenantClusterFlow(t *testing.T) {
return span
}

// Enable the queueing mechanism of the flow scheduler.
sqlDB := sqlutils.MakeSQLRunner(podConns[0])
rows := sqlDB.Query(t, "SELECT num FROM test.t")
defer rows.Close()
for rows.Next() {
var key int
if err := rows.Scan(&key); err != nil {
t.Fatal(err)
}
}
sqlDB.Exec(t, "SET CLUSTER SETTING sql.distsql.flow_scheduler_queueing.enabled = true")

// successful indicates whether the flow execution is successful.
for _, successful := range []bool{true, false} {
Expand Down
22 changes: 19 additions & 3 deletions pkg/sql/flowinfra/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type FlowScheduler struct {
stopper *stop.Stopper
flowDoneCh chan Flow
metrics *execinfra.DistSQLMetrics
sv *settings.Values

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -126,12 +127,13 @@ func NewFlowScheduler(
AmbientContext: ambient,
stopper: stopper,
flowDoneCh: make(chan Flow, flowDoneChanSize),
sv: &settings.SV,
}
fs.mu.queue = list.New()
maxRunningFlows := getMaxRunningFlows(settings)
fs.mu.runningFlows = make(map[execinfrapb.FlowID]execinfrapb.DistSQLRemoteFlowInfo, maxRunningFlows)
fs.atomics.maxRunningFlows = int32(maxRunningFlows)
settingMaxRunningFlows.SetOnChange(&settings.SV, func(ctx context.Context) {
settingMaxRunningFlows.SetOnChange(fs.sv, func(ctx context.Context) {
atomic.StoreInt32(&fs.atomics.maxRunningFlows, int32(getMaxRunningFlows(settings)))
})
return fs
Expand All @@ -142,13 +144,27 @@ func (fs *FlowScheduler) Init(metrics *execinfra.DistSQLMetrics) {
fs.metrics = metrics
}

var flowSchedulerQueueingEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.distsql.flow_scheduler_queueing.enabled",
"determines whether the flow scheduler imposes the limit on the maximum "+
"number of concurrent remote DistSQL flows that a single node can have "+
"(the limit is determined by the sql.distsql.max_running_flows setting)",
false,
)

// canRunFlow returns whether the FlowScheduler can run the flow. If true is
// returned, numRunning is also incremented.
// TODO(radu): we will have more complex resource accounting (like memory).
// For now we just limit the number of concurrent flows.
func (fs *FlowScheduler) canRunFlow(_ Flow) bool {
func (fs *FlowScheduler) canRunFlow() bool {
// Optimistically increase numRunning to account for this new flow.
newNumRunning := atomic.AddInt32(&fs.atomics.numRunning, 1)
if !flowSchedulerQueueingEnabled.Get(fs.sv) {
// The queueing behavior of the flow scheduler is disabled, so we can
// run this flow without checking against the maxRunningFlows counter).
return true
}
if newNumRunning <= atomic.LoadInt32(&fs.atomics.maxRunningFlows) {
// Happy case. This flow did not bring us over the limit, so return that the
// flow can be run and is accounted for in numRunning.
Expand Down Expand Up @@ -204,7 +220,7 @@ func (fs *FlowScheduler) ScheduleFlow(ctx context.Context, f Flow) error {
ctx, "flowinfra.FlowScheduler: scheduling flow", func(ctx context.Context) error {
fs.metrics.FlowsScheduled.Inc(1)
telemetry.Inc(sqltelemetry.DistSQLFlowsScheduled)
if fs.canRunFlow(f) {
if fs.canRunFlow() {
return fs.runFlowNow(ctx, f, false /* locked */)
}
fs.mu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/flowinfra/flow_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func TestFlowScheduler(t *testing.T) {
)
defer stopper.Stop(ctx)

// Enable the queueing mechanism of the flow scheduler.
flowSchedulerQueueingEnabled.Override(ctx, &settings.SV, true)
scheduler := NewFlowScheduler(log.MakeTestingAmbientCtxWithNewTracer(), stopper, settings)
scheduler.Init(&metrics)
scheduler.Start()
Expand Down