Skip to content

Commit

Permalink
sql: add disable_changefeed_replication session variable
Browse files Browse the repository at this point in the history
This patch adds a `disable_changefeed_replication` session variable
that can be used to disable changefeed replication for changes that
occur within a session. Right now, the session variable has no effect
but in later commits, it will be plumbed to the KV layer.

Release note: None
  • Loading branch information
andyyang890 committed Nov 8, 2023
1 parent f8c553c commit 43717a8
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 0 deletions.
35 changes: 35 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/changefeed
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,38 @@ query TT
SELECT user_name, description FROM [SHOW CHANGEFEED JOBS]
----
testuser CREATE CHANGEFEED FOR TABLE t INTO 'null://sink' WITH OPTIONS (initial_scan = 'only')

subtest disable_changefeed_replication

user root

statement ok
GRANT CHANGEFEED ON t TO testuser

user testuser

query T
SHOW disable_changefeed_replication
----
off

statement ok
CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only'

statement ok
SET disable_changefeed_replication TO true

query T
SHOW disable_changefeed_replication
----
on

statement ok
CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only'

user root

statement ok
REVOKE CHANGEFEED ON t FROM testuser

subtest end
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3682,6 +3682,10 @@ func (m *sessionDataMutator) SetOptimizerUseProvidedOrderingFix(val bool) {
m.data.OptimizerUseProvidedOrderingFix = val
}

func (m *sessionDataMutator) SetDisableChangefeedReplication(val bool) {
m.data.DisableChangefeedReplication = val
}

// Utility functions related to scrubbing sensitive information on SQL Stats.

// quantizeCounts ensures that the Count field in the
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -5498,6 +5498,7 @@ default_transaction_read_only off
default_transaction_use_follower_reads off
default_with_oids off
descriptor_validation on
disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,7 @@ default_transaction_read_only off N
default_transaction_use_follower_reads off NULL NULL NULL string
default_with_oids off NULL NULL NULL string
descriptor_validation on NULL NULL NULL string
disable_changefeed_replication off NULL NULL NULL string
disable_hoist_projection_in_join_limitation off NULL NULL NULL string
disable_partially_distributed_plans off NULL NULL NULL string
disable_plan_gists off NULL NULL NULL string
Expand Down Expand Up @@ -2981,6 +2982,7 @@ default_transaction_read_only off N
default_transaction_use_follower_reads off NULL user NULL off off
default_with_oids off NULL user NULL off off
descriptor_validation on NULL user NULL on on
disable_changefeed_replication off NULL user NULL off off
disable_hoist_projection_in_join_limitation off NULL user NULL off off
disable_partially_distributed_plans off NULL user NULL off off
disable_plan_gists off NULL user NULL off off
Expand Down Expand Up @@ -3142,6 +3144,7 @@ default_transaction_use_follower_reads NULL NULL NULL
default_with_oids NULL NULL NULL NULL NULL
descriptor_validation NULL NULL NULL NULL NULL
direct_columnar_scans_enabled NULL NULL NULL NULL NULL
disable_changefeed_replication NULL NULL NULL NULL NULL
disable_hoist_projection_in_join_limitation NULL NULL NULL NULL NULL
disable_partially_distributed_plans NULL NULL NULL NULL NULL
disable_plan_gists NULL NULL NULL NULL NULL
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ default_transaction_read_only off
default_transaction_use_follower_reads off
default_with_oids off
descriptor_validation on
disable_changefeed_replication off
disable_hoist_projection_in_join_limitation off
disable_partially_distributed_plans off
disable_plan_gists off
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ message LocalOnlySessionData {
// internal errors due to incomplete functional dependencies, and also
// fixes a bug that incorrectly truncated the provided ordering (see #113072).
bool optimizer_use_provided_ordering_fix = 115;
// DisableChangefeedReplication, when true, disables changefeed events from
// being emitted for changes to data made in a session.
// TODO(yang): Plumb this session variable down to KV.
bool disable_changefeed_replication = 116;

///////////////////////////////////////////////////////////////////////////
// WARNING: consider whether a session parameter you're adding needs to //
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,23 @@ var varGen = map[string]sessionVar{
},
GlobalDefault: globalTrue,
},

// CockroachDB extension.
`disable_changefeed_replication`: {
GetStringVal: makePostgresBoolGetStringValFn(`disable_changefeed_replication`),
Set: func(_ context.Context, m sessionDataMutator, s string) error {
b, err := paramparse.ParseBoolVar(`disable_changefeed_replication`, s)
if err != nil {
return err
}
m.SetDisableChangefeedReplication(b)
return nil
},
Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) {
return formatBoolAsPostgresSetting(evalCtx.SessionData().DisableChangefeedReplication), nil
},
GlobalDefault: globalFalse,
},
}

func ReplicationModeFromString(s string) (sessiondatapb.ReplicationMode, error) {
Expand Down

0 comments on commit 43717a8

Please sign in to comment.