From 2982e435b44884e82a679c5c89061d00c0a0dd54 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 7 Nov 2024 11:03:28 -0800 Subject: [PATCH] colexec: add session variable to disable eager cancellation This commit adds a session variable that allows us to disable the eager cancellation that is performed by the parallel unordered synchronizer in local flows in some cases when it transitions into draining state. This will serve as an escape hatch in case we find more issues with this feature. Release note: None --- pkg/sql/colexec/BUILD.bazel | 1 + .../colexec/parallel_unordered_synchronizer.go | 11 ++++++++++- pkg/sql/exec_util.go | 4 ++++ .../testdata/logic_test/information_schema | 1 + .../logictest/testdata/logic_test/pg_catalog | 3 +++ .../logictest/testdata/logic_test/show_source | 1 + .../sessiondatapb/local_only_session_data.proto | 4 ++++ pkg/sql/vars.go | 17 +++++++++++++++++ 8 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index 53a1f439c068..3ddc88ae3a72 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -73,6 +73,7 @@ go_library( "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sqltelemetry", # keep "//pkg/sql/types", "//pkg/util/buildutil", diff --git a/pkg/sql/colexec/parallel_unordered_synchronizer.go b/pkg/sql/colexec/parallel_unordered_synchronizer.go index 3607c349bf2d..a892be31a2cb 100644 --- a/pkg/sql/colexec/parallel_unordered_synchronizer.go +++ b/pkg/sql/colexec/parallel_unordered_synchronizer.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -123,6 +124,14 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execopnode. return s.inputs[nth].Root } +func eagerCancellationDisabled(flowCtx *execinfra.FlowCtx) bool { + var sd *sessiondata.SessionData + if flowCtx.EvalCtx != nil { // EvalCtx can be nil in tests + sd = flowCtx.EvalCtx.SessionData() + } + return !flowCtx.Local || (sd != nil && sd.DisableVecUnionEagerCancellation) +} + // NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer. // On the first call to Next, len(inputs) goroutines are spawned to read each // input asynchronously (to not be limited by a slow input). These will @@ -177,7 +186,7 @@ func (s *ParallelUnorderedSynchronizer) Init(ctx context.Context) { s.inputCtxs[i], s.tracingSpans[i] = execinfra.ProcessorSpan( s.Ctx, s.flowCtx, fmt.Sprintf("parallel unordered sync input %d", i), s.processorID, ) - if s.flowCtx.Local { + if !eagerCancellationDisabled(s.flowCtx) { // If the plan is local, there are no colrpc.Inboxes in this input // tree, and the synchronizer can cancel the current work eagerly // when transitioning into draining. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 006f8c56db3e..604ab9dd9e63 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3271,6 +3271,10 @@ func (m *sessionDataMutator) SetPartiallyDistributedPlansDisabled(val bool) { m.data.PartiallyDistributedPlansDisabled = val } +func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) { + m.data.DisableVecUnionEagerCancellation = val +} + func (m *sessionDataMutator) SetRequireExplicitPrimaryKeys(val bool) { m.data.RequireExplicitPrimaryKeys = val } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 5ee0b87b0257..0bdbd459abdb 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -6107,6 +6107,7 @@ disable_changefeed_replication off disable_hoist_projection_in_join_limitation off disable_partially_distributed_plans off disable_plan_gists off +disable_vec_union_eager_cancellation off disallow_full_table_scans off distsql_plan_gateway_bias 2 enable_auto_rehoming off diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 8b36fea3aaab..7f7ffe4043b7 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2862,6 +2862,7 @@ disable_changefeed_replication off N disable_hoist_projection_in_join_limitation off NULL NULL NULL string disable_partially_distributed_plans off NULL NULL NULL string disable_plan_gists off NULL NULL NULL string +disable_vec_union_eager_cancellation off NULL NULL NULL string disallow_full_table_scans off NULL NULL NULL string distsql off NULL NULL NULL string distsql_plan_gateway_bias 2 NULL NULL NULL string @@ -3051,6 +3052,7 @@ disable_changefeed_replication off N disable_hoist_projection_in_join_limitation off NULL user NULL off off disable_partially_distributed_plans off NULL user NULL off off disable_plan_gists off NULL user NULL off off +disable_vec_union_eager_cancellation off NULL user NULL off off disallow_full_table_scans off NULL user NULL off off distsql off NULL user NULL off off distsql_plan_gateway_bias 2 NULL user NULL 2 2 @@ -3236,6 +3238,7 @@ disable_changefeed_replication NULL NULL NULL disable_hoist_projection_in_join_limitation NULL NULL NULL NULL NULL disable_partially_distributed_plans NULL NULL NULL NULL NULL disable_plan_gists NULL NULL NULL NULL NULL +disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL disallow_full_table_scans NULL NULL NULL NULL NULL distsql NULL NULL NULL NULL NULL distsql_plan_gateway_bias NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index 6637cec1936a..504efb25d2ed 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -63,6 +63,7 @@ disable_changefeed_replication off disable_hoist_projection_in_join_limitation off disable_partially_distributed_plans off disable_plan_gists off +disable_vec_union_eager_cancellation off disallow_full_table_scans off distsql off distsql_plan_gateway_bias 2 diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index b29f030f901d..21731b560884 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -535,6 +535,10 @@ message LocalOnlySessionData { // OptimizerPushLimitIntoProjectFilteredScan, when true, indicates that the // optimizer should push limit expressions into projects of filtered scans. bool optimizer_push_limit_into_project_filtered_scan = 139; + // DisableVecUnionEagerCancellation disables the eager cancellation that is + // performed by the vectorized engine when transitioning into the draining + // state in some cases. + bool disable_vec_union_eager_cancellation = 143; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index f89dc32a63ee..41843fb09eb9 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -648,6 +648,23 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `disable_vec_union_eager_cancellation`: { + GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := paramparse.ParseBoolVar("disable_vec_union_eager_cancellation", s) + if err != nil { + return err + } + m.SetDisableVecUnionEagerCancellation(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatBoolAsPostgresSetting(evalCtx.SessionData().DisableVecUnionEagerCancellation), nil + }, + GlobalDefault: globalFalse, + }, + // CockroachDB extension. `enable_zigzag_join`: { GetStringVal: makePostgresBoolGetStringValFn(`enable_zigzag_join`),