Skip to content

Commit

Permalink
colexec: improve eager cancellation in parallel unordered sync
Browse files Browse the repository at this point in the history
This commit improves the recently merged fix in
dda8b3a. In particular, we have eager
cancellation of work in the parallel unordered synchronizer for local
plans, and we now do that only for subtrees that have no PUSes in them.
This commit lifts that restriction by swallowing all context
cancellation errors in the draining state.

The rationale for why this behavior is safe is the following:
- if the query should result in an error, then some other error must
have been propagated to the client, and this is what caused the sync to
transition into the draining state in the first place. (We do replace
errors for the client in one case - set `DistSQLReceiver.SetError` where
some errors from KV have higher priority than others, but it isn't
applicable here.)
- if the query should not result in an error and should succeed, yet we
have some pending context cancellation errors, then it must be the case
that query execution was short-circuited (e.g. because of the LIMIT), so
we can pretend the part of the execution that hit the pending error
didn't actually run since clearly it wasn't necessary to compute the
query result.

Note that we couldn't swallow all types of errors in the draining state
(e.g. ReadWithinUncertaintyIntervalError that comes from the KV layer
results in "poisoning" the txn, so we need to propagate it to the
client), so we only have a single error type that we swallow.

Release note: None
  • Loading branch information
yuzefovich committed Jul 24, 2024
1 parent 67dfec3 commit fc5e9f9
Showing 1 changed file with 62 additions and 80 deletions.
142 changes: 62 additions & 80 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,10 @@ type ParallelUnorderedSynchronizer struct {
// streamingMemAcc for the metadata.
metadataAccountedFor int64
inputs []colexecargs.OpWithMetaInfo
// cancelInputsOnDrain stores context cancellation functions for each of the
// inputs. The slice is only allocated and populated if eager cancellation
// on drain should be performed (see the comments in
// NewParallelUnorderedSynchronizer for more details).
cancelInputsOnDrain []context.CancelFunc
tracingSpans []*tracing.Span
// cancelLocalInput stores context cancellation functions for each of the
// inputs. The slice is only allocated and populated for local plans.
cancelLocalInput []context.CancelFunc
tracingSpans []*tracing.Span
// readNextBatch is a slice of channels, where each channel corresponds to the
// input at the same index in inputs. It is used as a barrier for input
// goroutines to wait on until the Next goroutine signals that it is safe to
Expand Down Expand Up @@ -129,21 +127,6 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.
return s.inputs[nth].Root
}

// hasParallelUnorderedSync reports whether the tree of operators rooted in
// op contains at least one ParallelUnorderedSynchronizer.
func hasParallelUnorderedSync(op execopnode.OpNode) bool {
	if _, isPUS := op.(*ParallelUnorderedSynchronizer); isPUS {
		return true
	}
	// Walk all children (including the "verbose-only" ones) and recurse.
	const verbose = true
	for childIdx := 0; childIdx < op.ChildCount(verbose); childIdx++ {
		if hasParallelUnorderedSync(op.Child(childIdx, verbose)) {
			return true
		}
	}
	return false
}

// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
// On the first call to Next, len(inputs) goroutines are spawned to read each
// input asynchronously (to not be limited by a slow input). These will
Expand All @@ -157,37 +140,9 @@ func NewParallelUnorderedSynchronizer(
inputs []colexecargs.OpWithMetaInfo,
wg *sync.WaitGroup,
) *ParallelUnorderedSynchronizer {
// Check whether the synchronizer should be allowed to eagerly cancel its
// inputs when it transitions into the draining state. This optimization is
// needed to support locality-optimized search feature.
//
// If the plan is distributed, there might be an inbox in the input tree,
// and the synchronizer cannot cancel the work eagerly because canceling the
// context would break the gRPC stream and make it impossible to fetch the
// remote metadata. Furthermore, it will result in the remote flow
// cancellation.
// TODO(yuzefovich): we could allow eager cancellation in the distributed
// plans too, but only of inputs that don't have inboxes in the input tree.
var allowEagerCancellationOnDrain bool
var cancelLocalInput []context.CancelFunc
if flowCtx.Local {
// If the plan is local, then the only requirement for allowing eager
// cancellation on drain is that there are no other parallel unordered
// syncs in the input trees. This is needed since the "child" sync won't
// be able to distinguish the benign context cancellation error from a
// true query execution error, so it can "poison" the query execution if
// the child sync hasn't transitioned into the draining mode when we
// perform the eager cancellation.
allowEagerCancellationOnDrain = true
for _, input := range inputs {
if hasParallelUnorderedSync(input.Root) {
allowEagerCancellationOnDrain = false
break
}
}
}
var cancelInputs []context.CancelFunc
if allowEagerCancellationOnDrain {
cancelInputs = make([]context.CancelFunc, len(inputs))
cancelLocalInput = make([]context.CancelFunc, len(inputs))
}
readNextBatch := make([]chan struct{}, len(inputs))
for i := range readNextBatch {
Expand All @@ -196,17 +151,17 @@ func NewParallelUnorderedSynchronizer(
readNextBatch[i] = make(chan struct{}, 1)
}
return &ParallelUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
streamingMemAcc: streamingMemAcc,
inputs: inputs,
cancelInputsOnDrain: cancelInputs,
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
flowCtx: flowCtx,
processorID: processorID,
streamingMemAcc: streamingMemAcc,
inputs: inputs,
cancelLocalInput: cancelLocalInput,
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
// batchCh is a buffered channel in order to offer non-blocking writes to
// input goroutines. During normal operation, this channel will have at most
// len(inputs) messages. However, during DrainMeta, inputs might need to
Expand All @@ -230,8 +185,17 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) {
inputCtx, s.tracingSpans[i] = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID,
)
if s.cancelInputsOnDrain != nil {
inputCtx, s.cancelInputsOnDrain[i] = context.WithCancel(inputCtx)
if s.flowCtx.Local {
// If the plan is local, there are no colrpc.Inboxes in this input
// tree, and the synchronizer can cancel the current work eagerly
// when transitioning into draining.
//
// If the plan is distributed, there might be an inbox in the
// input tree, and the synchronizer cannot cancel the work eagerly
// because canceling the context would break the gRPC stream and
// make it impossible to fetch the remote metadata. Furthermore, it
// will result in the remote flow cancellation.
inputCtx, s.cancelLocalInput[i] = context.WithCancel(inputCtx)
}
input.Root.Init(inputCtx)
s.nextBatch[i] = func(inputOp colexecop.Operator, inputIdx int) func() {
Expand Down Expand Up @@ -298,17 +262,6 @@ func (s *ParallelUnorderedSynchronizer) init() {
switch state {
case parallelUnorderedSynchronizerStateRunning:
if err := colexecerror.CatchVectorizedRuntimeError(s.nextBatch[inputIdx]); err != nil {
if s.getState() == parallelUnorderedSynchronizerStateDraining && s.Ctx.Err() == nil && s.cancelInputsOnDrain != nil {
// The synchronizer has just transitioned into the
// draining state and eagerly canceled work of this
// input. That cancellation is likely to manifest
// itself as the context.Canceled error, but it
// could be another error too; in any case, we will
// swallow the error because the user of the
// synchronizer is only interested in the metadata
// at this point.
continue
}
sendErr(err)
// After we encounter an error, we proceed to draining.
// If this is a context cancellation, we'll realize that
Expand Down Expand Up @@ -458,15 +411,44 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if prevState == parallelUnorderedSynchronizerStateUninitialized {
s.init()
}
// Cancel all inputs if eager cancellation is allowed.
for _, cancelFunc := range s.cancelInputsOnDrain {
// Cancel all inputs if we have a local plan.
for _, cancelFunc := range s.cancelLocalInput {
if cancelFunc != nil {
// Note that if Init was never called, cancelFunc will be nil, in
// which case there is nothing to cancel.
cancelFunc()
}
}

bufferMeta := func(meta []execinfrapb.ProducerMetadata) {
if s.flowCtx.Local {
// Given that the synchronizer is draining, it is safe to ignore all
// context cancellation errors in the metadata for local plans. This
// is the case because:
// - if the query should result in an error, then some other error
// was already propagated to the client, and this was the reason for
// why we transitioned into draining;
// - if the query should be successful, yet we have some pending
// context cancellation errors, then it must be the case that query
// execution was short-circuited (e.g. because of the LIMIT), so we
// can pretend the part of the execution that hit the pending error
// didn't happen since clearly it wasn't necessary to compute the
// query result.
//
// Note that we cannot ignore all errors here since some of them
// (like ReadWithinUncertaintyIntervalError) could poison the txn
// and need to be propagated to the client, so we only swallow the
// cancellation errors here.
for _, m := range meta {
if m.Err == nil || !errors.Is(m.Err, context.Canceled) {
s.bufferedMeta = append(s.bufferedMeta, m)
}
}
} else {
s.bufferedMeta = append(s.bufferedMeta, meta...)
}
}

// Non-blocking drain of batchCh. This is important mostly because of the
// following edge case: all n inputs have pushed batches to the batchCh, so
// there are currently n messages. Next notifies the last read input to
Expand All @@ -483,7 +465,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if msg == nil {
batchChDrained = true
} else if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
default:
batchChDrained = true
Expand All @@ -502,15 +484,15 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
// Drain the batchCh, this reads the metadata that was pushed.
for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh {
if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
}

// Buffer any errors that may have happened without blocking on the channel.
for exitLoop := false; !exitLoop; {
select {
case err := <-s.errCh:
s.bufferedMeta = append(s.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
bufferMeta([]execinfrapb.ProducerMetadata{{Err: err}})
default:
exitLoop = true
}
Expand Down

0 comments on commit fc5e9f9

Please sign in to comment.