Skip to content

Commit

Permalink
Merge pull request #134609 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-133893

release-24.3: colexec: harden eager cancellation in parallel unordered sync
  • Loading branch information
yuzefovich authored Nov 8, 2024
2 parents 7a19d4a + 0818fe3 commit 8c0dabf
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ go_library(
"//pkg/sql/sqltelemetry", # keep
"//pkg/sql/types",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/intsets",
Expand Down
51 changes: 47 additions & 4 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package colexec
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -171,7 +173,9 @@ func NewParallelUnorderedSynchronizer(
// be able to distinguish the benign context cancellation error from a
// true query execution error, so it can "poison" the query execution if
// the child sync hasn't transitioned into the draining mode when we
// perform the eager cancellation.
// perform the eager cancellation. The child sync also won't distinguish
// between the benign context cancellation and the flow cancellation, so
// it might not collect the metadata from its inputs when it should.
allowEagerCancellationOnDrain = true
for _, input := range inputs {
if hasParallelUnorderedSync(input.Root) {
Expand Down Expand Up @@ -462,6 +466,45 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
}
}

bufferMeta := func(meta []execinfrapb.ProducerMetadata) {
if !s.flowCtx.Local {
s.bufferedMeta = append(s.bufferedMeta, meta...)
return
}
// Given that the synchronizer is draining, it is safe to ignore all
// context cancellation errors in the metadata for local plans. This is
// the case because:
// - if the query should result in an error, then some other error was
// already propagated to the client, and this was the reason for why we
// transitioned into draining;
// - if the query should be successful, yet we have some pending context
// cancellation errors, then it must be the case that query execution
// was short-circuited (e.g. because of the LIMIT), so we can pretend
// the part of the execution that hit the pending error didn't happen
// since clearly it wasn't necessary to compute the query result.
//
// Note that we cannot ignore all errors here since some of them (like
// ReadWithinUncertaintyIntervalError) could poison the txn and need to
// be propagated to the client, so we only swallow the cancellation
// errors here.
// TODO(yuzefovich): the txn could be poisoned even by errors that we're
// swallowing. I think we could (and perhaps should) swallow all errors
// here.
for _, m := range meta {
if m.Err == nil ||
// This is ugly, but the context cancellation if observed in the
// KV layer can result in kvpb errors that don't satisfy
// errors.Is check (because they don't serialize the original
// error), so we have this string matching instead.
(!strings.Contains(m.Err.Error(), context.Canceled.Error()) &&
// If the cancellation is observed by the CancelChecker,
// then it propagates a QueryCanceledError.
!errors.Is(m.Err, cancelchecker.QueryCanceledError)) {
s.bufferedMeta = append(s.bufferedMeta, m)
}
}
}

// Non-blocking drain of batchCh. This is important mostly because of the
// following edge case: all n inputs have pushed batches to the batchCh, so
// there are currently n messages. Next notifies the last read input to
Expand All @@ -478,7 +521,7 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
if msg == nil {
batchChDrained = true
} else if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
default:
batchChDrained = true
Expand All @@ -497,15 +540,15 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta() []execinfrapb.ProducerMetada
// Drain the batchCh, this reads the metadata that was pushed.
for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh {
if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
bufferMeta(msg.meta)
}
}

// Buffer any errors that may have happened without blocking on the channel.
for exitLoop := false; !exitLoop; {
select {
case err := <-s.errCh:
s.bufferedMeta = append(s.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
bufferMeta([]execinfrapb.ProducerMetadata{{Err: err}})
default:
exitLoop = true
}
Expand Down
39 changes: 37 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/union
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,50 @@ CREATE TABLE t127043_2 (k2 INT, v2 INT, INDEX (k2));
INSERT INTO t127043_2 VALUES (1, 1);
CREATE TABLE t127043_3 (k3 INT, v3 INT, INDEX (k3));
INSERT INTO t127043_3 VALUES (1, 1);
CREATE VIEW v127043 (k, v) AS
CREATE VIEW v127043_3 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3;
CREATE VIEW v127043_3_idx (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1@t127043_1_k1_idx
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2@t127043_2_k2_idx
UNION SELECT
k3 AS k, v3 AS v FROM t127043_3@t127043_3_k3_idx;
CREATE VIEW v127043_2 (k, v) AS
SELECT
k1 AS k, v1 AS v FROM t127043_1
UNION SELECT
k2 AS k, v2 AS v FROM t127043_2;

statement ok
ANALYZE t127043_1;

statement ok
ANALYZE t127043_2;

statement ok
ANALYZE t127043_3;

# Scan and filter in all UNION branches.
query II
SELECT k, v FROM v127043_3 WHERE k = 1 LIMIT 1;
----
1 1

# Scan and index join in all UNION branches.
query II
SELECT k, v FROM v127043_3_idx WHERE k = 1 LIMIT 1;
----
1 1

# Hash join with two UNION branches (with a scan and an index join) on one side
# and a scan on the other.
query II
SELECT k, v FROM v127043 WHERE k = 1 LIMIT 1;
SELECT k, v FROM v127043_2 INNER JOIN t127043_3 ON k = k3 WHERE k = 1 LIMIT 1;
----
1 1

0 comments on commit 8c0dabf

Please sign in to comment.