diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index ac22ff36a426..e57754aa1a10 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -92,9 +92,9 @@ type DistSQLPlanner struct {
 	distSQLSrv   *distsql.ServerImpl
 	spanResolver physicalplan.SpanResolver
 
-	// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
-	// pool of workers.
-	runnerChan chan runnerRequest
+	// runnerCoordinator is used to send out requests (for running SetupFlow
+	// RPCs) to a pool of workers.
+	runnerCoordinator runnerCoordinator
 
 	// cancelFlowsCoordinator is responsible for batching up the requests to
 	// cancel remote flows initiated on the behalf of the current node when the
@@ -216,7 +216,7 @@ func NewDistSQLPlanner(
 		rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
 	}
 
-	dsp.initRunners(ctx)
+	dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
 	dsp.initCancelingWorkers(ctx)
 	return dsp
 }
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 0b57213c7978..b55c1278a771 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/colflow"
 	"github.com/cockroachdb/cockroach/pkg/sql/contention"
 	"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
@@ -50,18 +52,29 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/ring"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	pbtypes "github.com/gogo/protobuf/types"
 )
 
-// To allow queries to send out flow RPCs in parallel, we use a pool of workers
-// that can issue the RPCs on behalf of the running code. The pool is shared by
-// multiple queries.
-const numRunners = 16
+var settingDistSQLRunnersCPUMultiple = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.distsql.runners_cpu_multiple",
+	"the value multiplied by the number of CPUs on a node to determine "+
+		"the number of DistSQL runner goroutines used for issuing SetupFlow RPCs",
+	// The choice of the default multiple of 4 was made in order to get the
+	// original value of 16 on machines with 4 CPUs.
+	4, /* defaultValue */
+	settings.PositiveInt,
+)
 
-const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+func getNumDistSQLRunners(cpuMultiple int64) int64 {
+	// We use GOMAXPROCS instead of NumCPU because the former could be adjusted
+	// based on cgroup limits (see cgroups.AdjustMaxProcs).
+	return cpuMultiple * int64(runtime.GOMAXPROCS(0))
+}
 
 // runnerRequest is the request that is sent (via a channel) to a worker.
 type runnerRequest struct {
@@ -101,32 +114,92 @@ func (req runnerRequest) run() {
 	req.resultChan <- res
 }
 
-func (dsp *DistSQLPlanner) initRunners(ctx context.Context) {
+type runnerCoordinator struct {
+	// runnerChan is used by the DistSQLPlanner to send out requests (for
+	// running SetupFlow RPCs) to a pool of workers.
+	runnerChan chan runnerRequest
+	// newDesiredNumWorkers is used to notify the coordinator that the size of
+	// the pool of workers might have changed.
+	newDesiredNumWorkers chan int64
+	// numWorkers tracks the number of workers running at the moment.
+	numWorkers int64
+}
+
+func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv *settings.Values) {
 	// This channel has to be unbuffered because we want to only be able to send
 	// requests if a worker is actually there to receive them.
-	dsp.runnerChan = make(chan runnerRequest)
-	for i := 0; i < numRunners; i++ {
-		_ = dsp.stopper.RunAsyncTask(ctx, "distsql-runner", func(context.Context) {
-			runnerChan := dsp.runnerChan
-			stopChan := dsp.stopper.ShouldQuiesce()
-			for {
-				select {
-				case req := <-runnerChan:
-					req.run()
-
-				case <-stopChan:
-					return
-				}
+	c.runnerChan = make(chan runnerRequest)
+	stopWorkerChan := make(chan struct{})
+	worker := func(context.Context) {
+		for {
+			select {
+			case req := <-c.runnerChan:
+				req.run()
+			case <-stopWorkerChan:
+				return
 			}
-		})
+		}
 	}
+	stopChan := stopper.ShouldQuiesce()
+	// This is a buffered channel because we will be sending on it from the
+	// callback when the corresponding setting changes. The buffer size of 1
+	// should be sufficient, but we use a larger buffer out of caution (in case
+	// the cluster setting is updated rapidly) - in order to not block the
+	// goroutine that is updating the settings.
+	c.newDesiredNumWorkers = make(chan int64, 4)
+	// setNewNumWorkers sets the new target size of the pool of workers.
+	setNewNumWorkers := func(newNumWorkers int64) {
+		select {
+		case c.newDesiredNumWorkers <- newNumWorkers:
+		case <-stopChan:
+			// If the server is quiescing, then the new size of the pool doesn't
+			// matter.
+			return
+		}
+	}
+	// Whenever the corresponding setting is updated, we need to notify the
+	// coordinator.
+	settingDistSQLRunnersCPUMultiple.SetOnChange(sv, func(ctx context.Context) {
+		newNumWorkers := getNumDistSQLRunners(settingDistSQLRunnersCPUMultiple.Get(sv))
+		setNewNumWorkers(newNumWorkers)
+	})
+	// Spin up the coordinator goroutine. Note that we don't immediately spin up
+	// any of the worker goroutines - this will be done when the default value
+	// of the setting is set.
+	_ = stopper.RunAsyncTask(ctx, "distsql-runner-coordinator", func(context.Context) {
+		// Make sure to stop all workers when the coordinator exits.
+		defer close(stopWorkerChan)
+		for {
+			select {
+			case newNumWorkers := <-c.newDesiredNumWorkers:
+				for c.numWorkers != newNumWorkers {
+					if c.numWorkers < newNumWorkers {
+						// Need to spin up another worker.
+						err := stopper.RunAsyncTask(ctx, "distsql-runner", worker)
+						if err != nil {
+							return
+						}
+						c.numWorkers++
+					} else {
+						// Need to stop one of the workers.
+						select {
+						case stopWorkerChan <- struct{}{}:
+							c.numWorkers--
+						case <-stopChan:
+							return
+						}
+					}
+				}
+			case <-stopChan:
+				return
+			}
+		}
+	})
 }
 
 // To allow for canceling flows via CancelDeadFlows RPC on different nodes
-// simultaneously, we use a pool of workers. It is likely that these workers
-// will be less busy than SetupFlow runners, so we instantiate smaller number of
-// the canceling workers.
-const numCancelingWorkers = numRunners / 4
+// simultaneously, we use a pool of workers.
+const numCancelingWorkers = 4
 
 func (dsp *DistSQLPlanner) initCancelingWorkers(initCtx context.Context) {
 	dsp.cancelFlowsCoordinator.workerWait = make(chan struct{}, numCancelingWorkers)
@@ -341,7 +414,7 @@ func (dsp *DistSQLPlanner) setupFlows(
 		// Send out a request to the workers; if no worker is available, run
 		// directly.
 		select {
-		case dsp.runnerChan <- runReq:
+		case dsp.runnerCoordinator.runnerChan <- runReq:
 		default:
 			runReq.run()
 		}
@@ -373,6 +446,8 @@ func (dsp *DistSQLPlanner) setupFlows(
 	return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
 }
 
+const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+
 // Run executes a physical plan. The plan should have been finalized using
 // FinalizePlan.
 //
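
Reviewer note (not part of the patch): getNumDistSQLRunners sizes the pool from runtime.GOMAXPROCS(0) rather than runtime.NumCPU() because GOMAXPROCS reflects the adjustment CockroachDB makes at startup for cgroup CPU quotas (cgroups.AdjustMaxProcs), while NumCPU always reports the physical count. A minimal standalone sketch of the same calculation; the name numRunners is illustrative, not from the patch.

package main

import (
	"fmt"
	"runtime"
)

// numRunners mirrors getNumDistSQLRunners: GOMAXPROCS(0) queries the current
// value without changing it, and, unlike runtime.NumCPU, reflects any
// adjustment made at startup for cgroup CPU limits.
func numRunners(cpuMultiple int64) int64 {
	return cpuMultiple * int64(runtime.GOMAXPROCS(0))
}

func main() {
	// With the default multiple of 4, a node running with 4 CPUs keeps the
	// historical fixed pool size of 16.
	fmt.Println("pool size:", numRunners(4))
}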
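
Reviewer note (not part of the patch): the sketch below reduces the resizable worker-pool pattern that runnerCoordinator implements to a self-contained program. It keeps the three moving parts of the patch: an unbuffered request channel, so callers can fall back to running a request inline (as setupFlows does); an unbuffered stop channel that shrinks the pool one worker at a time; and a small buffered channel carrying the desired pool size from the settings callback to the coordinator goroutine. All names here (demoPool, etc.) are hypothetical, and the sketch omits the stop.Stopper integration the real code uses for shutdown.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// demoPool is a hypothetical reduction of runnerCoordinator: a coordinator
// goroutine grows or shrinks a set of workers to match a desired size.
type demoPool struct {
	requests       chan func()   // unbuffered: a send succeeds only if a worker is ready
	stopWorker     chan struct{} // each send stops exactly one worker
	desiredWorkers chan int      // new target sizes, e.g. from a settings callback
	numWorkers     int           // accessed only by the coordinator goroutine
}

func newDemoPool() *demoPool {
	p := &demoPool{
		requests:   make(chan func()),
		stopWorker: make(chan struct{}),
		// Buffered so that rapid resize notifications don't block the sender,
		// mirroring the buffer of 4 on newDesiredNumWorkers in the patch.
		desiredWorkers: make(chan int, 4),
	}
	go p.coordinate()
	return p
}

func (p *demoPool) worker() {
	for {
		select {
		case req := <-p.requests:
			req()
		case <-p.stopWorker:
			return
		}
	}
}

// coordinate is the analogue of the distsql-runner-coordinator task: it
// adjusts the number of workers whenever a new target size arrives.
func (p *demoPool) coordinate() {
	for newSize := range p.desiredWorkers {
		for p.numWorkers < newSize {
			go p.worker()
			p.numWorkers++
		}
		for p.numWorkers > newSize {
			p.stopWorker <- struct{}{} // one worker receives this and exits
			p.numWorkers--
		}
	}
}

func main() {
	p := newDemoPool()
	p.desiredWorkers <- 4 // grow the pool, as the SetOnChange callback would

	var done atomic.Int32
	for i := 0; i < 8; i++ {
		task := func() { done.Add(1) }
		select {
		case p.requests <- task: // a worker was free and picked it up
		default:
			task() // no worker available: run inline, as setupFlows does
		}
	}

	p.desiredWorkers <- 1 // shrink the pool back down
	time.Sleep(100 * time.Millisecond)
	fmt.Println("tasks completed:", done.Load())
}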