diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index b55c1278a771..a59526f89a47 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -373,13 +373,6 @@ func (dsp *DistSQLPlanner) setupFlows(
 		StatementSQL: statementSQL,
 	}
 
-	// Start all the flows except the flow on this node (there is always a flow on
-	// this node).
-	var resultChan chan runnerResult
-	if len(flows) > 1 {
-		resultChan = make(chan runnerResult, len(flows)-1)
-	}
-
 	if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
 		// Now we determine whether the vectorized engine supports the flow
 		// specs.
@@ -389,38 +382,54 @@ func (dsp *DistSQLPlanner) setupFlows(
 				if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways {
 					return nil, nil, nil, err
 				}
-				// Vectorization is not supported for this flow, so we override the
-				// setting.
+				// Vectorization is not supported for this flow, so we override
+				// the setting.
 				setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff
 				break
 			}
 		}
 	}
 
-	for nodeID, flowSpec := range flows {
-		if nodeID == thisNodeID {
-			// Skip this node.
-			continue
-		}
-		req := setupReq
-		req.Flow = *flowSpec
-		runReq := runnerRequest{
-			ctx:           ctx,
-			nodeDialer:    dsp.podNodeDialer,
-			flowReq:       &req,
-			sqlInstanceID: nodeID,
-			resultChan:    resultChan,
-		}
-		// Send out a request to the workers; if no worker is available, run
-		// directly.
-		select {
-		case dsp.runnerCoordinator.runnerChan <- runReq:
-		default:
-			runReq.run()
+	// Start all the flows except the flow on this node (there is always a flow
+	// on this node).
+	var resultChan chan runnerResult
+	if len(flows) > 1 {
+		resultChan = make(chan runnerResult, len(flows)-1)
+		for nodeID, flowSpec := range flows {
+			if nodeID == thisNodeID {
+				// Skip this node.
+				continue
+			}
+			req := setupReq
+			req.Flow = *flowSpec
+			runReq := runnerRequest{
+				ctx:           ctx,
+				nodeDialer:    dsp.podNodeDialer,
+				flowReq:       &req,
+				sqlInstanceID: nodeID,
+				resultChan:    resultChan,
+			}
+
+			// Send out a request to the workers; if no worker is available, run
+			// directly.
+			select {
+			case dsp.runnerCoordinator.runnerChan <- runReq:
+			default:
+				runReq.run()
+			}
 		}
 	}
 
-	var firstErr error
+	// Now set up the flow on this node.
+	setupReq.Flow = *flows[thisNodeID]
+	var batchReceiver execinfra.BatchReceiver
+	if recv.batchWriter != nil {
+		// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
+		// former has the corresponding writer set.
+		batchReceiver = recv
+	}
+	ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+
 	// Now wait for all the flows to be scheduled on remote nodes. Note that we
 	// are not waiting for the flows themselves to complete.
 	for i := 0; i < len(flows)-1; i++ {
@@ -431,19 +440,9 @@ func (dsp *DistSQLPlanner) setupFlows(
 		// TODO(radu): accumulate the flows that we failed to set up and move them
 		// into the local flow.
 	}
-	if firstErr != nil {
-		return nil, nil, nil, firstErr
-	}
-
-	// Set up the flow on this node.
-	setupReq.Flow = *flows[thisNodeID]
-	var batchReceiver execinfra.BatchReceiver
-	if recv.batchWriter != nil {
-		// Use the DistSQLReceiver as an execinfra.BatchReceiver only if the
-		// former has the corresponding writer set.
-		batchReceiver = recv
-	}
-	return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
+	// Note that we need to return the local flow even if firstErr is non-nil so
+	// that the local flow is properly cleaned up.
+	return ctx, flow, opChains, firstErr
 }
 
 const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
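
For readers skimming the patch, here is a minimal sketch of the reordered control flow that setupFlows now follows: remote flow setup is dispatched first, the local flow is set up while those requests are in flight, the remote results are then collected, and the local flow is returned even when a remote setup failed so the caller can clean it up. This is only an illustration under simplifying assumptions: the names (setupAll, result, the failing node ID) are hypothetical and do not exist in the codebase, and plain goroutines stand in for the runnerCoordinator worker pool with its run-inline fallback.

package main

import (
	"errors"
	"fmt"
)

// result plays the role of runnerResult: the outcome of one remote setup.
type result struct {
	nodeID int
	err    error
}

// setupAll sketches the new ordering in setupFlows: dispatch remote setups,
// set up the local flow while they are in flight, then wait for the remote
// results. The local "flow" (here just a string) is returned even on error
// so that the caller can release its resources.
func setupAll(remoteNodes []int) (string, error) {
	resultChan := make(chan result, len(remoteNodes))
	for _, id := range remoteNodes {
		id := id
		go func() {
			// Stand-in for issuing a SetupFlow RPC to a remote node; the
			// real code hands a runnerRequest to a worker or runs it inline.
			var err error
			if id == 3 {
				err = errors.New("node 3 unavailable")
			}
			resultChan <- result{nodeID: id, err: err}
		}()
	}

	// Set up the local flow while the remote requests are outstanding.
	localFlow := "local flow"

	// Wait for every remote setup to be scheduled, remembering the first error.
	var firstErr error
	for range remoteNodes {
		if res := <-resultChan; res.err != nil && firstErr == nil {
			firstErr = res.err
		}
	}

	// Return the local flow even if firstErr is non-nil so it can be cleaned up.
	return localFlow, firstErr
}

func main() {
	flow, err := setupAll([]int{2, 3, 4})
	fmt.Println(flow, err)
}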