Skip to content

Commit

Permalink
distsqlrun: remove queuing behavior from flowScheduler
Browse files Browse the repository at this point in the history
Prior to this PR when SetupFlowRequests in excess of `max_running_flows` were
received they would be queued for processing. This queueing was unbounded and
had no notion of a "full" queue. While the queue was well intentioned and may
have led to desirable behavior in bursty, short-lived workloads, in cases of
high volumes of longer running flows the system would observe arbitrarily long
queuing delays. These delays would ultimately result propagate to the user in
the form of a timeout setting up the other side of the connection. This error
was both delayed and difficult for customers to interpret. The immediate action
is to remove the queuing behavior and increase the flow limit. It is worth
noting that this new limit is still arbitrary and unjustified except to say that
it's larger than the previous limit by a factor of 2. After this PR when the
`max_running_flows` limit is hit, clients will receive a more meaningful error
faster which hopefully will guide them to adjusting the setting when appropriate
or scaling out.

This commit could use a unit test to ensure that we do indeed see the newly
introduced error and I intend to write one soon. The machinery around setting
up a unit test seemed much more daunting than this change but I wanted to get
this out just to start a discussion about whether this is the right thing to do
and if not, what better approaches y'all might see.

Fixes cockroachdb#34229.

Release note: None
  • Loading branch information
ajwerner committed Feb 6, 2019
1 parent 5d7e15a commit cb3612b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
<tr><td><code>sql.distsql.distribute_index_joins</code></td><td>boolean</td><td><code>true</code></td><td>if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader</td></tr>
<tr><td><code>sql.distsql.flow_stream_timeout</code></td><td>duration</td><td><code>10s</code></td><td>amount of time incoming streams wait for a flow to be set up before erroring out</td></tr>
<tr><td><code>sql.distsql.interleaved_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set we plan interleaved table joins instead of merge joins when possible</td></tr>
<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>500</code></td><td>maximum number of concurrent flows that can be run on a node</td></tr>
<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>1000</code></td><td>maximum number of concurrent flows that can be run on a node</td></tr>
<tr><td><code>sql.distsql.merge_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, we plan merge joins when possible</td></tr>
<tr><td><code>sql.distsql.temp_storage.joins</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable use of disk for distributed sql joins</td></tr>
<tr><td><code>sql.distsql.temp_storage.sorts</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable use of disk for distributed sql sorts</td></tr>
Expand Down
86 changes: 34 additions & 52 deletions pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,48 @@
package distsqlrun

import (
"container/list"
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const flowDoneChanSize = 8

// NB: Once upon a time there was a mechanism to queue flows above this
// max_running_flows which had a positive consequence of handling bursts of
// flows gracefully. Unfortunately it had the negative consequence that when
// queuing was occurring it would ultimately lead to connection timeout on the
// inbound side of the connection. This led to hard to interpret error messages.
// The decision was made that in lieu of a more complex mechanism we'd prefer to
// fail fast with a clear error message which is easy to triage. The removal of
// the queue however leaves open the likelihood of hitting an error case
// in workloads which before may have experienced some queueing but generally
// worked well. Removing the queuing exacerbates the importance of choosing an
// appropriate value for this setting. Unfortunately there do not exist
// sophisticated mechanisms to detect an appropriate value based on resource
// constraints. The intuition for now is to choose a value we expect to be
// conservative and educate customers who both hit this error and have a large
// amount of RAM headroom to increase it until either they are not hitting the
// limit or they begin to be squeezed on RAM at which point they likely need to
// scale out their cluster.

// TODO(ajwerner): devise more robust overload / resource allocation mechanisms.
// TODO(ajwerner): In lieu of above, justify the default max_running_flows.

var settingMaxRunningFlows = settings.RegisterIntSetting(
"sql.distsql.max_running_flows",
"maximum number of concurrent flows that can be run on a node",
500,
1000,
)

// flowScheduler manages running flows and decides when to queue and when to
// start flows. The main interface it presents is ScheduleFlows, which passes a
// flow to be run.
// flowScheduler manages running flows and decides whether to start flows.
// The main interface it presents is ScheduleFlows, which passes a flow to be
// run.
type flowScheduler struct {
log.AmbientContext
stopper *stop.Stopper
Expand All @@ -48,19 +67,9 @@ type flowScheduler struct {
syncutil.Mutex
numRunning int
maxRunningFlows int
queue *list.List
}
}

// flowWithCtx stores a flow to run and a context to run it with.
// TODO(asubiotto): Figure out if asynchronous flow execution can be rearranged
// to avoid the need to store the context.
type flowWithCtx struct {
ctx context.Context
flow *Flow
enqueueTime time.Time
}

func newFlowScheduler(
ambient log.AmbientContext,
stopper *stop.Stopper,
Expand All @@ -73,7 +82,6 @@ func newFlowScheduler(
flowDoneCh: make(chan *Flow, flowDoneChanSize),
metrics: metrics,
}
fs.mu.queue = list.New()
fs.mu.maxRunningFlows = int(settingMaxRunningFlows.Get(&settings.SV))
settingMaxRunningFlows.SetOnChange(&settings.SV, func() {
fs.mu.Lock()
Expand Down Expand Up @@ -108,29 +116,22 @@ func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
return nil
}

// ScheduleFlow is the main interface of the flow scheduler: it runs or enqueues
// the given flow.
//
// If the flow can start immediately, errors encountered when starting the flow
// are returned. If the flow is enqueued, these error will be later ignored.
var errTooManyFlows = pgerror.NewError(pgerror.CodeInsufficientResourcesError,
"max_running_flows exceeded")

// ScheduleFlow is the main interface of the flow scheduler. It runs the given
// flow or rejects it with errTooManyFlows. Errors encountered when starting the
// flow are returned.
func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
return fs.stopper.RunTaskWithErr(
ctx, "distsqlrun.flowScheduler: scheduling flow", func(ctx context.Context) error {
fs.mu.Lock()
defer fs.mu.Unlock()

if fs.canRunFlow(f) {
return fs.runFlowNow(ctx, f)
}
log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
fs.metrics.FlowsQueued.Inc(1)
fs.mu.queue.PushBack(&flowWithCtx{
ctx: ctx,
flow: f,
enqueueTime: timeutil.Now(),
})
return nil

fs.metrics.FlowsRejected.Inc(1)
return errTooManyFlows
})
}

Expand All @@ -144,7 +145,6 @@ func (fs *flowScheduler) Start() {

for {
if stopped && fs.mu.numRunning == 0 {
// TODO(radu): somehow error out the flows that are still in the queue.
return
}
fs.mu.Unlock()
Expand All @@ -153,24 +153,6 @@ func (fs *flowScheduler) Start() {
fs.mu.Lock()
fs.mu.numRunning--
fs.metrics.FlowStop()
if !stopped {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
fs.mu.queue.Remove(frElem)
wait := timeutil.Since(n.enqueueTime)
log.VEventf(
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, wait,
)
fs.metrics.FlowsQueued.Dec(1)
fs.metrics.QueueWaitHist.RecordValue(int64(wait))
// Note: we use the flow's context instead of the worker
// context, to ensure that logging etc is relative to the
// specific flow.
if err := fs.runFlowNow(n.ctx, n.flow); err != nil {
log.Errorf(n.ctx, "error starting queued flow: %s", err)
}
}
}

case <-fs.stopper.ShouldStop():
fs.mu.Lock()
Expand Down
18 changes: 5 additions & 13 deletions pkg/sql/distsqlrun/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type DistSQLMetrics struct {
QueriesTotal *metric.Counter
FlowsActive *metric.Gauge
FlowsTotal *metric.Counter
FlowsQueued *metric.Gauge
QueueWaitHist *metric.Histogram
FlowsRejected *metric.Counter
MaxBytesHist *metric.Histogram
CurBytesCount *metric.Gauge
}
Expand Down Expand Up @@ -63,18 +62,12 @@ var (
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaFlowsQueued = metric.Metadata{
Name: "sql.distsql.flows.queued",
Help: "Number of distributed SQL flows currently queued",
metaFlowsRejected = metric.Metadata{
Name: "sql.distsql.flows.rejected",
Help: "Number of distributed SQL flows rejected due to too many active",
Measurement: "Flows",
Unit: metric.Unit_COUNT,
}
metaQueueWaitHist = metric.Metadata{
Name: "sql.distsql.flows.queue_wait",
Help: "Duration of time flows spend waiting in the queue",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaMemMaxBytes = metric.Metadata{
Name: "sql.mem.distsql.max",
Help: "Memory usage per sql statement for distsql",
Expand All @@ -100,8 +93,7 @@ func MakeDistSQLMetrics(histogramWindow time.Duration) DistSQLMetrics {
QueriesTotal: metric.NewCounter(metaQueriesTotal),
FlowsActive: metric.NewGauge(metaFlowsActive),
FlowsTotal: metric.NewCounter(metaFlowsTotal),
FlowsQueued: metric.NewGauge(metaFlowsQueued),
QueueWaitHist: metric.NewLatency(metaQueueWaitHist, histogramWindow),
FlowsRejected: metric.NewCounter(metaFlowsRejected),
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, log10int64times1000, 3),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}
Expand Down

0 comments on commit cb3612b

Please sign in to comment.