From cb3612b66e62338c8027d6449bf03107cca4fd86 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 30 Jan 2019 16:45:50 -0500 Subject: [PATCH] distsqlrun: remove queuing behavior from flowScheduler Prior to this PR when SetupFlowRequests in excess of `max_running_flows` were received they would be queued for processing. This queueing was unbounded and had no notion of a "full" queue. While the queue was well intentioned and may have led to desirable behavior in bursty, short-lived workloads, in cases of high volumes of longer running flows the system would observe arbitrarily long queuing delays. These delays would ultimately result propagate to the user in the form of a timeout setting up the other side of the connection. This error was both delayed and difficult for customers to interpret. The immediate action is to remove the queuing behavior and increase the flow limit. It is worth noting that this new limit is still arbitrary and unjustified except to say that it's larger than the previous limit by a factor of 2. After this PR when the `max_running_flows` limit is hit, clients will receive a more meaningful error faster which hopefully will guide them to adjusting the setting when appropriate or scaling out. This commit could use a unit test to ensure that we do indeed see the newly introduced error and I intend to write one soon. The machinery around setting up a unit test seemed much more daunting than this change but I wanted to get this out just to start a discussion about whether this is the right thing to do and if not, what better approaches y'all might see. Fixes #34229. Release note: None --- docs/generated/settings/settings.html | 2 +- pkg/sql/distsqlrun/flow_scheduler.go | 86 +++++++++++---------------- pkg/sql/distsqlrun/metrics.go | 18 ++---- 3 files changed, 40 insertions(+), 66 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d8f8c6ba6121..67756abf0683 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -71,7 +71,7 @@ sql.distsql.distribute_index_joinsbooleantrueif set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader sql.distsql.flow_stream_timeoutduration10samount of time incoming streams wait for a flow to be set up before erroring out sql.distsql.interleaved_joins.enabledbooleantrueif set we plan interleaved table joins instead of merge joins when possible -sql.distsql.max_running_flowsinteger500maximum number of concurrent flows that can be run on a node +sql.distsql.max_running_flowsinteger1000maximum number of concurrent flows that can be run on a node sql.distsql.merge_joins.enabledbooleantrueif set, we plan merge joins when possible sql.distsql.temp_storage.joinsbooleantrueset to true to enable use of disk for distributed sql joins sql.distsql.temp_storage.sortsbooleantrueset to true to enable use of disk for distributed sql sorts diff --git a/pkg/sql/distsqlrun/flow_scheduler.go b/pkg/sql/distsqlrun/flow_scheduler.go index 067a2886eb04..12401b713ad5 100644 --- a/pkg/sql/distsqlrun/flow_scheduler.go +++ b/pkg/sql/distsqlrun/flow_scheduler.go @@ -15,29 +15,48 @@ package distsqlrun import ( - "container/list" "context" - "time" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) const flowDoneChanSize = 8 +// NB: Once upon a time there was a mechanism to queue flows above this +// max_running_flows which had a positive consequence of handling bursts of +// flows gracefully. Unfortunately it had the negative consequence that when +// queuing was occurring it would ultimately lead to connection timeout on the +// inbound side of the connection. This led to hard to interpret error messages. +// The decision was made that in lieu of a more complex mechanism we'd prefer to +// fail fast with a clear error message which is easy to triage. The removal of +// the queue however leaves open the likelihood of hitting an error case +// in workloads which before may have experienced some queueing but generally +// worked well. Removing the queuing exacerbates the importance of choosing an +// appropriate value for this setting. Unfortunately there do not exist +// sophisticated mechanisms to detect an appropriate value based on resource +// constraints. The intuition for now is to choose a value we expect to be +// conservative and educate customers who both hit this error and have a large +// amount of RAM headroom to increase it until either they are not hitting the +// limit or they begin to be squeezed on RAM at which point they likely need to +// scale out their cluster. + +// TODO(ajwerner): devise more robust overload / resource allocation mechanisms. +// TODO(ajwerner): In lieu of above, justify the default max_running_flows. + var settingMaxRunningFlows = settings.RegisterIntSetting( "sql.distsql.max_running_flows", "maximum number of concurrent flows that can be run on a node", - 500, + 1000, ) -// flowScheduler manages running flows and decides when to queue and when to -// start flows. The main interface it presents is ScheduleFlows, which passes a -// flow to be run. +// flowScheduler manages running flows and decides whether to start flows. +// The main interface it presents is ScheduleFlows, which passes a flow to be +// run. type flowScheduler struct { log.AmbientContext stopper *stop.Stopper @@ -48,19 +67,9 @@ type flowScheduler struct { syncutil.Mutex numRunning int maxRunningFlows int - queue *list.List } } -// flowWithCtx stores a flow to run and a context to run it with. -// TODO(asubiotto): Figure out if asynchronous flow execution can be rearranged -// to avoid the need to store the context. -type flowWithCtx struct { - ctx context.Context - flow *Flow - enqueueTime time.Time -} - func newFlowScheduler( ambient log.AmbientContext, stopper *stop.Stopper, @@ -73,7 +82,6 @@ func newFlowScheduler( flowDoneCh: make(chan *Flow, flowDoneChanSize), metrics: metrics, } - fs.mu.queue = list.New() fs.mu.maxRunningFlows = int(settingMaxRunningFlows.Get(&settings.SV)) settingMaxRunningFlows.SetOnChange(&settings.SV, func() { fs.mu.Lock() @@ -108,29 +116,22 @@ func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error { return nil } -// ScheduleFlow is the main interface of the flow scheduler: it runs or enqueues -// the given flow. -// -// If the flow can start immediately, errors encountered when starting the flow -// are returned. If the flow is enqueued, these error will be later ignored. +var errTooManyFlows = pgerror.NewError(pgerror.CodeInsufficientResourcesError, + "max_running_flows exceeded") + +// ScheduleFlow is the main interface of the flow scheduler. It runs the given +// flow or rejects it with errTooManyFlows. Errors encountered when starting the +// flow are returned. func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error { return fs.stopper.RunTaskWithErr( ctx, "distsqlrun.flowScheduler: scheduling flow", func(ctx context.Context) error { fs.mu.Lock() defer fs.mu.Unlock() - if fs.canRunFlow(f) { return fs.runFlowNow(ctx, f) } - log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id) - fs.metrics.FlowsQueued.Inc(1) - fs.mu.queue.PushBack(&flowWithCtx{ - ctx: ctx, - flow: f, - enqueueTime: timeutil.Now(), - }) - return nil - + fs.metrics.FlowsRejected.Inc(1) + return errTooManyFlows }) } @@ -144,7 +145,6 @@ func (fs *flowScheduler) Start() { for { if stopped && fs.mu.numRunning == 0 { - // TODO(radu): somehow error out the flows that are still in the queue. return } fs.mu.Unlock() @@ -153,24 +153,6 @@ func (fs *flowScheduler) Start() { fs.mu.Lock() fs.mu.numRunning-- fs.metrics.FlowStop() - if !stopped { - if frElem := fs.mu.queue.Front(); frElem != nil { - n := frElem.Value.(*flowWithCtx) - fs.mu.queue.Remove(frElem) - wait := timeutil.Since(n.enqueueTime) - log.VEventf( - n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait, - ) - fs.metrics.FlowsQueued.Dec(1) - fs.metrics.QueueWaitHist.RecordValue(int64(wait)) - // Note: we use the flow's context instead of the worker - // context, to ensure that logging etc is relative to the - // specific flow. - if err := fs.runFlowNow(n.ctx, n.flow); err != nil { - log.Errorf(n.ctx, "error starting queued flow: %s", err) - } - } - } case <-fs.stopper.ShouldStop(): fs.mu.Lock() diff --git a/pkg/sql/distsqlrun/metrics.go b/pkg/sql/distsqlrun/metrics.go index 9763339c9ad6..af945430be83 100644 --- a/pkg/sql/distsqlrun/metrics.go +++ b/pkg/sql/distsqlrun/metrics.go @@ -27,8 +27,7 @@ type DistSQLMetrics struct { QueriesTotal *metric.Counter FlowsActive *metric.Gauge FlowsTotal *metric.Counter - FlowsQueued *metric.Gauge - QueueWaitHist *metric.Histogram + FlowsRejected *metric.Counter MaxBytesHist *metric.Histogram CurBytesCount *metric.Gauge } @@ -63,18 +62,12 @@ var ( Measurement: "Flows", Unit: metric.Unit_COUNT, } - metaFlowsQueued = metric.Metadata{ - Name: "sql.distsql.flows.queued", - Help: "Number of distributed SQL flows currently queued", + metaFlowsRejected = metric.Metadata{ + Name: "sql.distsql.flows.rejected", + Help: "Number of distributed SQL flows rejected due to too many active", Measurement: "Flows", Unit: metric.Unit_COUNT, } - metaQueueWaitHist = metric.Metadata{ - Name: "sql.distsql.flows.queue_wait", - Help: "Duration of time flows spend waiting in the queue", - Measurement: "Nanoseconds", - Unit: metric.Unit_NANOSECONDS, - } metaMemMaxBytes = metric.Metadata{ Name: "sql.mem.distsql.max", Help: "Memory usage per sql statement for distsql", @@ -100,8 +93,7 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics { QueriesTotal: metric.NewCounter(metaQueriesTotal), FlowsActive: metric.NewGauge(metaFlowsActive), FlowsTotal: metric.NewCounter(metaFlowsTotal), - FlowsQueued: metric.NewGauge(metaFlowsQueued), - QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow), + FlowsRejected: metric.NewCounter(metaFlowsRejected), MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3), CurBytesCount: metric.NewGauge(metaMemCurBytes), }