distsql: change the flow setup code a bit
Previously, when setting up a distributed plan, we would wait for all
SetupFlow RPCs to come back before setting up the flow on the gateway.
In the happy scenario all of those RPCs are likely to succeed, so this
commit parallelizes the happy path a bit by setting up the local flow
while the RPCs are in flight. This seems especially beneficial given
the change in the previous commit that increased the number of DistSQL
runners for beefy machines: we are now more likely to issue SetupFlow
RPCs asynchronously.

Release note: None
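
To make the new ordering concrete, here is a minimal, self-contained Go
sketch of the pattern (hypothetical names; this is not the actual
DistSQLPlanner code): the remote setup requests are fired off first, the
local flow is set up while they are in flight, and only then are the
remote results collected.

    package main

    import (
        "fmt"
        "time"
    )

    // result mirrors the idea of runnerResult: which node we tried to set up
    // and whether its setup failed.
    type result struct {
        node int
        err  error
    }

    // setupRemoteFlow stands in for a SetupFlow RPC to a remote node.
    func setupRemoteFlow(node int) error {
        time.Sleep(10 * time.Millisecond)
        return nil
    }

    // setupLocalFlow stands in for setting up the flow on the gateway.
    func setupLocalFlow() string {
        time.Sleep(5 * time.Millisecond)
        return "local flow"
    }

    func main() {
        remoteNodes := []int{2, 3, 4}

        // One slot per remote node so that a remote setup finishing early
        // never blocks while the gateway is still busy with its local work.
        resultChan := make(chan result, len(remoteNodes))
        for _, node := range remoteNodes {
            node := node
            go func() {
                resultChan <- result{node: node, err: setupRemoteFlow(node)}
            }()
        }

        // Previously this step ran only after all of the remote setups had
        // come back; now it overlaps with them.
        flow := setupLocalFlow()

        // Wait for the remote flows to be scheduled (not for them to
        // complete), remembering only the first error.
        var firstErr error
        for range remoteNodes {
            if res := <-resultChan; res.err != nil && firstErr == nil {
                firstErr = res.err
            }
        }
        fmt.Println(flow, "is set up; first remote setup error:", firstErr)
    }

The result channel is buffered with one slot per remote node for the same
reason the real code sizes it at len(flows)-1: a remote setup that
finishes while the local setup is still running must never block.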
yuzefovich committed Jul 22, 2022 (commit b8f1343, 1 parent: 9490307)

Showing 1 changed file with 42 additions and 43 deletions: pkg/sql/distsql_running.go
@@ -373,13 +373,6 @@ func (dsp *DistSQLPlanner) setupFlows(
         StatementSQL: statementSQL,
     }
 
-    // Start all the flows except the flow on this node (there is always a flow on
-    // this node).
-    var resultChan chan runnerResult
-    if len(flows) > 1 {
-        resultChan = make(chan runnerResult, len(flows)-1)
-    }
-
     if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
         // Now we determine whether the vectorized engine supports the flow
         // specs.
@@ -389,38 +382,54 @@ func (dsp *DistSQLPlanner) setupFlows(
                 if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
                     return nil, nil, nil, err
                 }
-                // Vectorization is not supported for this flow, so we override the
-                // setting.
+                // Vectorization is not supported for this flow, so we override
+                // the setting.
                 setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
                 break
             }
         }
     }
-    for nodeID, flowSpec := range flows {
-        if nodeID == thisNodeID {
-            // Skip this node.
-            continue
-        }
-        req := setupReq
-        req.Flow = *flowSpec
-        runReq := runnerRequest{
-            ctx:           ctx,
-            nodeDialer:    dsp.podNodeDialer,
-            flowReq:       &req,
-            sqlInstanceID: nodeID,
-            resultChan:    resultChan,
-        }
-
-        // Send out a request to the workers; if no worker is available, run
-        // directly.
-        select {
-        case dsp.runnerCoordinator.runnerChan <- runReq:
-        default:
-            runReq.run()
+    // Start all the flows except the flow on this node (there is always a flow
+    // on this node).
+    var resultChan chan runnerResult
+    if len(flows) > 1 {
+        resultChan = make(chan runnerResult, len(flows)-1)
+        for nodeID, flowSpec := range flows {
+            if nodeID == thisNodeID {
+                // Skip this node.
+                continue
+            }
+            req := setupReq
+            req.Flow = *flowSpec
+            runReq := runnerRequest{
+                ctx:           ctx,
+                nodeDialer:    dsp.podNodeDialer,
+                flowReq:       &req,
+                sqlInstanceID: nodeID,
+                resultChan:    resultChan,
+            }
+
+            // Send out a request to the workers; if no worker is available, run
+            // directly.
+            select {
+            case dsp.runnerCoordinator.runnerChan <- runReq:
+            default:
+                runReq.run()
+            }
         }
     }
 
     var firstErr error
+    // Now set up the flow on this node.
+    setupReq.Flow = *flows[thisNodeID]
+    var batchReceiver execinfra.BatchReceiver
+    if recv.batchWriter != nil {
+        // Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
+        // former has the corresponding writer set.
+        batchReceiver = recv
+    }
+    ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+
     // Now wait for all the flows to be scheduled on remote nodes. Note that we
     // are not waiting for the flows themselves to complete.
     for i := 0; i < len(flows)-1; i++ {
@@ -431,19 +440,9 @@ func (dsp *DistSQLPlanner) setupFlows(
         // TODO(radu): accumulate the flows that we failed to set up and move them
         // into the local flow.
     }
-    if firstErr != nil {
-        return nil, nil, nil, firstErr
-    }
-
-    // Set up the flow on this node.
-    setupReq.Flow = *flows[thisNodeID]
-    var batchReceiver execinfra.BatchReceiver
-    if recv.batchWriter != nil {
-        // Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
-        // former has the corresponding writer set.
-        batchReceiver = recv
-    }
-    return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+    // Note that we need to return the local flow even if firstErr is non-nil so
+    // that the local flow is properly cleaned up.
+    return ctx, flow, opChains, firstErr
 }
 
 const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
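The select with a default branch in the loop above is what the last
sentence of the commit message refers to: the send succeeds only if the
request can be handed off to a runner immediately, and otherwise the
default branch issues the SetupFlow RPC synchronously from the calling
goroutine. A minimal sketch of that hand-off idiom (hypothetical names,
unrelated to the CockroachDB code; the channel is unbuffered here by
choice, so a send succeeds only when a worker is free right now):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const numWorkers = 2
        // Unbuffered: a send succeeds only when a worker is already waiting
        // to receive, i.e. only when a worker is free at this instant.
        workCh := make(chan func())

        for i := 0; i < numWorkers; i++ {
            go func() {
                for task := range workCh {
                    task()
                }
            }()
        }

        var wg sync.WaitGroup
        for i := 0; i < 8; i++ {
            i := i
            wg.Add(1)
            task := func() {
                defer wg.Done()
                fmt.Println("ran task", i)
            }
            select {
            case workCh <- task:
                // A free worker picked the task up; it runs asynchronously.
            default:
                // No worker was free; run the task synchronously rather than wait.
                task()
            }
        }
        wg.Wait() // the worker goroutines simply end when main returns
    }

With more workers available (the previous commit), the first branch of
the select wins more often, which is why SetupFlow RPCs are now more
likely to be issued asynchronously.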
