Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.2: colexec: add session variable to disable eager cancellation #134905

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry", # keep
"//pkg/sql/types",
"//pkg/util/buildutil",
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -123,6 +124,14 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode.
return s.inputs[nth].Root
}

func eagerCancellationDisabled(flowCtx *execinfra.FlowCtx) bool {
var sd *sessiondata.SessionData
if flowCtx.EvalCtx != nil { // EvalCtx can be nil in tests
sd = flowCtx.EvalCtx.SessionData()
}
return !flowCtx.Local || (sd != nil && sd.DisableVecUnionEagerCancellation)
}

// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
// On the first call to Next, len(inputs) goroutines are spawned to read each
// input asynchronously (to not be limited by a slow input). These will
Expand Down Expand Up @@ -177,7 +186,7 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) {
s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan(
s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID,
)
if s.flowCtx.Local {
if !eagerCancellationDisabled(s.flowCtx) {
// If the plan is local, there are no colrpc.Inboxes in this input
// tree, and the synchronizer can cancel the current work eagerly
// when transitioning into draining.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,10 @@ func (m *sessionDataMutator) SetPartiallyDistributedPlansDisabled(val bool) {
m.data.PartiallyDistributedPlansDisabled = val
}

func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
m.data.DisableVecUnionEagerCancellation = val
}

func (m *sessionDataMutator) SetRequireExplicitPrimaryKeys(val bool) {
m.data.RequireExplicitPrimaryKeys = val
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -6107,6 +6107,7 @@ disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distsql_plan_gateway_bias 2
enable_auto_rehoming off
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2862,6 +2862,7 @@ disable_changefeed_replication off N
disable_hoist_projection_in_join_limitation off NULL NULL NULL string
disable_partially_distributed_plans off NULL NULL NULL string
disable_plan_gists off NULL NULL NULL string
disable_vec_union_eager_cancellation off NULL NULL NULL string
disallow_full_table_scans off NULL NULL NULL string
distsql off NULL NULL NULL string
distsql_plan_gateway_bias 2 NULL NULL NULL string
Expand Down Expand Up @@ -3051,6 +3052,7 @@ disable_changefeed_replication off N
disable_hoist_projection_in_join_limitation off NULL user NULL off off
disable_partially_distributed_plans off NULL user NULL off off
disable_plan_gists off NULL user NULL off off
disable_vec_union_eager_cancellation off NULL user NULL off off
disallow_full_table_scans off NULL user NULL off off
distsql off NULL user NULL off off
distsql_plan_gateway_bias 2 NULL user NULL 2 2
Expand Down Expand Up @@ -3236,6 +3238,7 @@ disable_changefeed_replication NULL NULL NULL
disable_hoist_projection_in_join_limitation NULL NULL NULL NULL NULL
disable_partially_distributed_plans NULL NULL NULL NULL NULL
disable_plan_gists NULL NULL NULL NULL NULL
disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL
disallow_full_table_scans NULL NULL NULL NULL NULL
distsql NULL NULL NULL NULL NULL
distsql_plan_gateway_bias NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distsql off
distsql_plan_gateway_bias 2
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ message LocalOnlySessionData {
// OptimizerPushLimitIntoProjectFilteredScan, when true, indicates that the
// optimizer should push limit expressions into projects of filtered scans.
bool optimizer_push_limit_into_project_filtered_scan = 139;
// DisableVecUnionEagerCancellation disables the eager cancellation that is
// performed by the vectorized engine when transitioning into the draining
// state in some cases.
bool disable_vec_union_eager_cancellation = 143;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,23 @@ var varGen = map[string]sessionVar{
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`disable_vec_union_eager_cancellation`: {
GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar("disable_vec_union_eager_cancellation", s)
if err != nil {
return err
}
m.SetDisableVecUnionEagerCancellation(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().DisableVecUnionEagerCancellation), nil
},
GlobalDefault: globalFalse,
},

// CockroachDB extension.
`enable_zigzag_join`: {
GetStringVal: makePostgresBoolGetStringValFn(`enable_zigzag_join`),
Expand Down