diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index d8f8c6ba6121..67756abf0683 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -71,7 +71,7 @@
sql.distsql.distribute_index_joins | boolean | true | if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader |
sql.distsql.flow_stream_timeout | duration | 10s | amount of time incoming streams wait for a flow to be set up before erroring out |
sql.distsql.interleaved_joins.enabled | boolean | true | if set we plan interleaved table joins instead of merge joins when possible |
-sql.distsql.max_running_flows | integer | 500 | maximum number of concurrent flows that can be run on a node |
+sql.distsql.max_running_flows | integer | 1000 | maximum number of concurrent flows that can be run on a node |
sql.distsql.merge_joins.enabled | boolean | true | if set, we plan merge joins when possible |
sql.distsql.temp_storage.joins | boolean | true | set to true to enable use of disk for distributed sql joins |
sql.distsql.temp_storage.sorts | boolean | true | set to true to enable use of disk for distributed sql sorts |
diff --git a/pkg/sql/distsqlrun/flow_scheduler.go b/pkg/sql/distsqlrun/flow_scheduler.go
index 067a2886eb04..12401b713ad5 100644
--- a/pkg/sql/distsqlrun/flow_scheduler.go
+++ b/pkg/sql/distsqlrun/flow_scheduler.go
@@ -15,29 +15,48 @@
package distsqlrun
import (
- "container/list"
"context"
- "time"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
- "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
const flowDoneChanSize = 8
+// NB: Once upon a time there was a mechanism to queue flows above this
+// max_running_flows which had a positive consequence of handling bursts of
+// flows gracefully. Unfortunately it had the negative consequence that when
+// queuing was occurring it would ultimately lead to connection timeouts on the
+// inbound side of the connection. This led to hard-to-interpret error messages.
+// The decision was made that in lieu of a more complex mechanism we'd prefer to
+// fail fast with a clear error message which is easy to triage. The removal of
+// the queue, however, raises the likelihood of hitting an error case
+// in workloads which before may have experienced some queuing but generally
+// worked well. Removing the queuing heightens the importance of choosing an
+// appropriate value for this setting. Unfortunately there do not exist
+// sophisticated mechanisms to detect an appropriate value based on resource
+// constraints. The intuition for now is to choose a value we expect to be
+// conservative and educate customers who both hit this error and have a large
+// amount of RAM headroom to increase it until either they are not hitting the
+// limit or they begin to be squeezed on RAM at which point they likely need to
+// scale out their cluster.
+
+// TODO(ajwerner): devise more robust overload / resource allocation mechanisms.
+// TODO(ajwerner): In lieu of above, justify the default max_running_flows.
+
var settingMaxRunningFlows = settings.RegisterIntSetting(
"sql.distsql.max_running_flows",
"maximum number of concurrent flows that can be run on a node",
- 500,
+ 1000,
)
-// flowScheduler manages running flows and decides when to queue and when to
-// start flows. The main interface it presents is ScheduleFlows, which passes a
-// flow to be run.
+// flowScheduler manages running flows and decides whether to start flows.
+// The main interface it presents is ScheduleFlow, which passes a flow to be
+// run.
type flowScheduler struct {
log.AmbientContext
stopper *stop.Stopper
@@ -48,19 +67,9 @@ type flowScheduler struct {
syncutil.Mutex
numRunning int
maxRunningFlows int
- queue *list.List
}
}
-// flowWithCtx stores a flow to run and a context to run it with.
-// TODO(asubiotto): Figure out if asynchronous flow execution can be rearranged
-// to avoid the need to store the context.
-type flowWithCtx struct {
- ctx context.Context
- flow *Flow
- enqueueTime time.Time
-}
-
func newFlowScheduler(
ambient log.AmbientContext,
stopper *stop.Stopper,
@@ -73,7 +82,6 @@ func newFlowScheduler(
flowDoneCh: make(chan *Flow, flowDoneChanSize),
metrics: metrics,
}
- fs.mu.queue = list.New()
fs.mu.maxRunningFlows = int(settingMaxRunningFlows.Get(&settings.SV))
settingMaxRunningFlows.SetOnChange(&settings.SV, func() {
fs.mu.Lock()
@@ -108,29 +116,22 @@ func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
return nil
}
-// ScheduleFlow is the main interface of the flow scheduler: it runs or enqueues
-// the given flow.
-//
-// If the flow can start immediately, errors encountered when starting the flow
-// are returned. If the flow is enqueued, these error will be later ignored.
+var errTooManyFlows = pgerror.NewError(pgerror.CodeInsufficientResourcesError,
+ "max_running_flows exceeded")
+
+// ScheduleFlow is the main interface of the flow scheduler. It runs the given
+// flow or rejects it with errTooManyFlows. Errors encountered when starting the
+// flow are returned.
func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
return fs.stopper.RunTaskWithErr(
ctx, "distsqlrun.flowScheduler: scheduling flow", func(ctx context.Context) error {
fs.mu.Lock()
defer fs.mu.Unlock()
-
if fs.canRunFlow(f) {
return fs.runFlowNow(ctx, f)
}
- log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
- fs.metrics.FlowsQueued.Inc(1)
- fs.mu.queue.PushBack(&flowWithCtx{
- ctx: ctx,
- flow: f,
- enqueueTime: timeutil.Now(),
- })
- return nil
-
+ fs.metrics.FlowsRejected.Inc(1)
+ return errTooManyFlows
})
}
@@ -144,7 +145,6 @@ func (fs *flowScheduler) Start() {
for {
if stopped && fs.mu.numRunning == 0 {
- // TODO(radu): somehow error out the flows that are still in the queue.
return
}
fs.mu.Unlock()
@@ -153,24 +153,6 @@ func (fs *flowScheduler) Start() {
fs.mu.Lock()
fs.mu.numRunning--
fs.metrics.FlowStop()
- if !stopped {
- if frElem := fs.mu.queue.Front(); frElem != nil {
- n := frElem.Value.(*flowWithCtx)
- fs.mu.queue.Remove(frElem)
- wait := timeutil.Since(n.enqueueTime)
- log.VEventf(
- n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait,
- )
- fs.metrics.FlowsQueued.Dec(1)
- fs.metrics.QueueWaitHist.RecordValue(int64(wait))
- // Note: we use the flow's context instead of the worker
- // context, to ensure that logging etc is relative to the
- // specific flow.
- if err := fs.runFlowNow(n.ctx, n.flow); err != nil {
- log.Errorf(n.ctx, "error starting queued flow: %s", err)
- }
- }
- }
case <-fs.stopper.ShouldStop():
fs.mu.Lock()
diff --git a/pkg/sql/distsqlrun/metrics.go b/pkg/sql/distsqlrun/metrics.go
index 9763339c9ad6..af945430be83 100644
--- a/pkg/sql/distsqlrun/metrics.go
+++ b/pkg/sql/distsqlrun/metrics.go
@@ -27,8 +27,7 @@ type DistSQLMetrics struct {
QueriesTotal *metric.Counter
FlowsActive *metric.Gauge
FlowsTotal *metric.Counter
- FlowsQueued *metric.Gauge
- QueueWaitHist *metric.Histogram
+ FlowsRejected *metric.Counter
MaxBytesHist *metric.Histogram
CurBytesCount *metric.Gauge
}
@@ -63,18 +62,12 @@ var (
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
- metaFlowsQueued = metric.Metadata{
- Name: "sql.distsql.flows.queued",
- Help: "Number of distributed SQL flows currently queued",
+ metaFlowsRejected = metric.Metadata{
+ Name: "sql.distsql.flows.rejected",
+ Help: "Number of distributed SQL flows rejected due to too many active",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
- metaQueueWaitHist = metric.Metadata{
- Name: "sql.distsql.flows.queue_wait",
- Help: "Duration of time flows spend waiting in the queue",
- Measurement: "Nanoseconds",
- Unit: metric.Unit_NANOSECONDS,
- }
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.distsql.max",
Help: "Memory usage per sql statement for distsql",
@@ -100,8 +93,7 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
QueriesTotal: metric.NewCounter(metaQueriesTotal),
FlowsActive: metric.NewGauge(metaFlowsActive),
FlowsTotal: metric.NewCounter(metaFlowsTotal),
- FlowsQueued: metric.NewGauge(metaFlowsQueued),
- QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow),
+ FlowsRejected: metric.NewCounter(metaFlowsRejected),
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}