Skip to content

Commit

Permalink
colexec: add session variable to disable eager cancellation
Browse files Browse the repository at this point in the history
This commit adds a session variable that allows us to disable the eager
cancellation that is performed by the parallel unordered synchronizer in
local flows in some cases when it transitions into draining state. This
will serve as an escape hatch in case we find more issues with this
feature.

Release note: None
  • Loading branch information
yuzefovich committed Nov 11, 2024
1 parent a308cc9 commit 2982e43
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry", # keep
"//pkg/sql/types",
"//pkg/util/buildutil",
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -123,6 +124,14 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.
return s.inputs[nth].Root
}

func eagerCancellationDisabled(flowCtx *execinfra.FlowCtx) bool {
var sd *sessiondata.SessionData
if flowCtx.EvalCtx != nil { // EvalCtx can be nil in tests
sd = flowCtx.EvalCtx.SessionData()
}
return !flowCtx.Local || (sd != nil && sd.DisableVecUnionEagerCancellation)
}

// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
// On the first call to Next, len(inputs) goroutines are spawned to read each
// input asynchronously (to not be limited by a slow input). These will
Expand Down Expand Up @@ -177,7 +186,7 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) {
s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID,
)
if s.flowCtx.Local {
if !eagerCancellationDisabled(s.flowCtx) {
// If the plan is local, there are no colrpc.Inboxes in this input
// tree, and the synchronizer can cancel the current work eagerly
// when transitioning into draining.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,10 @@ func (m *sessionDataMutator) SetPartiallyDistributedPlansDisabled(val bool) {
m.data.PartiallyDistributedPlansDisabled = val
}

func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
m.data.DisableVecUnionEagerCancellation = val
}

func (m *sessionDataMutator) SetRequireExplicitPrimaryKeys(val bool) {
m.data.RequireExplicitPrimaryKeys = val
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -6107,6 +6107,7 @@ disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distsql_plan_gateway_bias 2
enable_auto_rehoming off
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2862,6 +2862,7 @@ disable_changefeed_replication off N
disable_hoist_projection_in_join_limitation off NULL NULL NULL string
disable_partially_distributed_plans off NULL NULL NULL string
disable_plan_gists off NULL NULL NULL string
disable_vec_union_eager_cancellation off NULL NULL NULL string
disallow_full_table_scans off NULL NULL NULL string
distsql off NULL NULL NULL string
distsql_plan_gateway_bias 2 NULL NULL NULL string
Expand Down Expand Up @@ -3051,6 +3052,7 @@ disable_changefeed_replication off N
disable_hoist_projection_in_join_limitation off NULL user NULL off off
disable_partially_distributed_plans off NULL user NULL off off
disable_plan_gists off NULL user NULL off off
disable_vec_union_eager_cancellation off NULL user NULL off off
disallow_full_table_scans off NULL user NULL off off
distsql off NULL user NULL off off
distsql_plan_gateway_bias 2 NULL user NULL 2 2
Expand Down Expand Up @@ -3236,6 +3238,7 @@ disable_changefeed_replication NULL NULL NULL
disable_hoist_projection_in_join_limitation NULL NULL NULL NULL NULL
disable_partially_distributed_plans NULL NULL NULL NULL NULL
disable_plan_gists NULL NULL NULL NULL NULL
disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL
disallow_full_table_scans NULL NULL NULL NULL NULL
distsql NULL NULL NULL NULL NULL
distsql_plan_gateway_bias NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distsql off
distsql_plan_gateway_bias 2
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ message LocalOnlySessionData {
// OptimizerPushLimitIntoProjectFilteredScan, when true, indicates that the
// optimizer should push limit expressions into projects of filtered scans.
bool optimizer_push_limit_into_project_filtered_scan = 139;
// DisableVecUnionEagerCancellation disables the eager cancellation that is
// performed by the vectorized engine when transitioning into the draining
// state in some cases.
bool disable_vec_union_eager_cancellation = 143;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,23 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`disable_vec_union_eager_cancellation`: {
GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar("disable_vec_union_eager_cancellation", s)
if err != nil {
return err
}
m.SetDisableVecUnionEagerCancellation(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().DisableVecUnionEagerCancellation), nil
},
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`enable_zigzag_join`: {
GetStringVal: makePostgresBoolGetStringValFn(`enable_zigzag_join`),
Expand Down

0 comments on commit 2982e43

Please sign in to comment.