diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index 7d8decab98de..8b77b487f143 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -77,6 +77,7 @@ go_library( "//pkg/sql/types", "//pkg/storage/enginepb", "//pkg/util/buildutil", + "//pkg/util/cancelchecker", "//pkg/util/duration", # keep "//pkg/util/encoding", # keep "//pkg/util/intsets", diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 9b29c58ad75a..4e0c6c30762a 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -8,6 +8,7 @@ package colexec import ( "context" "fmt" + "strings" "sync" "sync/atomic" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -171,7 +173,9 @@ func NewParallelUnorderedSynchronizer( // be able to distinguish the benign context cancellation error from a // true query execution error, so it can "poison" the query execution if // the child sync hasn't transitioned into the draining mode when we - // perform the eager cancellation. + // perform the eager cancellation. The child sync also won't distinguish + // between the benign context cancellation and the flow cancellation, so + // it might not collect the metadata from its inputs when it should. allowEagerCancellationOnDrain = true for _, input := range inputs { if hasParallelUnorderedSync(input.Root) { @@ -302,6 +306,9 @@ func (s *ParallelUnorderedSynchronizer) init() { // swallow the error because the user of the // synchronizer is only interested in the metadata // at this point. + // TODO(yuzefovich): this swallowing is redundant + // with what we do in DrainMeta, consider removing + // this block. continue } sendErr(err) @@ -462,6 +469,42 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada } } + bufferMeta := func(meta []execinfrapb.ProducerMetadata) { + if !s.flowCtx.Local { + s.bufferedMeta = append(s.bufferedMeta, meta...) + return + } + // Given that the synchronizer is draining, it is safe to ignore all + // context cancellation errors in the metadata for local plans. This is + // the case because: + // - if the query should result in an error, then some other error was + // already propagated to the client, and this was the reason for why we + // transitioned into draining; + // - if the query should be successful, yet we have some pending context + // cancellation errors, then it must be the case that query execution + // was short-circuited (e.g. because of the LIMIT), so we can pretend + // the part of the execution that hit the pending error didn't happen + // since clearly it wasn't necessary to compute the query result. + // + // Note that we cannot ignore all errors here since some of them (like + // ReadWithinUncertaintyIntervalError) could poison the txn and need to + // be propagated to the client, so we only swallow the cancellation + // errors here. + for _, m := range meta { + if m.Err == nil || + // This is ugly, but the context cancellation if observed in the + // KV layer can result in kvpb errors that don't satisfy + // errors.Is check (because they don't serialize the original + // error), so we have this string matching instead. + (!strings.Contains(m.Err.Error(), context.Canceled.Error()) && + // If the cancellation is observed by the CancelChecker, + // then it propagates a QueryCanceledError. + !errors.Is(m.Err, cancelchecker.QueryCanceledError)) { + s.bufferedMeta = append(s.bufferedMeta, m) + } + } + } + // Non-blocking drain of batchCh. This is important mostly because of the // following edge case: all n inputs have pushed batches to the batchCh, so // there are currently n messages. Next notifies the last read input to @@ -478,7 +521,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada if msg == nil { batchChDrained = true } else if msg.meta != nil { - s.bufferedMeta = append(s.bufferedMeta, msg.meta...) + bufferMeta(msg.meta) } default: batchChDrained = true @@ -497,7 +540,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada // Drain the batchCh, this reads the metadata that was pushed. for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh { if msg.meta != nil { - s.bufferedMeta = append(s.bufferedMeta, msg.meta...) + bufferMeta(msg.meta) } } @@ -505,7 +548,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada for exitLoop := false; !exitLoop; { select { case err := <-s.errCh: - s.bufferedMeta = append(s.bufferedMeta, execinfrapb.ProducerMetadata{Err: err}) + bufferMeta([]execinfrapb.ProducerMetadata{{Err: err}}) default: exitLoop = true } diff --git a/pkg/sql/logictest/testdata/logic_test/union b/pkg/sql/logictest/testdata/logic_test/union index e787ed47c926..d177983b8e83 100644 --- a/pkg/sql/logictest/testdata/logic_test/union +++ b/pkg/sql/logictest/testdata/logic_test/union @@ -688,15 +688,50 @@ CREATE TABLE t127043_2 (k2 INT, v2 INT, INDEX (k2)); INSERT INTO t127043_2 VALUES (1, 1); CREATE TABLE t127043_3 (k3 INT, v3 INT, INDEX (k3)); INSERT INTO t127043_3 VALUES (1, 1); -CREATE VIEW v127043 (k, v) AS +CREATE VIEW v127043_3 (k, v) AS + SELECT + k1 AS k, v1 AS v FROM t127043_1 + UNION SELECT + k2 AS k, v2 AS v FROM t127043_2 + UNION SELECT + k3 AS k, v3 AS v FROM t127043_3; +CREATE VIEW v127043_3_idx (k, v) AS SELECT k1 AS k, v1 AS v FROM t127043_1@t127043_1_k1_idx UNION SELECT k2 AS k, v2 AS v FROM t127043_2@t127043_2_k2_idx UNION SELECT k3 AS k, v3 AS v FROM t127043_3@t127043_3_k3_idx; +CREATE VIEW v127043_2 (k, v) AS + SELECT + k1 AS k, v1 AS v FROM t127043_1 + UNION SELECT + k2 AS k, v2 AS v FROM t127043_2; + +statement ok +ANALYZE t127043_1; + +statement ok +ANALYZE t127043_2; + +statement ok +ANALYZE t127043_3; + +# Scan and filter in all UNION branches. +query II +SELECT k, v FROM v127043_3 WHERE k = 1 LIMIT 1; +---- +1 1 + +# Scan and index join in all UNION branches. +query II +SELECT k, v FROM v127043_3_idx WHERE k = 1 LIMIT 1; +---- +1 1 +# Hash join with two UNION branches (with a scan and an index join) on one side +# and a scan on the other. query II -SELECT k, v FROM v127043 WHERE k = 1 LIMIT 1; +SELECT k, v FROM v127043_2 INNER JOIN t127043_3 ON k = k3 WHERE k = 1 LIMIT 1; ---- 1 1