distsql: make the number of DistSQL runners dynamic
This commit improves the infrastructure around a pool of "DistSQL
runners" that are used for issuing SetupFlow RPCs in parallel.
Previously, we had a hard-coded number of 16 goroutines, which was
likely insufficient in many cases. This commit changes the default to
`4 x N(cpus)` so that the pool size is proportional to how beefy the
node is (under the expectation that the larger the node, the more
distributed queries it will be handling). Four was chosen as the
multiple so that machines with 4 CPUs keep the previous default of 16.
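
For a concrete sense of the numbers: a 4-CPU node keeps the old total
of 4 x 4 = 16 runners, while a 16-vCPU node now gets 64. Below is a
minimal sketch of the sizing rule (the actual helper added by this
commit is getNumDistSQLRunners in the diff; the constant here merely
stands in for the new cluster setting):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // Stand-in for the sql.distsql.runners_cpu_multiple cluster setting
    // (default 4); at runtime it can be changed with
    // SET CLUSTER SETTING sql.distsql.runners_cpu_multiple = <value>.
    const cpuMultiple = 4
    // GOMAXPROCS(0) reports the current value without changing it; the
    // real helper prefers it over NumCPU because GOMAXPROCS reflects
    // cgroup CPU limits.
    fmt.Println("runners:", cpuMultiple*runtime.GOMAXPROCS(0)) // 16 on a 4-CPU node
}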

Additionally, this commit introduces a mechanism to dynamically adjust
the number of runners based on a cluster setting. Whenever the setting
is reduced, some of the workers are stopped; whenever it is increased,
new workers are spun up accordingly. A coordinator goroutine listens on
two channels: one about the server quiescing, and another about the new
target pool size. Whenever a new target size is received, the
coordinator spins up / shuts down one worker at a time until that
target size is reached. The workers, however, don't access the server
quiescing channel and instead rely on the coordinator to tell them to
exit (either by closing the shared stop channel when quiescing, or by
sending a single message on it when the target size is decreased).
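
To illustrate the mechanism, here is a minimal, self-contained sketch
of the coordinator pattern described above. All names are invented for
this example; the real implementation in pkg/sql/distsql_running.go
(see the diff below) uses the stopper and cluster-setting
infrastructure instead of raw channels:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// pool is a toy stand-in for the commit's runnerCoordinator.
type pool struct {
    work       chan func()   // unbuffered, like runnerChan
    targetSize chan int      // like newDesiredNumWorkers
    quiesce    chan struct{} // stands in for stopper.ShouldQuiesce()
    numWorkers int           // only touched by the coordinator goroutine
    active     atomic.Int64  // for observability in this sketch only
}

func (p *pool) coordinate() {
    // Closed when the coordinator exits, which stops all remaining workers.
    stopWorker := make(chan struct{})
    defer close(stopWorker)
    worker := func() {
        p.active.Add(1)
        defer p.active.Add(-1)
        for {
            select {
            case w := <-p.work:
                w()
            case <-stopWorker:
                // Either a single "shrink by one" message or the deferred
                // close above; the worker never watches quiesce directly.
                return
            }
        }
    }
    for {
        select {
        case target := <-p.targetSize:
            // Converge on the target one worker at a time.
            for p.numWorkers != target {
                if p.numWorkers < target {
                    go worker()
                    p.numWorkers++
                } else {
                    select {
                    case stopWorker <- struct{}{}:
                        p.numWorkers--
                    case <-p.quiesce:
                        return
                    }
                }
            }
        case <-p.quiesce:
            return
        }
    }
}

func main() {
    p := &pool{
        work:       make(chan func()),
        targetSize: make(chan int, 4), // buffered, like newDesiredNumWorkers
        quiesce:    make(chan struct{}),
    }
    go p.coordinate()
    p.targetSize <- 8 // e.g. the cluster setting was increased
    time.Sleep(100 * time.Millisecond)
    fmt.Println("workers:", p.active.Load()) // 8
    p.targetSize <- 2 // the setting was decreased
    time.Sleep(100 * time.Millisecond)
    fmt.Println("workers:", p.active.Load()) // 2
    close(p.quiesce) // server shutdown
}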

Release note: None
yuzefovich committed Jul 22, 2022
1 parent 02727e3 commit 9490307
Showing 2 changed files with 104 additions and 29 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -92,9 +92,9 @@ type DistSQLPlanner struct {
     distSQLSrv *distsql.ServerImpl
     spanResolver physicalplan.SpanResolver
 
-    // runnerChan is used to send out requests (for running SetupFlow RPCs) to a
-    // pool of workers.
-    runnerChan chan runnerRequest
+    // runnerCoordinator is used to send out requests (for running SetupFlow
+    // RPCs) to a pool of workers.
+    runnerCoordinator runnerCoordinator
 
     // cancelFlowsCoordinator is responsible for batching up the requests to
     // cancel remote flows initiated on the behalf of the current node when the
@@ -216,7 +216,7 @@ func NewDistSQLPlanner(
         rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
     }
 
-    dsp.initRunners(ctx)
+    dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
     dsp.initCancelingWorkers(ctx)
     return dsp
 }
125 changes: 100 additions & 25 deletions pkg/sql/distsql_running.go
@@ -14,6 +14,7 @@ import (
     "context"
     "fmt"
     "math"
+    "runtime"
     "sync"
     "sync/atomic"
     "time"
@@ -26,6 +27,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/rpc"
     "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
     "github.com/cockroachdb/cockroach/pkg/server/telemetry"
+    "github.com/cockroachdb/cockroach/pkg/settings"
     "github.com/cockroachdb/cockroach/pkg/sql/colflow"
     "github.com/cockroachdb/cockroach/pkg/sql/contention"
     "github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
@@ -50,18 +52,29 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/metric"
     "github.com/cockroachdb/cockroach/pkg/util/mon"
     "github.com/cockroachdb/cockroach/pkg/util/ring"
+    "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/tracing"
     "github.com/cockroachdb/errors"
     pbtypes "github.com/gogo/protobuf/types"
 )
 
-// To allow queries to send out flow RPCs in parallel, we use a pool of workers
-// that can issue the RPCs on behalf of the running code. The pool is shared by
-// multiple queries.
-const numRunners = 16
+var settingDistSQLRunnersCPUMultiple = settings.RegisterIntSetting(
+    settings.TenantWritable,
+    "sql.distsql.runners_cpu_multiple",
+    "the value multiplied by the number of CPUs on a node to determine "+
+        "the number of DistSQL runner goroutines used for issuing SetupFlow RPCs",
+    // The choice of the default multiple of 4 was made in order to get the
+    // original value of 16 on machines with 4 CPUs.
+    4, /* defaultValue */
+    settings.PositiveInt,
+)
 
-const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+func getNumDistSQLRunners(cpuMultiple int64) int64 {
+    // We use GOMAXPROCS instead of NumCPU because the former could be adjusted
+    // based on cgroup limits (see cgroups.AdjustMaxProcs).
+    return cpuMultiple * int64(runtime.GOMAXPROCS(0))
+}
 
 // runnerRequest is the request that is sent (via a channel) to a worker.
 type runnerRequest struct {
@@ -101,32 +114,92 @@ func (req runnerRequest) run() {
     req.resultChan <- res
 }
 
-func (dsp *DistSQLPlanner) initRunners(ctx context.Context) {
+type runnerCoordinator struct {
+    // runnerChan is used by the DistSQLPlanner to send out requests (for
+    // running SetupFlow RPCs) to a pool of workers.
+    runnerChan chan runnerRequest
+    // newDesiredNumWorkers is used to notify the coordinator that the size of
+    // the pool of workers might have changed.
+    newDesiredNumWorkers chan int64
+    // numWorkers tracks the number of workers running at the moment.
+    numWorkers int64
+}
+
+func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv *settings.Values) {
     // This channel has to be unbuffered because we want to only be able to send
     // requests if a worker is actually there to receive them.
-    dsp.runnerChan = make(chan runnerRequest)
-    for i := 0; i < numRunners; i++ {
-        _ = dsp.stopper.RunAsyncTask(ctx, "distsql-runner", func(context.Context) {
-            runnerChan := dsp.runnerChan
-            stopChan := dsp.stopper.ShouldQuiesce()
-            for {
-                select {
-                case req := <-runnerChan:
-                    req.run()
-
-                case <-stopChan:
-                    return
-                }
-            }
-        })
-    }
-}
+    c.runnerChan = make(chan runnerRequest)
+    stopWorkerChan := make(chan struct{})
+    worker := func(context.Context) {
+        for {
+            select {
+            case req := <-c.runnerChan:
+                req.run()
+            case <-stopWorkerChan:
+                return
+            }
+        }
+    }
+    stopChan := stopper.ShouldQuiesce()
+    // This is a buffered channel because we will be sending on it from the
+    // callback when the corresponding setting changes. The buffer size of 1
+    // should be sufficient, but we use a larger buffer out of caution (in case
+    // the cluster setting is updated rapidly) - in order to not block the
+    // goroutine that is updating the settings.
+    c.newDesiredNumWorkers = make(chan int64, 4)
+    // setNewNumWorkers sets the new target size of the pool of workers.
+    setNewNumWorkers := func(newNumWorkers int64) {
+        select {
+        case c.newDesiredNumWorkers <- newNumWorkers:
+        case <-stopChan:
+            // If the server is quiescing, then the new size of the pool
+            // doesn't matter.
+            return
+        }
+    }
+    // Whenever the corresponding setting is updated, we need to notify the
+    // coordinator.
+    settingDistSQLRunnersCPUMultiple.SetOnChange(sv, func(ctx context.Context) {
+        newNumWorkers := getNumDistSQLRunners(settingDistSQLRunnersCPUMultiple.Get(sv))
+        setNewNumWorkers(newNumWorkers)
+    })
+    // Spin up the coordinator goroutine. Note that we don't immediately spin
+    // up any of the worker goroutines - this will be done when the default
+    // value of the setting is set.
+    _ = stopper.RunAsyncTask(ctx, "distsql-runner-coordinator", func(context.Context) {
+        // Make sure to stop all workers when the coordinator exits.
+        defer close(stopWorkerChan)
+        for {
+            select {
+            case newNumWorkers := <-c.newDesiredNumWorkers:
+                for c.numWorkers != newNumWorkers {
+                    if c.numWorkers < newNumWorkers {
+                        // Need to spin up another worker.
+                        err := stopper.RunAsyncTask(ctx, "distsql-runner", worker)
+                        if err != nil {
+                            return
+                        }
+                        c.numWorkers++
+                    } else {
+                        // Need to stop one of the workers.
+                        select {
+                        case stopWorkerChan <- struct{}{}:
+                            c.numWorkers--
+                        case <-stopChan:
+                            return
+                        }
+                    }
+                }
+            case <-stopChan:
+                return
+            }
+        }
+    })
+}
 
 // To allow for canceling flows via CancelDeadFlows RPC on different nodes
-// simultaneously, we use a pool of workers. It is likely that these workers
-// will be less busy than SetupFlow runners, so we instantiate smaller number of
-// the canceling workers.
-const numCancelingWorkers = numRunners / 4
+// simultaneously, we use a pool of workers.
+const numCancelingWorkers = 4
 
 func (dsp *DistSQLPlanner) initCancelingWorkers(initCtx context.Context) {
     dsp.cancelFlowsCoordinator.workerWait = make(chan struct{}, numCancelingWorkers)
@@ -341,7 +414,7 @@ func (dsp *DistSQLPlanner) setupFlows(
         // Send out a request to the workers; if no worker is available, run
         // directly.
         select {
-        case dsp.runnerChan <- runReq:
+        case dsp.runnerCoordinator.runnerChan <- runReq:
         default:
             runReq.run()
         }
@@ -373,6 +446,8 @@ func (dsp *DistSQLPlanner) setupFlows(
     return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
 }
 
+const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+
 // Run executes a physical plan. The plan should have been finalized using
 // FinalizePlan.
 //
