Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: colexec: fix incorrect eager cancellation in parallel unordered sync #127657

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 74 additions & 34 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ type ParallelUnorderedSynchronizer struct {
processorID int32
allocator *colmem.Allocator
inputs []colexecargs.OpWithMetaInfo
inputCtxs []context.Context
// cancelLocalInput stores context cancellation functions for each of the
// inputs. The functions are populated only if localPlan is true.
cancelLocalInput []context.CancelFunc
tracingSpans []*tracing.Span
// cancelInputsOnDrain stores context cancellation functions for each of the
// inputs. The slice is only allocated and populated if eager cancellation
// on drain should be performed (see the comments in
// NewParallelUnorderedSynchronizer for more details).
cancelInputsOnDrain []context.CancelFunc
tracingSpans []*tracing.Span
// readNextBatch is a slice of channels, where each channel corresponds to the
// input at the same index in inputs. It is used as a barrier for input
// goroutines to wait on until the Next goroutine signals that it is safe to
Expand Down Expand Up @@ -125,6 +126,21 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.
return s.inputs[nth].Root
}

// hasParallelUnorderedSync returns whether there is at least one parallel
// unordered sync in the tree of operators rooted in op.
func hasParallelUnorderedSync(op execopnode.OpNode) bool {
	if _, isSync := op.(*ParallelUnorderedSynchronizer); isSync {
		return true
	}
	// Recurse into every child, including those only exposed when walking the
	// tree in verbose mode, so that no subtree is missed.
	const verbose = true
	numChildren := op.ChildCount(verbose)
	for childIdx := 0; childIdx < numChildren; childIdx++ {
		if hasParallelUnorderedSync(op.Child(childIdx, verbose)) {
			return true
		}
	}
	return false
}

// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
// On the first call to Next, len(inputs) goroutines are spawned to read each
// input asynchronously (to not be limited by a slow input). These will
Expand All @@ -139,25 +155,56 @@ func NewParallelUnorderedSynchronizer(
inputs []colexecargs.OpWithMetaInfo,
wg *sync.WaitGroup,
) *ParallelUnorderedSynchronizer {
// Check whether the synchronizer should be allowed to eagerly cancel its
// inputs when it transitions into the draining state. This optimization is
// needed to support locality-optimized search feature.
//
// If the plan is distributed, there might be an inbox in the input tree,
// and the synchronizer cannot cancel the work eagerly because canceling the
// context would break the gRPC stream and make it impossible to fetch the
// remote metadata. Furthermore, it will result in the remote flow
// cancellation.
// TODO(yuzefovich): we could allow eager cancellation in the distributed
// plans too, but only of inputs that don't have inboxes in the input tree.
var allowEagerCancellationOnDrain bool
if flowCtx.Local {
// If the plan is local, then the only requirement for allowing eager
// cancellation on drain is that there are no other parallel unordered
// syncs in the input trees. This is needed since the "child" sync won't
// be able to distinguish the benign context cancellation error from a
// true query execution error, so it can "poison" the query execution if
// the child sync hasn't transitioned into the draining mode when we
// perform the eager cancellation.
allowEagerCancellationOnDrain = true
for _, input := range inputs {
if hasParallelUnorderedSync(input.Root) {
allowEagerCancellationOnDrain = false
break
}
}
}
var cancelInputs []context.CancelFunc
if allowEagerCancellationOnDrain {
cancelInputs = make([]context.CancelFunc, len(inputs))
}
readNextBatch := make([]chan struct{}, len(inputs))
for i := range readNextBatch {
// Buffer readNextBatch chans to allow for non-blocking writes. There will
// only be one message on the channel at a time.
readNextBatch[i] = make(chan struct{}, 1)
}
return &ParallelUnorderedSynchronizer{
flowCtx: flowCtx,
processorID: processorID,
allocator: allocator,
inputs: inputs,
inputCtxs: make([]context.Context, len(inputs)),
cancelLocalInput: make([]context.CancelFunc, len(inputs)),
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
flowCtx: flowCtx,
processorID: processorID,
allocator: allocator,
inputs: inputs,
cancelInputsOnDrain: cancelInputs,
tracingSpans: make([]*tracing.Span, len(inputs)),
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
nextBatch: make([]func(), len(inputs)),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
// batchCh is a buffered channel in order to offer non-blocking writes to
// input goroutines. During normal operation, this channel will have at most
// len(inputs) messages. However, during DrainMeta, inputs might need to
Expand All @@ -177,22 +224,14 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) {
return
}
for i, input := range s.inputs {
s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(
var inputCtx context.Context
inputCtx, s.tracingSpans[i] = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID,
)
if s.flowCtx.Local {
// If the plan is local, there are no colrpc.Inboxes in this input
// tree, and the synchronizer can cancel the current work eagerly
// when transitioning into draining.
//
// If the plan is distributed, there might be an inbox in the
// input tree, and the synchronizer cannot cancel the work eagerly
// because canceling the context would break the gRPC stream and
// make it impossible to fetch the remote metadata. Furthermore, it
// will result in the remote flow cancellation.
s.inputCtxs[i], s.cancelLocalInput[i] = context.WithCancel(s.inputCtxs[i])
if s.cancelInputsOnDrain != nil {
inputCtx, s.cancelInputsOnDrain[i] = context.WithCancel(inputCtx)
}
input.Root.Init(s.inputCtxs[i])
input.Root.Init(inputCtx)
s.nextBatch[i] = func(inputOp colexecop.Operator, inputIdx int) func() {
return func() {
s.batches[inputIdx] = inputOp.Next()
Expand Down Expand Up @@ -257,7 +296,7 @@ func (s *ParallelUnorderedSynchronizer) init() {
switch state {
case parallelUnorderedSynchronizerStateRunning:
if err := colexecerror.CatchVectorizedRuntimeError(s.nextBatch[inputIdx]); err != nil {
if s.getState() == parallelUnorderedSynchronizerStateDraining && s.Ctx.Err() == nil && s.cancelLocalInput[inputIdx] != nil {
if s.getState() == parallelUnorderedSynchronizerStateDraining && s.Ctx.Err() == nil && s.cancelInputsOnDrain != nil {
// The synchronizer has just transitioned into the
// draining state and eagerly canceled work of this
// input. That cancellation is likely to manifest
Expand Down Expand Up @@ -417,10 +456,11 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if prevState == parallelUnorderedSynchronizerStateUninitialized {
s.init()
}
// Cancel all local inputs (we will still wait for all remote ones to
// return the next batch).
for _, cancelFunc := range s.cancelLocalInput {
// Cancel all inputs if eager cancellation is allowed.
for _, cancelFunc := range s.cancelInputsOnDrain {
if cancelFunc != nil {
// Note that if Init was never called, cancelFunc will be nil, in
// which case there is nothing to cancel.
cancelFunc()
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func convertToVecTree(
)
fuseOpt := flowinfra.FuseNormally
if flowCtx.Local && !execinfra.HasParallelProcessors(flow) {
// TODO(yuzefovich): this check doesn't exactly match what we have on
// the main code path where we use !LocalState.MustUseLeafTxn() in the
// conditional. Concretely, it means that if we choose to use the
// Streamer at the execution time, we will use FuseNormally, yet here
// we'd pick FuseAggressively. The issue is minor though since we do
// capture the correct vectorized plan in the stmt bundle, so only
// explicit EXPLAIN (VEC) is affected.
fuseOpt = flowinfra.FuseAggressively
}
opChains, _, err = creator.setupFlow(ctx, flow.Processors, fuseOpt)
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/union
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,25 @@ SELECT a, b AS b1, b AS b2 FROM abc INTERSECT SELECT a, c, b FROM abc ORDER by a
1 1 1
1 2 2
2 2 2

# Regression test for #127043 where eager cancellation of the parallel unordered
# synchronizer was causing spurious errors on queries with LIMIT.
statement ok
CREATE TABLE t127043_1 (k1 INT, v1 INT, INDEX (k1));
INSERT INTO t127043_1 VALUES (1, 1);
CREATE TABLE t127043_2 (k2 INT, v2 INT, INDEX (k2));
INSERT INTO t127043_2 VALUES (1, 1);
CREATE TABLE t127043_3 (k3 INT, v3 INT, INDEX (k3));
INSERT INTO t127043_3 VALUES (1, 1);
CREATE VIEW v127043 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1@t127043_1_k1_idx
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2@t127043_2_k2_idx
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3@t127043_3_k3_idx;

query II
SELECT k, v FROM v127043 WHERE k = 1 LIMIT 1;
----
1 1