sql: do not wait for setup of remote flows on the gateway
Previously, when setting up the flows for a distributed plan we would
issue all SetupFlow RPCs to the remote nodes and wait for all of them to
come back before proceeding with the execution of the local flow. This
can introduce an execution stall, especially in multi-region setups, if
remote flows depend on the local flow for some of the data. This was
suboptimal, and this commit makes it so that we no longer wait for the
RPCs to come back and instead start executing the local flow right away.
We now
spin up a separate goroutine that waits for the RPCs to come back, and
if an error is encountered (which shouldn't happen often), then that
goroutine sets the error on the DistSQLReceiver and cancels the local
flow. Setting the error on the receiver, in turn, will make it so that
all remote flows will be canceled via CancelDeadFlows RPC (which might
be faster than via the distributed query shutdown triggered when the
local flow is canceled).

An additional change is that we now perform the setup of the local flow
on the gateway first, and only if that is successful, we proceed to the
setup of the remote flows. This acts as a sanity check on the validity
of the flow spec and should make it less likely that the remote flows
setup fails.

This required some changes around the error handling of the
DistSQLReceiver to make it concurrency safe. One option there was to make
`SetError` and `Err` methods of `rowResultWriter` interface concurrency
safe, but there are several implementations, and I decided to make the
adjustment to the DistSQLReceiver itself since this concurrency safety is
only needed there, and it seems somewhat wrong to impose the requirement
on all of the implementations.

Additionally, in order to avoid locking the mutex as much as possible,
the `status` of the receiver is not protected by the mutex. This is
achieved by the new goroutine not updating the status and, instead,
letting the main goroutine "resolve" the status the next time a meta
object is pushed. The idea is that the cancellation of the local flow
shuts down the local flow's execution, making it encounter an error which
is then propagated as metadata. Thus, this "status resolution" should
happen fairly quickly, and this setup allows us to avoid the locking in
most scenarios when pushing rows and batches.
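
The scheme can be sketched like this (hypothetical names; the real status values and Push methods differ):

```go
package main

import (
	"errors"
	"sync"
)

var errAsync = errors.New("SetupFlow RPC failed")

type execStatus int

const (
	statusRunning execStatus = iota
	statusDrainRequested
)

// receiver sketches the lock-avoidance scheme: status belongs exclusively to
// the main goroutine pushing rows, so the hot path never takes the mutex.
type receiver struct {
	status execStatus // owned by the main goroutine, no mutex needed
	mu     struct {
		sync.Mutex
		err error // set by the RPC-watching goroutine
	}
}

// setErrorAsync is what the RPC-watching goroutine calls: it records the
// error but deliberately does not touch status.
func (r *receiver) setErrorAsync(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.err == nil {
		r.mu.err = err
	}
}

// pushRow is the hot path: no locking, just a read of the main goroutine's
// own status field.
func (r *receiver) pushRow() execStatus {
	return r.status
}

// pushMeta "resolves" the status: canceling the local flow makes it hit an
// error that is propagated as metadata, so the mutex is taken here, on the
// much colder metadata path.
func (r *receiver) pushMeta() execStatus {
	r.mu.Lock()
	err := r.mu.err
	r.mu.Unlock()
	if err != nil && r.status == statusRunning {
		r.status = statusDrainRequested
	}
	return r.status
}
```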

Further changes were needed around `saveFlows` function as well as
releasing flow specs back to the pool. The former had to be moved to be
done sooner (right after setting up the local flow), and for the latter
we had to move the release of the flow specs for the remote flows to
right after the corresponding SetupFlow RPCs are issued. This ordering
ensures that the flow specs are always released, but after all of their
uses (both `saveFlows` and the RPCs use them).
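
The lifetime rule, releasing each spec exactly once and only after its last use, can be pictured with a toy pool (all names here are illustrative; the real code uses the actual spec pools):

```go
package main

import (
	"errors"
	"sync"
)

var errRPCFailed = errors.New("SetupFlow RPC failed")

// flowSpec stands in for a pooled flow specification.
type flowSpec struct{ sqlInstanceID int }

// specPool counts releases so that the "released exactly once, after the
// last use" invariant is observable.
type specPool struct {
	mu       sync.Mutex
	released int
}

func (p *specPool) release(*flowSpec) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.released++
}

// setupRemoteFlows issues the SetupFlow RPC for each remote spec and
// releases the spec right after: by then every use of the spec (the earlier
// saveFlows call and the RPC itself) is behind us, so specs are returned to
// the pool even when an RPC fails.
func setupRemoteFlows(pool *specPool, specs []*flowSpec, issueRPC func(*flowSpec) error) error {
	var firstErr error
	for _, spec := range specs {
		err := issueRPC(spec)
		pool.release(spec) // the RPC was the last use of this spec
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
```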

Yet another complication was around the concurrency between
`Flow.Cleanup` being called and the new goroutine receiving an error
from the RPC. At the very end of `Flow.Cleanup` the flow object is
released, so the new goroutine cannot call `Flow.Cancel` after that.
Additionally, since the mutex protection of the `rowResultWriter` by the
DistSQLReceiver lasts only until `DistSQLPlanner.Run` returns (which is
right after `Flow.Cleanup` returns), the new goroutine will attempt to
set the error only if the cleanup hasn't been performed. This is
achieved by having a mutex-protected boolean, and the boolean is only
introduced if the new goroutine is spun up.
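
That guard can be sketched as follows (a hypothetical `cleanupGuard` type, pulled out as a separate struct for clarity):

```go
package main

import "sync"

// flow stands in for flowinfra.Flow; after Cleanup the object is released
// back to a pool, so no one may call Cancel on it anymore.
type flow struct{ canceled bool }

func (f *flow) Cancel() { f.canceled = true }

// cleanupGuard is the mutex-protected boolean from the commit message: it
// decides the race between Flow.Cleanup finishing and the RPC-watching
// goroutine reporting an error.
type cleanupGuard struct {
	mu        sync.Mutex
	cleanedUp bool
}

// markCleanedUp runs at the very end of Flow.Cleanup, just before the flow
// object is released.
func (g *cleanupGuard) markCleanedUp() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.cleanedUp = true
}

// onRPCError is called by the watcher goroutine. It invokes setErrAndCancel
// only if cleanup has not happened yet, and reports whether it did so.
func (g *cleanupGuard) onRPCError(setErrAndCancel func()) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.cleanedUp {
		return false
	}
	setErrAndCancel()
	return true
}
```

Because both `markCleanedUp` and `onRPCError` hold the same mutex, a late RPC error can never observe a half-released flow.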

Overall, as this whole commit message suggests, it has been quite tricky to get
things right (I hope I did), so one might wonder what simplifications,
if any, could be made. I considered and (mostly) rejected several:
- Ignoring the particular error that was returned by the RPCs and just
canceling the local flow. This would allow us to remove the complexity
of the concurrency safety with the error handling of the
DistSQLReceiver. We would still properly shutdown the whole distributed
plan. However, the downside is that we would lose the true reason for
the shutdown - most likely we would return "context canceled" to the
client instead of the actual error. On the other hand, the errors from
the RPCs should be rare enough that it might be worth giving this more
thought.
- Not canceling the local flow since just setting the error on the
receiver would be sufficient for the query to eventually shut down. The
obvious downside is that we might do more work after having received an
error from the RPC, and there is little upside I can see.
- But the most interesting simplification would be to just not create
the new goroutine in the first place. The idea is that if any of the
SetupFlow RPCs fail, then the flows on other nodes would hit the "no
inbound stream timeout" error (after not being connected to for 10s by
default), which would start the shutdown of the whole plan. This idea
would effectively eliminate all of the complexity of this commit, which
seems quite appealing. The downside here would be the combination of the
downsides of the ideas above - namely, now the query would result in
this opaque "no inbound stream" error (and we've been struggling with
diagnosing those, so I'm not eager to introduce another way it could
occur), and we would be doing wasteful work during this timeout window.

Release note (performance improvement): The setup of the distributed
query execution is now fully parallelized which should reduce the query
latencies, especially in multi-region setups.
yuzefovich committed Nov 11, 2022
1 parent 8ed1691 commit 0c1095e
Showing 9 changed files with 400 additions and 156 deletions.
5 changes: 0 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
@@ -349,11 +349,6 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
 	return f.tempStorage.path
 }
 
-// IsVectorized is part of the flowinfra.Flow interface.
-func (f *vectorizedFlow) IsVectorized() bool {
-	return true
-}
-
 // ConcurrentTxnUse is part of the flowinfra.Flow interface. It is conservative
 // in that it returns that there is concurrent txn use as soon as any operator
 // concurrency is detected. This should be inconsequential for local flows that
13 changes: 7 additions & 6 deletions pkg/sql/distsql/server.go
@@ -414,7 +414,7 @@ func (ds *ServerImpl) setupFlow(
 		ctx = flowCtx.AmbientContext.AnnotateCtx(ctx)
 		telemetry.Inc(sqltelemetry.DistSQLExecCounter)
 	}
-	if f.IsVectorized() {
+	if isVectorized {
 		telemetry.Inc(sqltelemetry.VecExecCounter)
 	}
 
@@ -566,14 +566,10 @@ func (ds *ServerImpl) SetupLocalSyncFlow(
 	batchOutput execinfra.BatchReceiver,
 	localState LocalState,
 ) (context.Context, flowinfra.Flow, execopnode.OpChains, error) {
-	ctx, f, opChains, err := ds.setupFlow(
+	return ds.setupFlow(
 		ctx, tracing.SpanFromContext(ctx), parentMonitor, &mon.BoundAccount{}, /* reserved */
 		req, output, batchOutput, localState,
 	)
-	if err != nil {
-		return nil, nil, nil, err
-	}
-	return ctx, f, opChains, err
 }
 
 // setupSpanForIncomingRPC creates a span for a SetupFlow RPC. The caller must
@@ -618,6 +614,11 @@ func (ds *ServerImpl) SetupFlow(
 	ctx context.Context, req *execinfrapb.SetupFlowRequest,
 ) (*execinfrapb.SimpleResponse, error) {
 	log.VEventf(ctx, 1, "received SetupFlow request from n%v for flow %v", req.Flow.Gateway, req.Flow.FlowID)
+	if cb := ds.TestingKnobs.SetupFlowCb; cb != nil {
+		if err := cb(ds.ServerConfig.NodeID.SQLInstanceID(), req); err != nil {
+			return &execinfrapb.SimpleResponse{Error: execinfrapb.NewError(ctx, err)}, nil
+		}
+	}
 	_, rpcSpan := ds.setupSpanForIncomingRPC(ctx, req)
 	defer rpcSpan.Finish()