diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 7aa3f943d549..e8d2bb6eedbc 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -91,9 +91,9 @@ type DistSQLPlanner struct {
 	distSQLSrv   *distsql.ServerImpl
 	spanResolver physicalplan.SpanResolver
 
-	// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
-	// pool of workers.
-	runnerChan chan runnerRequest
+	// runnerCoordinator is used to send out requests (for running SetupFlow
+	// RPCs) to a pool of workers.
+	runnerCoordinator runnerCoordinator
 
 	// cancelFlowsCoordinator is responsible for batching up the requests to
 	// cancel remote flows initiated on the behalf of the current node when the
@@ -211,7 +211,7 @@ func NewDistSQLPlanner(
 		rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
 	}
 
-	dsp.initRunners(ctx)
+	dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
 	dsp.initCancelingWorkers(ctx)
 	return dsp
 }
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 64bafdf62866..bcc931d320ec 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/colflow"
 	"github.com/cockroachdb/cockroach/pkg/sql/contention"
 	"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
@@ -50,18 +52,36 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/ring"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	pbtypes "github.com/gogo/protobuf/types"
 )
 
-// To allow queries to send out flow RPCs in parallel, we use a pool of workers
-// that can issue the RPCs on behalf of the running code. The pool is shared by
-// multiple queries.
-const numRunners = 16
+var settingDistSQLNumRunners = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"sql.distsql.num_runners",
+	"determines the number of DistSQL runner goroutines used for issuing SetupFlow RPCs",
+	// We use GOMAXPROCS instead of NumCPU because the former could be adjusted
+	// based on cgroup limits (see cgroups.AdjustMaxProcs).
+	//
+	// The choice of the default multiple of 4 was made in order to get the
+	// original value of 16 on machines with 4 CPUs.
+	4*int64(runtime.GOMAXPROCS(0)), /* defaultValue */
+	func(v int64) error {
+		if v < 0 {
+			return errors.Errorf("cannot be set to a negative value: %d", v)
+		}
+		if v > distSQLNumRunnersMax {
+			return errors.Errorf("cannot be set to a value exceeding %d: %d", distSQLNumRunnersMax, v)
+		}
+		return nil
+	},
+)
 
-const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+// Somewhat arbitrary upper bound.
+var distSQLNumRunnersMax = 256 * int64(runtime.GOMAXPROCS(0))
 
 // runnerRequest is the request that is sent (via a channel) to a worker.
 type runnerRequest struct {
@@ -101,32 +121,104 @@ func (req runnerRequest) run() {
 	req.resultChan <- res
 }
 
-func (dsp *DistSQLPlanner) initRunners(ctx context.Context) {
+type runnerCoordinator struct {
+	// runnerChan is used by the DistSQLPlanner to send out requests (for
+	// running SetupFlow RPCs) to a pool of workers.
+	runnerChan chan runnerRequest
+	// newDesiredNumWorkers is used to notify the coordinator that the size of
+	// the pool of workers might have changed.
+	newDesiredNumWorkers chan int64
+	atomics              struct {
+		// numWorkers tracks the number of workers running at the moment. This
+		// needs to be accessed atomically, but only because of the usage in
+		// tests.
+		numWorkers int64
+	}
+}
+
+func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv *settings.Values) {
 	// This channel has to be unbuffered because we want to only be able to send
 	// requests if a worker is actually there to receive them.
-	dsp.runnerChan = make(chan runnerRequest)
-	for i := 0; i < numRunners; i++ {
-		_ = dsp.stopper.RunAsyncTask(ctx, "distsql-runner", func(context.Context) {
-			runnerChan := dsp.runnerChan
-			stopChan := dsp.stopper.ShouldQuiesce()
-			for {
-				select {
-				case req := <-runnerChan:
-					req.run()
-
-				case <-stopChan:
-					return
-				}
+	c.runnerChan = make(chan runnerRequest)
+	stopWorkerChan := make(chan struct{})
+	worker := func(context.Context) {
+		for {
+			select {
+			case req := <-c.runnerChan:
+				req.run()
+			case <-stopWorkerChan:
+				return
 			}
-		})
+		}
 	}
+	stopChan := stopper.ShouldQuiesce()
+	// This is a buffered channel because we will be sending on it from the
+	// callback when the corresponding setting changes. The buffer size of 1
+	// should be sufficient, but we use a larger buffer out of caution (in case
+	// the cluster setting is updated rapidly) in order to not block the
+	// goroutine that is updating the settings.
+	c.newDesiredNumWorkers = make(chan int64, 4)
+	// setNewNumWorkers sets the new target size of the pool of workers.
+	setNewNumWorkers := func(newNumWorkers int64) {
+		select {
+		case c.newDesiredNumWorkers <- newNumWorkers:
+		case <-stopChan:
+			// If the server is quiescing, then the new size of the pool doesn't
+			// matter.
+			return
+		}
+	}
+	// Whenever the corresponding setting is updated, we need to notify the
+	// coordinator.
+	// NB: runnerCoordinator.init is called once per server lifetime so this
+	// won't leak an unbounded number of OnChange callbacks.
+	settingDistSQLNumRunners.SetOnChange(sv, func(ctx context.Context) {
+		setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
+	})
+	// We need to set the target pool size based on the current setting
+	// explicitly since the OnChange callback won't ever be called for the
+	// initial value - the setting initialization has already been performed
+	// before we registered the OnChange callback.
+	setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
+	// Spin up the coordinator goroutine.
+	_ = stopper.RunAsyncTask(ctx, "distsql-runner-coordinator", func(context.Context) {
+		// Make sure to stop all workers when the coordinator exits.
+		defer close(stopWorkerChan)
+		for {
+			select {
+			case newNumWorkers := <-c.newDesiredNumWorkers:
+				for {
+					numWorkers := atomic.LoadInt64(&c.atomics.numWorkers)
+					if numWorkers == newNumWorkers {
+						break
+					}
+					if numWorkers < newNumWorkers {
+						// Need to spin up another worker.
+						err := stopper.RunAsyncTask(ctx, "distsql-runner", worker)
+						if err != nil {
+							return
+						}
+						atomic.AddInt64(&c.atomics.numWorkers, 1)
+					} else {
+						// Need to stop one of the workers.
+						select {
+						case stopWorkerChan <- struct{}{}:
+							atomic.AddInt64(&c.atomics.numWorkers, -1)
+						case <-stopChan:
+							return
+						}
+					}
+				}
+			case <-stopChan:
+				return
+			}
+		}
+	})
 }
 
 // To allow for canceling flows via CancelDeadFlows RPC on different nodes
-// simultaneously, we use a pool of workers. It is likely that these workers
-// will be less busy than SetupFlow runners, so we instantiate smaller number of
-// the canceling workers.
-const numCancelingWorkers = numRunners / 4
+// simultaneously, we use a pool of workers.
+const numCancelingWorkers = 4
 
 func (dsp *DistSQLPlanner) initCancelingWorkers(initCtx context.Context) {
 	dsp.cancelFlowsCoordinator.workerWait = make(chan struct{}, numCancelingWorkers)
@@ -300,13 +392,6 @@ func (dsp *DistSQLPlanner) setupFlows(
 		StatementSQL:      statementSQL,
 	}
 
-	// Start all the flows except the flow on this node (there is always a flow on
-	// this node).
-	var resultChan chan runnerResult
-	if len(flows) > 1 {
-		resultChan = make(chan runnerResult, len(flows)-1)
-	}
-
 	if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
 		// Now we determine whether the vectorized engine supports the flow
 		// specs.
@@ -316,38 +401,54 @@ func (dsp *DistSQLPlanner) setupFlows(
 				if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
 					return nil, nil, nil, err
 				}
-				// Vectorization is not supported for this flow, so we override the
-				// setting.
+				// Vectorization is not supported for this flow, so we override
+				// the setting.
 				setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
 				break
 			}
 		}
 	}
 
-	for nodeID, flowSpec := range flows {
-		if nodeID == thisNodeID {
-			// Skip this node.
-			continue
-		}
-		req := setupReq
-		req.Flow = *flowSpec
-		runReq := runnerRequest{
-			ctx:           ctx,
-			nodeDialer:    dsp.podNodeDialer,
-			flowReq:       &req,
-			sqlInstanceID: nodeID,
-			resultChan:    resultChan,
-		}
-		// Send out a request to the workers; if no worker is available, run
-		// directly.
-		select {
-		case dsp.runnerChan <- runReq:
-		default:
-			runReq.run()
+	// Start all the flows except the flow on this node (there is always a flow
+	// on this node).
+	var resultChan chan runnerResult
+	if len(flows) > 1 {
+		resultChan = make(chan runnerResult, len(flows)-1)
+		for nodeID, flowSpec := range flows {
+			if nodeID == thisNodeID {
+				// Skip this node.
+				continue
+			}
+			req := setupReq
+			req.Flow = *flowSpec
+			runReq := runnerRequest{
+				ctx:           ctx,
+				nodeDialer:    dsp.podNodeDialer,
+				flowReq:       &req,
+				sqlInstanceID: nodeID,
+				resultChan:    resultChan,
+			}
+
+			// Send out a request to the workers; if no worker is available, run
+			// directly.
+			select {
+			case dsp.runnerCoordinator.runnerChan <- runReq:
+			default:
+				runReq.run()
+			}
 		}
 	}
 
-	var firstErr error
+	// Now set up the flow on this node.
+	setupReq.Flow = *flows[thisNodeID]
+	var batchReceiver execinfra.BatchReceiver
+	if recv.batchWriter != nil {
+		// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
+		// former has the corresponding writer set.
+		batchReceiver = recv
+	}
+	ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+
 	// Now wait for all the flows to be scheduled on remote nodes. Note that we
 	// are not waiting for the flows themselves to complete.
 	for i := 0; i < len(flows)-1; i++ {
@@ -358,21 +459,13 @@ func (dsp *DistSQLPlanner) setupFlows(
 		// TODO(radu): accumulate the flows that we failed to set up and move them
 		// into the local flow.
 	}
-	if firstErr != nil {
-		return nil, nil, nil, firstErr
-	}
-
-	// Set up the flow on this node.
-	setupReq.Flow = *flows[thisNodeID]
-	var batchReceiver execinfra.BatchReceiver
-	if recv.batchWriter != nil {
-		// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
-		// former has the corresponding writer set.
-		batchReceiver = recv
-	}
-	return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+	// Note that we need to return the local flow even if firstErr is non-nil so
+	// that the local flow is properly cleaned up.
+	return ctx, flow, opChains, firstErr
 }
 
+const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+
 // Run executes a physical plan. The plan should have been finalized using
 // FinalizePlan.
 //
diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index a90bf3efff03..efd92302eb17 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -676,3 +676,34 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) {
 		return nil
 	})
 }
+
+// TestDistSQLRunnerCoordinator verifies that the runnerCoordinator correctly
+// reacts to the changes of the corresponding setting.
+func TestDistSQLRunnerCoordinator(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(ctx)
+
+	runner := &s.ExecutorConfig().(ExecutorConfig).DistSQLPlanner.runnerCoordinator
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	checkNumRunners := func(newNumRunners int64) {
+		sqlDB.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.distsql.num_runners = %d", newNumRunners))
+		testutils.SucceedsSoon(t, func() error {
+			numWorkers := atomic.LoadInt64(&runner.atomics.numWorkers)
+			if numWorkers != newNumRunners {
+				return errors.Newf("%d workers are up, want %d", numWorkers, newNumRunners)
+			}
+			return nil
+		})
+	}
+
+	// Lower the setting to 0 and make sure that all runners exit.
+	checkNumRunners(0)
+
+	// Now bump it up to 100.
+	checkNumRunners(100)
+}
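Reviewer note: the resizing protocol that runnerCoordinator implements above can be illustrated with a minimal, self-contained Go sketch. All names below are hypothetical and the sketch uses only the standard library; it deliberately omits the stop.Stopper/quiescence handling and the cluster-setting plumbing, and only mirrors the grow/shrink loop driven by a desired-size channel, an unbuffered work channel shared by all workers, and a stop channel that retires one worker per send.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pool is a minimal sketch of a resizable worker pool: workers pull work off
// an unbuffered channel, and a coordinator goroutine grows or shrinks the
// pool whenever a new desired size arrives.
type pool struct {
	work       chan func()   // unbuffered, analogous to runnerChan
	desired    chan int64    // analogous to newDesiredNumWorkers
	stopWorker chan struct{} // one send retires one worker
	numWorkers int64         // read atomically, analogous to atomics.numWorkers
}

func newPool() *pool {
	p := &pool{
		work:       make(chan func()),
		desired:    make(chan int64, 4),
		stopWorker: make(chan struct{}),
	}
	go p.coordinate()
	return p
}

// worker runs submitted functions until it is told to stop.
func (p *pool) worker() {
	for {
		select {
		case fn := <-p.work:
			fn()
		case <-p.stopWorker:
			return
		}
	}
}

// coordinate reacts to every new desired size by starting or retiring
// workers one at a time until the actual count matches the target.
func (p *pool) coordinate() {
	for target := range p.desired {
		for {
			cur := atomic.LoadInt64(&p.numWorkers)
			if cur == target {
				break
			}
			if cur < target {
				go p.worker()
				atomic.AddInt64(&p.numWorkers, 1)
			} else {
				p.stopWorker <- struct{}{}
				atomic.AddInt64(&p.numWorkers, -1)
			}
		}
	}
}

func main() {
	p := newPool()
	p.desired <- 4 // grow to 4 workers
	p.desired <- 1 // then shrink back to 1
	time.Sleep(100 * time.Millisecond)
	fmt.Println("workers:", atomic.LoadInt64(&p.numWorkers))
}

In the actual change, the desired size is fed from the sql.distsql.num_runners cluster setting via SetOnChange, so lowering the setting to 0 retires every runner and raising it spins new ones up, which is what TestDistSQLRunnerCoordinator exercises with the 0 and 100 targets.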