Skip to content

Commit

Permalink
colexec: harden eager cancellation in parallel unordered sync
Browse files Browse the repository at this point in the history
This commit hardens the eager cancellation mechanism in the parallel
unordered synchronizer. It was recently fixed in dda8b3a,
but the newly added test exposed a bug where the eager cancellation in
a child PUS could poison the query execution of another input of the
parent PUS, incorrectly failing the query altogether. More detailed
description can be found [here](cockroachdb#127942 (comment)),
but in short, due sharing of the same leaf txn between most operators in
a flow, eager cancellation of one operator could lead to poisoning the
execution of another operator which could only happen with a hierarchy
of PUSes. This commit fixes such situation by swallowing all context
cancellation errors in draining state of the PUS _even if_ that
particular PUS didn't eagerly cancel its inputs.

The rationale for why this behavior is safe is the following:
- if the query should result in an error, then some other error must
have been propagated to the client, and this is what caused the sync to
transition into the draining state in the first place. (We do replace
errors for the client in one case - set `DistSQLReceiver.SetError` where
some errors from KV have higher priority then others, but it isn't
applicable here.)
- if the query should not result in an error and should succeed, yet we
have some pending context cancellation errors, then it must be the case
that query execution was short-circuited (e.g. because of the LIMIT), so
we can pretend the part of the execution that hit the pending error
didn't actually run since clearly it wasn't necessary to compute the
query result.

Note that we couldn't swallow all types of errors in the draining state
(e.g. ReadWithinUncertaintyIntervalError that comes from the KV layer
results in "poisoning" the txn, so we need to propagate it to the
client), so we only have a single error type that we swallow.

Additionally, while working on this change I realized another reason for
why we don't want to lift the restriction for having eager cancellation
only on "leaf" PUSes, so I extended the comment. This commit also adds
a few more logic tests.

Release note: None
  • Loading branch information
yuzefovich committed Oct 30, 2024
1 parent 6db8137 commit a67ae51
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_library(
"//pkg/sql/types",
"//pkg/storage/enginepb",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/intsets",
Expand Down
51 changes: 47 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package colexec
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -171,7 +173,9 @@ func NewParallelUnorderedSynchronizer(
// be able to distinguish the benign context cancellation error from a
// true query execution error, so it can "poison" the query execution if
// the child sync hasn't transitioned into the draining mode when we
// perform the eager cancellation.
// perform the eager cancellation. The child sync also won't distinguish
// between the benign context cancellation and the flow cancellation, so
// it might not collect the metadata from its inputs when it should.
allowEagerCancellationOnDrain = true
for _, input := range inputs {
if hasParallelUnorderedSync(input.Root) {
Expand Down Expand Up @@ -302,6 +306,9 @@ func (s *ParallelUnorderedSynchronizer) init() {
// swallow the error because the user of the
// synchronizer is only interested in the metadata
// at this point.
// TODO(yuzefovich): this swallowing is redundant
// with what we do in DrainMeta, consider removing
// this block.
continue
}
sendErr(err)
Expand Down Expand Up @@ -462,6 +469,42 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
}
}

bufferMeta := func(meta []execinfrapb.ProducerMetadata) {
if !s.flowCtx.Local {
s.bufferedMeta = append(s.bufferedMeta, meta...)
return
}
// Given that the synchronizer is draining, it is safe to ignore all
// context cancellation errors in the metadata for local plans. This is
// the case because:
// - if the query should result in an error, then some other error was
// already propagated to the client, and this was the reason for why we
// transitioned into draining;
// - if the query should be successful, yet we have some pending context
// cancellation errors, then it must be the case that query execution
// was short-circuited (e.g. because of the LIMIT), so we can pretend
// the part of the execution that hit the pending error didn't happen
// since clearly it wasn't necessary to compute the query result.
//
// Note that we cannot ignore all errors here since some of them (like
// ReadWithinUncertaintyIntervalError) could poison the txn and need to
// be propagated to the client, so we only swallow the cancellation
// errors here.
for _, m := range meta {
if m.Err == nil ||
// This is ugly, but the context cancellation if observed in the
// KV layer can result in kvpb errors that don't satisfy
// errors.Is check (because they don't serialize the original
// error), so we have this string matching instead.
(!strings.Contains(m.Err.Error(), context.Canceled.Error()) &&
// If the cancellation is observed by the CancelChecker,
// then it propagates a QueryCanceledError.
!errors.Is(m.Err, cancelchecker.QueryCanceledError)) {
s.bufferedMeta = append(s.bufferedMeta, m)
}
}
}

// Non-blocking drain of batchCh. This is important mostly because of the
// following edge case: all n inputs have pushed batches to the batchCh, so
// there are currently n messages. Next notifies the last read input to
Expand All @@ -478,7 +521,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if msg == nil {
batchChDrained = true
} else if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
default:
batchChDrained = true
Expand All @@ -497,15 +540,15 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
// Drain the batchCh, this reads the metadata that was pushed.
for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh {
if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
}

// Buffer any errors that may have happened without blocking on the channel.
for exitLoop := false; !exitLoop; {
select {
case err := <-s.errCh:
s.bufferedMeta = append(s.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
bufferMeta([]execinfrapb.ProducerMetadata{{Err: err}})
default:
exitLoop = true
}
Expand Down
39 changes: 37 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/union
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,50 @@ CREATE TABLE t127043_2 (k2 INT, v2 INT, INDEX (k2));
INSERT INTO t127043_2 VALUES (1, 1);
CREATE TABLE t127043_3 (k3 INT, v3 INT, INDEX (k3));
INSERT INTO t127043_3 VALUES (1, 1);
CREATE VIEW v127043 (k, v) AS
CREATE VIEW v127043_3 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3;
CREATE VIEW v127043_3_idx (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1@t127043_1_k1_idx
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2@t127043_2_k2_idx
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3@t127043_3_k3_idx;
CREATE VIEW v127043_2 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2;

statement ok
ANALYZE t127043_1;

statement ok
ANALYZE t127043_2;

statement ok
ANALYZE t127043_3;

# Scan and filter in all UNION branches.
query II
SELECT k, v FROM v127043_3 WHERE k = 1 LIMIT 1;
----
1 1

# Scan and index join in all UNION branches.
query II
SELECT k, v FROM v127043_3_idx WHERE k = 1 LIMIT 1;
----
1 1

# Hash join with two UNION branches (with a scan and an index join) on one side
# and a scan on the other.
query II
SELECT k, v FROM v127043 WHERE k = 1 LIMIT 1;
SELECT k, v FROM v127043_2 INNER JOIN t127043_3 ON k = k3 WHERE k = 1 LIMIT 1;
----
1 1

0 comments on commit a67ae51

Please sign in to comment.