From 0c1095e31cf93ea7f177f8bf1750ebab188e02d7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 4 Nov 2022 17:36:19 -0700 Subject: [PATCH] sql: do not wait for setup of remote flows on the gateway Previously, when setting up the flows for a distributed plan we would issue all SetupFlow RPCs to the remote nodes and wait for all of them to come back before proceeding with the execution of the local flow. This can introduce an execution stall, especially in multi-region setups, if remote flows depend on the local flow for some of the data. This was suboptimal, and this commit makes it so that we no longer wait for the RPCs to come back and start executing the local flow right away. We now spin up a separate goroutine that waits for the RPCs to come back, and if an error is encountered (which shouldn't happen often), then that goroutine sets the error on the DistSQLReceiver and cancels the local flow. Setting the error on the receiver, in turn, will make it so that all remote flows will be canceled via CancelDeadFlows RPC (which might be faster than via the distributed query shutdown triggered when the local flow is canceled). An additional change is that we now perform the setup of the local flow on the gateway first, and only if that is successful, we proceed to the setup of the remote flows. This acts as a sanity check on the validity of the flow spec and should make it less likely that the remote flows setup fails. This required some changes around the error handling of the DistSQLReceiver to make it concurrency safe. One option there was to make `SetError` and `Err` methods of `rowResultWriter` interface concurrency safe, but there are several implementations, and I decided to make the adjustment to the DistSQLReceiver itself since this concurrency safety is only needed there, and it seems somewhat wrong to impose the requirement on all of the implementations. Additionally, in order to avoid locking the mutex as much as possible, the `status` of the receiver is not protected by the mutex. This is achieved by the new goroutine not updating the status and, instead, letting the main goroutine "resolve" the status the next time a meta object is pushed. The idea is that the cancellation of the local flow shuts down the local flow's execution making it encounter an error which is then propagated as metadata. Thus, this "status resolution" should happen fairly quickly, and this setup allows us to avoid the locking in most scenarios when pushing rows and batches. Further changes were needed around `saveFlows` function as well as releasing flow specs back to the pool. The former had to be moved to be done sooner (right after setting up the local flow), and for the latter we had to move the release of the flow specs for the remote flows to right after the corresponding SetupFlow RPCs are issued. This ordering ensures that the flow specs are always released, but after all of their uses (both `saveFlows` and the RPCs use them). Yet another complication was around the concurrency between `Flow.Cleanup` being called and the new goroutine receiving an error from the RPC. At the very end of `Flow.Cleanup` the flow object is released, so the new goroutine cannot call `Flow.Cancel` after that. Additionally, since the mutex protection of the `rowResultWriter` by the DistSQLReceiver lasts only until `DistSQLPlanner.Run` returns (which is right after `Flow.Cleanup` returns), the new goroutine will attempt to set the error only if the cleanup hasn't been performed. This is achieved by having a mutex-protected boolean, and the boolean is only introduced if the new goroutine is spun up. Overall, as this whole comment suggests, it has been quite tricky to get things right (I hope I did), so one might wonder what simplifications, if any, could be made. I considered and (mostly) rejected several: - Ignoring the particular error that was returned by the RPCs and just canceling the local flow. This would allow us to remove the complexity of the concurrency safety with the error handling of the DistSQLReceiver. We would still properly shutdown the whole distributed plan. However, the downside is that we would lose the true reason for the shutdown - most likely we would return "context canceled" to the client instead of the actual error. On the other hand, the errors from the RPCs should be fairly rare that it might be worth giving this more thought. - Not canceling the local flow since just setting the error on the receiver would be sufficient for the query to eventually shutdown. The obvious downside is that we might do more work after having received an error from the RPC, and there is little upside I can see. - But the most interesting simplification would be to just not create the new goroutine in the first place. The idea is that if any of the SetupFlow RPCs fail, then the flows on other nodes would hit the "no inbound stream timeout" error (after 10s - by default - of not being connected to) which would start the shutdown of the whole plan. This idea would eliminate effectively all complexity of this commit which seems quite appealing. The downside here would be the combination of the downsides of the ideas above - namely, now the query would result in this opaque "no inbound stream" error (and we've been struggling with diagnosing those, so I'm not eager to introduce another way it could occur), and we would be doing wasteful work during this timeout window. Release note (performance improvement): The setup of the distributed query execution is now fully parallelized which should reduce the query latencies, especially in multi-region setups. --- pkg/sql/colflow/vectorized_flow.go | 5 - pkg/sql/distsql/server.go | 13 +- pkg/sql/distsql_running.go | 384 +++++++++++++++++--------- pkg/sql/distsql_running_test.go | 106 +++++++ pkg/sql/execinfra/server_config.go | 5 + pkg/sql/flowinfra/flow.go | 30 +- pkg/sql/physicalplan/physical_plan.go | 4 +- pkg/sql/physicalplan/specs.go | 4 +- pkg/sql/rowflow/row_based_flow.go | 5 - 9 files changed, 400 insertions(+), 156 deletions(-) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 6728a76dc5b1..3a8e7dc0ed21 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -349,11 +349,6 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string { return f.tempStorage.path } -// IsVectorized is part of the flowinfra.Flow interface. -func (f *vectorizedFlow) IsVectorized() bool { - return true -} - // ConcurrentTxnUse is part of the flowinfra.Flow interface. It is conservative // in that it returns that there is concurrent txn use as soon as any operator // concurrency is detected. This should be inconsequential for local flows that diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 278c8ab9caf3..690fa05f046f 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -414,7 +414,7 @@ func (ds *ServerImpl) setupFlow( ctx = flowCtx.AmbientContext.AnnotateCtx(ctx) telemetry.Inc(sqltelemetry.DistSQLExecCounter) } - if f.IsVectorized() { + if isVectorized { telemetry.Inc(sqltelemetry.VecExecCounter) } @@ -566,14 +566,10 @@ func (ds *ServerImpl) SetupLocalSyncFlow( batchOutput execinfra.BatchReceiver, localState LocalState, ) (context.Context, flowinfra.Flow, execopnode.OpChains, error) { - ctx, f, opChains, err := ds.setupFlow( + return ds.setupFlow( ctx, tracing.SpanFromContext(ctx), parentMonitor, &mon.BoundAccount{}, /* reserved */ req, output, batchOutput, localState, ) - if err != nil { - return nil, nil, nil, err - } - return ctx, f, opChains, err } // setupSpanForIncomingRPC creates a span for a SetupFlow RPC. The caller must @@ -618,6 +614,11 @@ func (ds *ServerImpl) SetupFlow( ctx context.Context, req *execinfrapb.SetupFlowRequest, ) (*execinfrapb.SimpleResponse, error) { log.VEventf(ctx, 1, "received SetupFlow request from n%v for flow %v", req.Flow.Gateway, req.Flow.FlowID) + if cb := ds.TestingKnobs.SetupFlowCb; cb != nil { + if err := cb(ds.ServerConfig.NodeID.SQLInstanceID(), req); err != nil { + return &execinfrapb.SimpleResponse{Error: execinfrapb.NewError(ctx, err)}, nil + } + } _, rpcSpan := ds.setupSpanForIncomingRPC(ctx, req) defer rpcSpan.Finish() diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index d5def275ec16..8ba42cfb5b8b 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -99,7 +98,10 @@ type runnerResult struct { err error } -func (req runnerRequest) run() { +// run executes the request. An error, if encountered, is both sent on the +// result channel and returned. +func (req runnerRequest) run() error { + defer physicalplan.ReleaseFlowSpec(&req.flowReq.Flow) res := runnerResult{nodeID: req.sqlInstanceID} conn, err := req.podNodeDialer.Dial(req.ctx, roachpb.NodeID(req.sqlInstanceID), rpc.DefaultClass) @@ -108,9 +110,6 @@ func (req runnerRequest) run() { } else { client := execinfrapb.NewDistSQLClient(conn) // TODO(radu): do we want a timeout here? - if sp := tracing.SpanFromContext(req.ctx); sp != nil && !sp.IsNoop() { - req.flowReq.TraceInfo = sp.Meta().ToProto() - } resp, err := client.SetupFlow(req.ctx, req.flowReq) if err != nil { res.err = err @@ -119,6 +118,7 @@ func (req runnerRequest) run() { } } req.resultChan <- res + return res.err } type runnerCoordinator struct { @@ -145,7 +145,7 @@ func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv for { select { case req := <-c.runnerChan: - req.run() + _ = req.run() case <-stopWorkerChan: return } @@ -360,27 +360,35 @@ func (c *cancelFlowsCoordinator) addFlowsToCancel( } // setupFlows sets up all the flows specified in flows using the provided state. -// It will first attempt to set up all remote flows using the dsp workers if -// available or sequentially if not, and then finally set up the gateway flow, -// whose output is the DistSQLReceiver provided. This flow is then returned to -// be run. +// It will first attempt to set up the gateway flow (whose output is the +// DistSQLReceiver provided) and - if successful - will proceed to setting up +// the remote flows using the dsp workers if available or sequentially if not. +// +// The gateway flow is returned to be Run(). It is the caller's responsibility +// to clean up that flow if a non-nil value is returned. +// +// The method doesn't wait for the setup of remote flows (if any) to return and, +// instead, optimistically proceeds once the corresponding SetupFlow RPCs are +// issued. A separate goroutine is spun up to listen for the RPCs to come back, +// and if the setup of a remote flow fails, then that goroutine updates the +// DistSQLReceiver with the error and cancels the gateway flow. func (dsp *DistSQLPlanner) setupFlows( ctx context.Context, evalCtx *extendedEvalContext, + planCtx *PlanningCtx, leafInputState *roachpb.LeafTxnInputState, flows map[base.SQLInstanceID]*execinfrapb.FlowSpec, recv *DistSQLReceiver, localState distsql.LocalState, - collectStats bool, statementSQL string, -) (context.Context, flowinfra.Flow, execopnode.OpChains, error) { +) (context.Context, flowinfra.Flow, error) { thisNodeID := dsp.gatewaySQLInstanceID _, ok := flows[thisNodeID] if !ok { - return nil, nil, nil, errors.AssertionFailedf("missing gateway flow") + return nil, nil, errors.AssertionFailedf("missing gateway flow") } if localState.IsLocal && len(flows) != 1 { - return nil, nil, nil, errors.AssertionFailedf("IsLocal set but there's multiple flows") + return nil, nil, errors.AssertionFailedf("IsLocal set but there's multiple flows") } const setupFlowRequestStmtMaxLength = 500 @@ -392,80 +400,178 @@ func (dsp *DistSQLPlanner) setupFlows( Version: execinfra.Version, EvalContext: execinfrapb.MakeEvalContext(&evalCtx.Context), TraceKV: evalCtx.Tracing.KVTracingEnabled(), - CollectStats: collectStats, + CollectStats: planCtx.collectExecStats, StatementSQL: statementSQL, } + var isVectorized bool if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff { // Now we determine whether the vectorized engine supports the flow // specs. + isVectorized = true for _, spec := range flows { if err := colflow.IsSupported(vectorizeMode, spec); err != nil { log.VEventf(ctx, 2, "failed to vectorize: %s", err) if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways { - return nil, nil, nil, err + return nil, nil, err } // Vectorization is not supported for this flow, so we override // the setting. setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff + isVectorized = false break } } } - - // Start all the flows except the flow on this node (there is always a flow - // on this node). - var resultChan chan runnerResult - if len(flows) > 1 { - resultChan = make(chan runnerResult, len(flows)-1) - for nodeID, flowSpec := range flows { - if nodeID == thisNodeID { - // Skip this node. - continue - } - req := setupReq - req.Flow = *flowSpec - runReq := runnerRequest{ - ctx: ctx, - podNodeDialer: dsp.podNodeDialer, - flowReq: &req, - sqlInstanceID: nodeID, - resultChan: resultChan, - } - - // Send out a request to the workers; if no worker is available, run - // directly. - select { - case dsp.runnerCoordinator.runnerChan <- runReq: - default: - runReq.run() - } - } + if planCtx.planner != nil && isVectorized { + planCtx.planner.curPlan.flags.Set(planFlagVectorized) } - // Now set up the flow on this node. + // First, set up the flow on this node. setupReq.Flow = *flows[thisNodeID] var batchReceiver execinfra.BatchReceiver - if recv.batchWriter != nil { + if recv.resultWriterMu.batch != nil { // Use the DistSQLReceiver as an execinfra.BatchReceiver only if the // former has the corresponding writer set. batchReceiver = recv } - ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Planner.Mon(), &setupReq, recv, batchReceiver, localState) + origCtx := ctx + ctx, flow, opChains, err := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Planner.Mon(), &setupReq, recv, batchReceiver, localState) + if err == nil && planCtx.saveFlows != nil { + err = planCtx.saveFlows(flows, opChains) + } + if len(flows) == 1 || err != nil { + // If there are no remote flows, or we fail to set up the local flow, we + // can just short-circuit. + // + // Note that we need to return the local flow even if err is non-nil so + // that the local flow is properly cleaned up. + return ctx, flow, err + } + + // Start all the remote flows. + // + // usedWorker indicates whether we used at least one DistSQL worker + // goroutine to issue the SetupFlow RPC. + var usedWorker bool + if sp := tracing.SpanFromContext(origCtx); sp != nil && !sp.IsNoop() { + setupReq.TraceInfo = sp.Meta().ToProto() + } + resultChan := make(chan runnerResult, len(flows)-1) + // We use a separate context for the runnerRequests so that - in case they + // are issued concurrently and some RPCs are actually run after the current + // goroutine performs Flow.Cleanup() on the local flow - DistSQL workers + // don't attempt to reuse already finished tracing span from the local flow + // context. For the same reason we can't use origCtx (parent of the local + // flow context). + // + // In particular, consider the following scenario: + // - a runnerRequest is handled by the DistSQL worker; + // - in runnerRequest.run() the worker dials the remote node and issues the + // SetupFlow RPC. However, the client-side goroutine of that RPC is not + // yet scheduled on the local node; + // - the local flow runs to completion canceling the context and finishing + // the tracing span; + // - now the client-side goroutine of the RPC is scheduled, and it attempts + // to use the span from the context, but it has already been finished. + // + // We still want to be able to cancel the RPCs when either the local flow + // finishes or the local node is quiescing, so we derive a separate context + // with the cancellation ability. + runnerCtx, cancelRunnerCtx := dsp.stopper.WithCancelOnQuiesce(context.Background()) + for nodeID, flowSpec := range flows { + if nodeID == thisNodeID { + // Skip this node. + continue + } + req := setupReq + req.Flow = *flowSpec + runReq := runnerRequest{ + ctx: runnerCtx, + podNodeDialer: dsp.podNodeDialer, + flowReq: &req, + sqlInstanceID: nodeID, + resultChan: resultChan, + } - // Now wait for all the flows to be scheduled on remote nodes. Note that we - // are not waiting for the flows themselves to complete. - for i := 0; i < len(flows)-1; i++ { - res := <-resultChan - if firstErr == nil { - firstErr = res.err + // Send out a request to the workers; if no worker is available, run + // directly. + select { + case dsp.runnerCoordinator.runnerChan <- runReq: + usedWorker = true + default: + // We can just use the "parent" context since we're executing the + // request in a blocking fashion in the current goroutine. This + // allows for the cancellation of the "parent" context be noticed + // sooner. + runReq.ctx = origCtx + if err = runReq.run(); err != nil { + return ctx, flow, err + } } - // TODO(radu): accumulate the flows that we failed to set up and move them - // into the local flow. } - // Note that we need to return the local flow even if firstErr is non-nil so - // that the local flow is properly cleaned up. - return ctx, flow, opChains, firstErr + + if !usedWorker { + // We executed all SetupFlow RPCs in the current goroutine, and all RPCs + // succeeded. + return ctx, flow, nil + } + + // Some of the SetupFlow RPCs were executed concurrently, and at the moment + // it's not clear whether the setup of the remote flows is successful, but + // in order to not introduce an execution stall, we will proceed to run the + // local flow assuming that all RPCs are successful. However, in case the + // setup of a remote flow fails, we want to eagerly cancel all the flows, + // and we do so in a separate goroutine. + // + // We need to synchronize the new goroutine with flow.Cleanup() being called + // for two reasons: + // - flow.Cleanup() is the last thing before DistSQLPlanner.Run returns at + // which point the rowResultWriter is no longer protected by the mutex of + // the DistSQLReceiver + // - flow.Cancel can only be called before flow.Cleanup. + cleanupCalledMu := struct { + syncutil.Mutex + called bool + }{} + flow.AddOnCleanup(func() { + cleanupCalledMu.Lock() + defer cleanupCalledMu.Unlock() + cleanupCalledMu.called = true + // Cancel any outstanding RPCs while holding the lock to protect from + // the context canceled error (the result of the RPC) being set on the + // DistSQLReceiver by the listener goroutine below. + cancelRunnerCtx() + }) + _ = dsp.stopper.RunAsyncTask(origCtx, "distsql-remote-flows-setup-listener", func(ctx context.Context) { + for i := 0; i < len(flows)-1; i++ { + res := <-resultChan + if res.err != nil { + // The setup of at least one remote flow failed. + cleanupCalledMu.Lock() + defer cleanupCalledMu.Unlock() + if cleanupCalledMu.called { + // We no longer care about the error nor do we need to + // cancel the flow. + return + } + // First, we update the DistSQL receiver with the error to be + // returned to the client eventually. + // + // In order to not protect DistSQLReceiver.status with a mutex, + // we do not update the status here and, instead, rely on the + // DistSQLReceiver detecting the error the next time a meta is + // pushed into it. + recv.setErrorWithoutStatusUpdate(res.err) + // Now explicitly cancel the local flow. + flow.Cancel() + // resultChan is buffered, so we can just ignore the remaining + // results of the RPCs. + return + } + } + }) + return ctx, flow, nil } const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan" @@ -496,15 +602,15 @@ func (dsp *DistSQLPlanner) Run( finishedSetupFn func(), ) { flows := plan.GenerateFlowSpecs() - defer func() { - for _, flowSpec := range flows { - physicalplan.ReleaseFlowSpec(flowSpec) - } - }() - if _, ok := flows[dsp.gatewaySQLInstanceID]; !ok { + gatewayFlowSpec, ok := flows[dsp.gatewaySQLInstanceID] + if !ok { recv.SetError(errors.Errorf("expected to find gateway flow")) return } + // Specs of the remote flows are released after performing the corresponding + // SetupFlow RPCs. This is needed in case the local flow is canceled before + // the SetupFlow RPCs are issued (which might happen in parallel). + defer physicalplan.ReleaseFlowSpec(gatewayFlowSpec) var ( localState distsql.LocalState @@ -621,7 +727,7 @@ func (dsp *DistSQLPlanner) Run( localState.IsLocal = true } else { defer func() { - if recv.resultWriter.Err() != nil { + if recv.getError() != nil { // The execution of this query encountered some error, so we // will eagerly cancel all flows running on the remote nodes // because they are now dead. @@ -638,8 +744,8 @@ func (dsp *DistSQLPlanner) Run( if planCtx.planner != nil { statementSQL = planCtx.planner.stmt.StmtNoConstants } - ctx, flow, opChains, err := dsp.setupFlows( - ctx, evalCtx, leafInputState, flows, recv, localState, planCtx.collectExecStats, statementSQL, + ctx, flow, err := dsp.setupFlows( + ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, ) // Make sure that the local flow is always cleaned up if it was created. if flow != nil { @@ -656,17 +762,6 @@ func (dsp *DistSQLPlanner) Run( finishedSetupFn() } - if planCtx.planner != nil && flow.IsVectorized() { - planCtx.planner.curPlan.flags.Set(planFlagVectorized) - } - - if planCtx.saveFlows != nil { - if err := planCtx.saveFlows(flows, opChains); err != nil { - recv.SetError(err) - return - } - } - // Check that flows that were forced to be planned locally and didn't need // to have concurrency don't actually have it. // @@ -692,11 +787,15 @@ func (dsp *DistSQLPlanner) Run( type DistSQLReceiver struct { ctx context.Context - // These two interfaces refer to the same object, but batchWriter might be - // unset (resultWriter is always set). These are used to send the results - // to. - resultWriter rowResultWriter - batchWriter batchResultWriter + resultWriterMu struct { + // Mutex only protects SetError() and Err() methods of the + // rowResultWriter. + syncutil.Mutex + // These two interfaces refer to the same object, but batch might be + // unset (row is always set). These are used to send the results to. + row rowResultWriter + batch batchResultWriter + } stmtType tree.StatementReturnType @@ -969,8 +1068,6 @@ func MakeDistSQLReceiver( *r = DistSQLReceiver{ ctx: consumeCtx, cleanup: cleanup, - resultWriter: resultWriter, - batchWriter: batchWriter, rangeCache: rangeCache, txn: txn, clockUpdater: clockUpdater, @@ -979,6 +1076,8 @@ func MakeDistSQLReceiver( tracing: tracing, contentionRegistry: contentionRegistry, } + r.resultWriterMu.row = resultWriter + r.resultWriterMu.batch = batchWriter return r } @@ -1007,13 +1106,50 @@ func (r *DistSQLReceiver) clone() *DistSQLReceiver { return ret } -// SetError provides a convenient way for a client to pass in an error, thus -// pretending that a query execution error happened. The error is passed along -// to the resultWriter. +// getError returns the error stored in the rowResultWriter (if any). +func (r *DistSQLReceiver) getError() error { + r.resultWriterMu.Lock() + defer r.resultWriterMu.Unlock() + return r.resultWriterMu.row.Err() +} + +// setErrorWithoutStatusUpdate sets the error in the rowResultWriter but does +// **not** update the status of the DistSQLReceiver. The status will be updated +// the next time a meta is pushed into the receiver (by the goroutine that is +// pushing). // -// The status of DistSQLReceiver is updated accordingly. -func (r *DistSQLReceiver) SetError(err error) { - r.resultWriter.SetError(err) +// NOTE: consider using SetError() instead. +func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error) { + r.resultWriterMu.Lock() + defer r.resultWriterMu.Unlock() + // Check if the error we just received should take precedence over a + // previous error (if any). + if roachpb.ErrPriority(err) > roachpb.ErrPriority(r.resultWriterMu.row.Err()) { + if r.txn != nil { + if retryErr := (*roachpb.UnhandledRetryableError)(nil); errors.As(err, &retryErr) { + // Update the txn in response to remote errors. In the + // non-DistSQL world, the TxnCoordSender handles "unhandled" + // retryable errors, but this one is coming from a distributed + // SQL node, which has left the handling up to the root + // transaction. + err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr) + // Update the clock with information from the error. On + // non-DistSQL code paths, the DistSender does this. + // TODO(andrei): We don't propagate clock signals on success + // cases through DistSQL; we should. We also don't propagate + // them through non-retryable errors; we also should. + if r.clockUpdater != nil { + r.clockUpdater.Update(retryErr.PErr.Now) + } + } + } + r.resultWriterMu.row.SetError(err) + } +} + +// updateStatusAfterError updates the status of the DistSQLReceiver after it +// has received an error. +func (r *DistSQLReceiver) updateStatusAfterError(err error) { // If we encountered an error, we will transition to draining unless we were // canceled. if r.ctx.Err() != nil { @@ -1025,10 +1161,27 @@ func (r *DistSQLReceiver) SetError(err error) { } } +// SetError provides a convenient way for a client to pass in an error, thus +// pretending that a query execution error happened. The error is passed along +// to the resultWriter. +// +// The status of DistSQLReceiver is updated accordingly. +func (r *DistSQLReceiver) SetError(err error) { + r.setErrorWithoutStatusUpdate(err) + r.updateStatusAfterError(err) +} + // pushMeta takes in non-empty metadata object and pushes it to the result // writer. Possibly updated status is returned. func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus { - if metaWriter, ok := r.resultWriter.(MetadataResultWriter); ok { + // Check whether an error has been set by another goroutine without updating + // the status. + if r.status == execinfra.NeedMoreRows { + if previousErr := r.getError(); previousErr != nil { + r.updateStatusAfterError(previousErr) + } + } + if metaWriter, ok := r.resultWriterMu.row.(MetadataResultWriter); ok { metaWriter.AddMeta(r.ctx, meta) } if meta.LeafTxnFinalState != nil { @@ -1044,28 +1197,7 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra } } if meta.Err != nil { - // Check if the error we just received should take precedence over a - // previous error (if any). - if roachpb.ErrPriority(meta.Err) > roachpb.ErrPriority(r.resultWriter.Err()) { - if r.txn != nil { - if retryErr := (*roachpb.UnhandledRetryableError)(nil); errors.As(meta.Err, &retryErr) { - // Update the txn in response to remote errors. In the non-DistSQL - // world, the TxnCoordSender handles "unhandled" retryable errors, - // but this one is coming from a distributed SQL node, which has - // left the handling up to the root transaction. - meta.Err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr) - // Update the clock with information from the error. On non-DistSQL - // code paths, the DistSender does this. - // TODO(andrei): We don't propagate clock signals on success cases - // through DistSQL; we should. We also don't propagate them through - // non-retryable errors; we also should. - if r.clockUpdater != nil { - r.clockUpdater.Update(retryErr.PErr.Now) - } - } - } - r.SetError(meta.Err) - } + r.SetError(meta.Err) } if len(meta.Ranges) > 0 { r.rangeCache.Insert(r.ctx, meta.Ranges...) @@ -1160,7 +1292,7 @@ func (r *DistSQLReceiver) Push( if meta != nil { return r.pushMeta(meta) } - if r.resultWriter.Err() == nil && r.ctx.Err() != nil { + if r.ctx.Err() != nil && r.getError() == nil { r.SetError(r.ctx.Err()) } if r.status != execinfra.NeedMoreRows { @@ -1172,7 +1304,7 @@ func (r *DistSQLReceiver) Push( // We only need the row count. planNodeToRowSource is set up to handle // ensuring that the last stage in the pipeline will return a single-column // row with the row count in it, so just grab that and exit. - r.resultWriter.IncrementRowsAffected(r.ctx, n) + r.resultWriterMu.row.IncrementRowsAffected(r.ctx, n) return r.status } @@ -1201,7 +1333,7 @@ func (r *DistSQLReceiver) Push( } } r.tracing.TraceExecRowsResult(r.ctx, r.row) - if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil { + if commErr := r.resultWriterMu.row.AddRow(r.ctx, r.row); commErr != nil { r.handleCommErr(commErr) } return r.status @@ -1214,7 +1346,7 @@ func (r *DistSQLReceiver) PushBatch( if meta != nil { return r.pushMeta(meta) } - if r.resultWriter.Err() == nil && r.ctx.Err() != nil { + if r.ctx.Err() != nil && r.getError() == nil { r.SetError(r.ctx.Err()) } if r.status != execinfra.NeedMoreRows { @@ -1230,7 +1362,7 @@ func (r *DistSQLReceiver) PushBatch( // We only need the row count. planNodeToRowSource is set up to handle // ensuring that the last stage in the pipeline will return a single-column // row with the row count in it, so just grab that and exit. - r.resultWriter.IncrementRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0])) + r.resultWriterMu.row.IncrementRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0])) return r.status } @@ -1245,7 +1377,7 @@ func (r *DistSQLReceiver) PushBatch( panic("unsupported exists mode for PushBatch") } r.tracing.TraceExecBatchResult(r.ctx, batch) - if commErr := r.batchWriter.AddBatch(r.ctx, batch); commErr != nil { + if commErr := r.resultWriterMu.batch.AddBatch(r.ctx, batch); commErr != nil { r.handleCommErr(commErr) } return r.status @@ -1299,7 +1431,7 @@ func (dsp *DistSQLPlanner) PlanAndRunAll( dsp.PlanAndRun( ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv, ) - if recv.commErr != nil || recv.resultWriter.Err() != nil { + if recv.commErr != nil || recv.getError() != nil { return recv.commErr } @@ -1419,7 +1551,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( // TODO(yuzefovich): consider implementing batch receiving result writer. subqueryRowReceiver := NewRowResultWriter(&rows) - subqueryRecv.resultWriter = subqueryRowReceiver + subqueryRecv.resultWriterMu.row = subqueryRowReceiver subqueryPlans[planIdx].started = true dsp.Run(ctx, subqueryPlanCtx, planner.txn, subqueryPhysPlan, subqueryRecv, evalCtx, nil /* finishedSetupFn */) if err := subqueryRowReceiver.Err(); err != nil { @@ -1740,8 +1872,8 @@ func (dsp *DistSQLPlanner) planAndRunPostquery( // TODO(yuzefovich): at the moment, errOnlyResultWriter is sufficient here, // but it may not be the case when we support cascades through the optimizer. postqueryResultWriter := &errOnlyResultWriter{} - postqueryRecv.resultWriter = postqueryResultWriter - postqueryRecv.batchWriter = postqueryResultWriter + postqueryRecv.resultWriterMu.row = postqueryResultWriter + postqueryRecv.resultWriterMu.batch = postqueryResultWriter dsp.Run(ctx, postqueryPlanCtx, planner.txn, postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */) - return postqueryRecv.resultWriter.Err() + return postqueryRecv.getError() } diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 497bb3ae65aa..2ee21dafec50 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -582,3 +582,109 @@ func TestDistSQLRunnerCoordinator(t *testing.T) { // Now bump it up to 100. checkNumRunners(100) } + +// TestSetupFlowRPCError verifies that the distributed query plan errors out and +// cleans up all flows if the SetupFlow RPC fails for one of the remote nodes. +// It also checks that the expected error is returned. +func TestSetupFlowRPCError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Start a 3 node cluster where we can inject an error for SetupFlow RPC on + // the server side for the queries in question. + const numNodes = 3 + ctx := context.Background() + getError := func(nodeID base.SQLInstanceID) error { + return errors.Newf("injected error on n%d", nodeID) + } + // We use different queries to simplify handling the node ID on which the + // error should be injected (i.e. we avoid the need for synchronization in + // the test). In particular, the difficulty comes from the fact that some of + // the SetupFlow RPCs might not be issued at all while others are served + // after the corresponding flow on the gateway has exited. + queries := []string{ + "SELECT k FROM test.foo", + "SELECT v FROM test.foo", + "SELECT * FROM test.foo", + } + stmtToNodeIDForError := map[string]base.SQLInstanceID{ + queries[0]: 2, // error on n2 + queries[1]: 3, // error on n3 + queries[2]: 0, // no error + } + tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + SetupFlowCb: func(nodeID base.SQLInstanceID, req *execinfrapb.SetupFlowRequest) error { + nodeIDForError, ok := stmtToNodeIDForError[req.StatementSQL] + if !ok || nodeIDForError != nodeID { + return nil + } + return getError(nodeID) + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Create a table with 30 rows, split them into 3 ranges with each node + // having one. + db := tc.ServerConn(0) + sqlDB := sqlutils.MakeSQLRunner(db) + sqlutils.CreateTable( + t, db, "foo", + "k INT PRIMARY KEY, v INT", + 30, + sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(2)), + ) + sqlDB.Exec(t, "ALTER TABLE test.foo SPLIT AT VALUES (10), (20)") + sqlDB.Exec( + t, + fmt.Sprintf("ALTER TABLE test.foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], 0), (ARRAY[%d], 10), (ARRAY[%d], 20)", + tc.Server(0).GetFirstStoreID(), + tc.Server(1).GetFirstStoreID(), + tc.Server(2).GetFirstStoreID(), + ), + ) + + // assertNoRemoteFlows verifies that the remote flows exit "soon". + // + // Note that in practice this happens very quickly, but in an edge case it + // could take 10s (sql.distsql.flow_stream_timeout). That edge case occurs + // when the server-side goroutine of the SetupFlow RPC is scheduled after + // - the gateway flow exits with an error + // - the CancelDeadFlows RPC for the remote flow in question completes. + // With such setup the FlowStream RPC of the outbox will time out after 10s. + assertNoRemoteFlows := func() { + testutils.SucceedsSoon(t, func() error { + for i, remoteNode := range []*distsql.ServerImpl{ + tc.Server(1).DistSQLServer().(*distsql.ServerImpl), + tc.Server(2).DistSQLServer().(*distsql.ServerImpl), + } { + if n := remoteNode.NumRemoteRunningFlows(); n != 0 { + return errors.Newf("%d remote flows still running on n%d", n, i+2) + } + } + return nil + }) + } + + // Run query twice while injecting an error on the remote nodes. + for i := 0; i < 2; i++ { + query := queries[i] + nodeID := stmtToNodeIDForError[query] + t.Logf("running %q with error being injected on n%d", query, nodeID) + _, err := db.ExecContext(ctx, query) + require.True(t, strings.Contains(err.Error(), getError(nodeID).Error())) + assertNoRemoteFlows() + } + + // Sanity check that the query doesn't error out without error injection. + t.Logf("running %q with no error injection", queries[2]) + _, err := db.ExecContext(ctx, queries[2]) + require.NoError(t, err) + assertNoRemoteFlows() +} diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index c74c3ca46442..959fc2e5e77f 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" @@ -292,6 +293,10 @@ type TestingKnobs struct { // ProcessorNoTracingSpan is used to disable the creation of a tracing span // in ProcessorBase.StartInternal if the tracing is enabled. ProcessorNoTracingSpan bool + + // SetupFlowCb, when non-nil, is called by the execinfrapb.DistSQLServer + // when responding to SetupFlow RPCs. + SetupFlowCb func(base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index e58b22123f47..a76c7fddaebc 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -115,9 +115,6 @@ type Flow interface { // query. IsLocal() bool - // IsVectorized returns whether this flow will run with vectorized execution. - IsVectorized() bool - // StatementSQL is the SQL statement for which this flow is executing. It is // populated on a best effort basis (only available for user-issued queries // that are also not like BulkIO/CDC related). @@ -137,9 +134,16 @@ type Flow interface { // on behalf of this Flow. MemUsage() int64 - // Cancel cancels the flow by canceling its context. + // Cancel cancels the flow by canceling its context. Safe to be called from + // any goroutine but **cannot** be called after (or concurrently with) + // Cleanup. Cancel() + // AddOnCleanup adds a callback to be executed at the very end of Cleanup. + // Callbacks are put on the stack meaning that AddOnCleanup is called + // multiple times, then the "later" callbacks are executed first. + AddOnCleanup(fn func()) + // Cleanup must be called whenever the flow is done (meaning it either // completes gracefully after all processors and mailboxes exited or an // error is encountered that stops the flow from making progress). The @@ -442,11 +446,6 @@ func (f *FlowBase) IsLocal() bool { return f.Local } -// IsVectorized returns whether this flow will run with vectorized execution. -func (f *FlowBase) IsVectorized() bool { - panic("IsVectorized should not be called on FlowBase") -} - // Start is part of the Flow interface. func (f *FlowBase) Start(ctx context.Context) error { return f.StartInternal(ctx, f.processors) @@ -520,6 +519,19 @@ func (f *FlowBase) Cancel() { f.ctxCancel() } +// AddOnCleanup is part of the Flow interface. +func (f *FlowBase) AddOnCleanup(fn func()) { + if f.onFlowCleanup != nil { + oldOnFlowCleanup := f.onFlowCleanup + f.onFlowCleanup = func() { + fn() + oldOnFlowCleanup() + } + } else { + f.onFlowCleanup = fn + } +} + // Cleanup is part of the Flow interface. // NOTE: this implements only the shared clean up logic between row-based and // vectorized flows. diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 4a5f4ae19163..c14c5bd29175 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -875,8 +875,6 @@ func (p *PhysicalPlan) PopulateEndpoints() { // GenerateFlowSpecs takes a plan (with populated endpoints) and generates the // set of FlowSpecs (one per node involved in the plan). -// -// gateway is the current node's SQLInstanceID. func (p *PhysicalPlan) GenerateFlowSpecs() map[base.SQLInstanceID]*execinfrapb.FlowSpec { flowID := execinfrapb.FlowID{ UUID: p.FlowID, @@ -886,7 +884,7 @@ func (p *PhysicalPlan) GenerateFlowSpecs() map[base.SQLInstanceID]*execinfrapb.F for _, proc := range p.Processors { flowSpec, ok := flows[proc.SQLInstanceID] if !ok { - flowSpec = NewFlowSpec(flowID, p.GatewaySQLInstanceID) + flowSpec = newFlowSpec(flowID, p.GatewaySQLInstanceID) flows[proc.SQLInstanceID] = flowSpec } flowSpec.Processors = append(flowSpec.Processors, proc.Spec) diff --git a/pkg/sql/physicalplan/specs.go b/pkg/sql/physicalplan/specs.go index d7e2700162da..422fa4952625 100644 --- a/pkg/sql/physicalplan/specs.go +++ b/pkg/sql/physicalplan/specs.go @@ -23,9 +23,9 @@ var flowSpecPool = sync.Pool{ }, } -// NewFlowSpec returns a new FlowSpec, which may have non-zero capacity in its +// newFlowSpec returns a new FlowSpec, which may have non-zero capacity in its // slice fields. -func NewFlowSpec(flowID execinfrapb.FlowID, gateway base.SQLInstanceID) *execinfrapb.FlowSpec { +func newFlowSpec(flowID execinfrapb.FlowID, gateway base.SQLInstanceID) *execinfrapb.FlowSpec { spec := flowSpecPool.Get().(*execinfrapb.FlowSpec) spec.FlowID = flowID spec.Gateway = gateway diff --git a/pkg/sql/rowflow/row_based_flow.go b/pkg/sql/rowflow/row_based_flow.go index 4229ad664833..489d2e348447 100644 --- a/pkg/sql/rowflow/row_based_flow.go +++ b/pkg/sql/rowflow/row_based_flow.go @@ -431,11 +431,6 @@ func (f *rowBasedFlow) setupRouter(spec *execinfrapb.OutputRouterSpec) (router, return makeRouter(spec, streams) } -// IsVectorized is part of the flowinfra.Flow interface. -func (f *rowBasedFlow) IsVectorized() bool { - return false -} - // Release releases this rowBasedFlow back to the pool. func (f *rowBasedFlow) Release() { *f = rowBasedFlow{}