distsql: make the number of DistSQL runners dynamic #84946

Merged (2 commits) on Aug 1, 2022
8 changes: 4 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -91,9 +91,9 @@ type DistSQLPlanner struct {
distSQLSrv *distsql.ServerImpl
spanResolver physicalplan.SpanResolver

// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
// pool of workers.
runnerChan chan runnerRequest
// runnerCoordinator is used to send out requests (for running SetupFlow
// RPCs) to a pool of workers.
runnerCoordinator runnerCoordinator

// cancelFlowsCoordinator is responsible for batching up the requests to
// cancel remote flows initiated on the behalf of the current node when the
@@ -211,7 +211,7 @@ func NewDistSQLPlanner(
rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
}

dsp.initRunners(ctx)
dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
dsp.initCancelingWorkers(ctx)
return dsp
}
227 changes: 160 additions & 67 deletions pkg/sql/distsql_running.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
@@ -50,18 +52,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)

// To allow queries to send out flow RPCs in parallel, we use a pool of workers
// that can issue the RPCs on behalf of the running code. The pool is shared by
// multiple queries.
const numRunners = 16
var settingDistSQLNumRunners = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.distsql.num_runners",
"determines the number of DistSQL runner goroutines used for issuing SetupFlow RPCs",
// We use GOMAXPROCS instead of NumCPU because the former could be adjusted
// based on cgroup limits (see cgroups.AdjustMaxProcs).
//
// The choice of the default multiple of 4 was made in order to get the
// original value of 16 on machines with 4 CPUs.
4*int64(runtime.GOMAXPROCS(0)), /* defaultValue */
func(v int64) error {
if v < 0 {
return errors.Errorf("cannot be set to a negative value: %d", v)
}
if v > distSQLNumRunnersMax {
return errors.Errorf("cannot be set to a value exceeding %d: %d", distSQLNumRunnersMax, v)
}
return nil
},
)

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
// Somewhat arbitrary upper bound.
var distSQLNumRunnersMax = 256 * int64(runtime.GOMAXPROCS(0))

// runnerRequest is the request that is sent (via a channel) to a worker.
type runnerRequest struct {
@@ -101,32 +121,104 @@ func (req runnerRequest) run() {
req.resultChan <- res
}

func (dsp *DistSQLPlanner) initRunners(ctx context.Context) {
type runnerCoordinator struct {
// runnerChan is used by the DistSQLPlanner to send out requests (for
// running SetupFlow RPCs) to a pool of workers.
runnerChan chan runnerRequest
// newDesiredNumWorkers is used to notify the coordinator that the size of
// the pool of workers might have changed.
newDesiredNumWorkers chan int64
atomics struct {
// numWorkers tracks the number of workers running at the moment. This
// needs to be accessed atomically, but only because of the usage in
// tests.
numWorkers int64
}
}

func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv *settings.Values) {
// This channel has to be unbuffered because we want to only be able to send
// requests if a worker is actually there to receive them.
dsp.runnerChan = make(chan runnerRequest)
for i := 0; i < numRunners; i++ {
_ = dsp.stopper.RunAsyncTask(ctx, "distsql-runner", func(context.Context) {
runnerChan := dsp.runnerChan
stopChan := dsp.stopper.ShouldQuiesce()
for {
select {
case req := <-runnerChan:
req.run()

case <-stopChan:
return
}
c.runnerChan = make(chan runnerRequest)
stopWorkerChan := make(chan struct{})
worker := func(context.Context) {
for {
select {
case req := <-c.runnerChan:
req.run()
case <-stopWorkerChan:
return
}
})
}
}
stopChan := stopper.ShouldQuiesce()
// This is a buffered channel because we will be sending on it from the
// callback when the corresponding setting changes. The buffer size of 1
// should be sufficient, but we use a larger buffer out of caution (in case
// the cluster setting is updated rapidly) - in order to not block the
// goroutine that is updating the settings.
c.newDesiredNumWorkers = make(chan int64, 4)
// setNewNumWorkers sets the new target size of the pool of workers.
setNewNumWorkers := func(newNumWorkers int64) {
select {
case c.newDesiredNumWorkers <- newNumWorkers:
case <-stopChan:
// If the server is quiescing, then the new size of the pool doesn't
// matter.
return
}
}
// Whenever the corresponding setting is updated, we need to notify the
Review comment (Member):

If you were so inclined, you could expand this comment with a note of the fact that initRunners is only called once per server lifetime, so this won't leak an unbounded number of onChange callbacks, since in general an onChange not in an init() is suspect.

// coordinator.
// NB: runnerCoordinator.init is called once per server lifetime so this
// won't leak an unbounded number of OnChange callbacks.
settingDistSQLNumRunners.SetOnChange(sv, func(ctx context.Context) {
setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
})
// We need to set the target pool size based on the current setting
// explicitly since the OnChange callback won't ever be called for the
// initial value - the setting initialization has already been performed
// before we registered the OnChange callback.
setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
// Spin up the coordinator goroutine.
_ = stopper.RunAsyncTask(ctx, "distsql-runner-coordinator", func(context.Context) {
// Make sure to stop all workers when the coordinator exits.
defer close(stopWorkerChan)
for {
select {
case newNumWorkers := <-c.newDesiredNumWorkers:
for {
numWorkers := atomic.LoadInt64(&c.atomics.numWorkers)
if numWorkers == newNumWorkers {
break
}
if numWorkers < newNumWorkers {
// Need to spin another worker.
err := stopper.RunAsyncTask(ctx, "distsql-runner", worker)
if err != nil {
return
}
atomic.AddInt64(&c.atomics.numWorkers, 1)
} else {
// Need to stop one of the workers.
select {
case stopWorkerChan <- struct{}{}:
atomic.AddInt64(&c.atomics.numWorkers, -1)
case <-stopChan:
return
}
}
}
case <-stopChan:
return
}
}
})
}

// To allow for canceling flows via CancelDeadFlows RPC on different nodes
// simultaneously, we use a pool of workers. It is likely that these workers
// will be less busy than SetupFlow runners, so we instantiate smaller number of
// the canceling workers.
const numCancelingWorkers = numRunners / 4
// simultaneously, we use a pool of workers.
const numCancelingWorkers = 4

func (dsp *DistSQLPlanner) initCancelingWorkers(initCtx context.Context) {
dsp.cancelFlowsCoordinator.workerWait = make(chan struct{}, numCancelingWorkers)
@@ -300,13 +392,6 @@ func (dsp *DistSQLPlanner) setupFlows(
StatementSQL: statementSQL,
}

// Start all the flows except the flow on this node (there is always a flow on
// this node).
var resultChan chan runnerResult
if len(flows) > 1 {
resultChan = make(chan runnerResult, len(flows)-1)
}

if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
// Now we determine whether the vectorized engine supports the flow
// specs.
@@ -316,38 +401,54 @@
if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
return nil, nil, nil, err
}
// Vectorization is not supported for this flow, so we override the
// setting.
// Vectorization is not supported for this flow, so we override
// the setting.
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
break
}
}
}
for nodeID, flowSpec := range flows {
if nodeID == thisNodeID {
// Skip this node.
continue
}
req := setupReq
req.Flow = *flowSpec
runReq := runnerRequest{
ctx: ctx,
nodeDialer: dsp.podNodeDialer,
flowReq: &req,
sqlInstanceID: nodeID,
resultChan: resultChan,
}

// Send out a request to the workers; if no worker is available, run
// directly.
select {
case dsp.runnerChan <- runReq:
default:
runReq.run()
// Start all the flows except the flow on this node (there is always a flow
// on this node).
var resultChan chan runnerResult
if len(flows) > 1 {
resultChan = make(chan runnerResult, len(flows)-1)
for nodeID, flowSpec := range flows {
if nodeID == thisNodeID {
// Skip this node.
continue
}
req := setupReq
req.Flow = *flowSpec
runReq := runnerRequest{
ctx: ctx,
nodeDialer: dsp.podNodeDialer,
flowReq: &req,
sqlInstanceID: nodeID,
resultChan: resultChan,
}

// Send out a request to the workers; if no worker is available, run
// directly.
select {
case dsp.runnerCoordinator.runnerChan <- runReq:
default:
runReq.run()
}
}
}

var firstErr error
// Now set up the flow on this node.
setupReq.Flow = *flows[thisNodeID]
var batchReceiver execinfra.BatchReceiver
if recv.batchWriter != nil {
// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
// former has the corresponding writer set.
batchReceiver = recv
}
ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)

// Now wait for all the flows to be scheduled on remote nodes. Note that we
// are not waiting for the flows themselves to complete.
for i := 0; i < len(flows)-1; i++ {
@@ -358,21 +459,13 @@ func (dsp *DistSQLPlanner) setupFlows(
// TODO(radu): accumulate the flows that we failed to set up and move them
// into the local flow.
}
if firstErr != nil {
return nil, nil, nil, firstErr
}

// Set up the flow on this node.
setupReq.Flow = *flows[thisNodeID]
var batchReceiver execinfra.BatchReceiver
if recv.batchWriter != nil {
// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
// former has the corresponding writer set.
batchReceiver = recv
}
return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
// Note that we need to return the local flow even if firstErr is non-nil so
// that the local flow is properly cleaned up.
return ctx, flow, opChains, firstErr
}

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"

// Run executes a physical plan. The plan should have been finalized using
// FinalizePlan.
//
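The new runnerCoordinator above boils down to a resizable goroutine pool: a coordinator goroutine receives desired pool sizes over a channel and spins workers up, or winds them down via a stop channel, until the running count matches the target. Below is a minimal, self-contained sketch of that pattern for illustration only; the names (resizablePool, work, desired, stopWorker) are hypothetical, and it deliberately omits the stop.Stopper and cluster-setting plumbing that the real code relies on.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// resizablePool is a hypothetical, stripped-down analogue of runnerCoordinator:
// a coordinator goroutine grows or shrinks a set of workers until it matches
// the latest desired size.
type resizablePool struct {
	work       chan func()   // unbuffered: a send succeeds only if a worker is ready
	desired    chan int64    // new target pool sizes
	stopWorker chan struct{} // each receive on this channel stops exactly one worker
	numWorkers int64         // current pool size, updated only by the coordinator
}

func newResizablePool() *resizablePool {
	p := &resizablePool{
		work:       make(chan func()),
		desired:    make(chan int64, 4),
		stopWorker: make(chan struct{}),
	}
	go p.coordinate()
	return p
}

func (p *resizablePool) worker() {
	for {
		select {
		case fn := <-p.work:
			fn()
		case <-p.stopWorker:
			return
		}
	}
}

func (p *resizablePool) coordinate() {
	for target := range p.desired {
		for {
			cur := atomic.LoadInt64(&p.numWorkers)
			if cur == target {
				break
			}
			if cur < target {
				go p.worker() // grow: spin up another worker
				atomic.AddInt64(&p.numWorkers, 1)
			} else {
				p.stopWorker <- struct{}{} // shrink: stop one worker
				atomic.AddInt64(&p.numWorkers, -1)
			}
		}
	}
}

func main() {
	p := newResizablePool()
	p.desired <- 4 // grow the pool to four workers

	// Hand off a task without blocking: if no worker is free, run it inline.
	task := func() { fmt.Println("SetupFlow RPC issued") }
	select {
	case p.work <- task:
	default:
		task()
	}

	p.desired <- 0 // shrink the pool back to zero
	time.Sleep(100 * time.Millisecond)
}

The non-blocking select in main mirrors the dispatch in setupFlows: if no worker is available to take the request, the caller runs it inline rather than waiting for the pool.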
31 changes: 31 additions & 0 deletions pkg/sql/distsql_running_test.go
@@ -676,3 +676,34 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) {
return nil
})
}

// TestDistSQLRunnerCoordinator verifies that the runnerCoordinator correctly
// reacts to the changes of the corresponding setting.
func TestDistSQLRunnerCoordinator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

runner := &s.ExecutorConfig().(ExecutorConfig).DistSQLPlanner.runnerCoordinator
sqlDB := sqlutils.MakeSQLRunner(db)

checkNumRunners := func(newNumRunners int64) {
sqlDB.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.distsql.num_runners = %d", newNumRunners))
testutils.SucceedsSoon(t, func() error {
numWorkers := atomic.LoadInt64(&runner.atomics.numWorkers)
if numWorkers != newNumRunners {
return errors.Newf("%d workers are up, want %d", numWorkers, newNumRunners)
}
return nil
})
}

// Lower the setting to 0 and make sure that all runners exit.
checkNumRunners(0)

// Now bump it up to 100.
checkNumRunners(100)
}
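An operational footnote: the statement used by the test above is the same one an operator would presumably run against a live cluster, e.g. SET CLUSTER SETTING sql.distsql.num_runners = 32. Values below zero or above the 256 * GOMAXPROCS cap are rejected by the setting's validation function registered in distsql_running.go.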