changefeedccl: add ignore_disable_changefeed_replication option
Release note (enterprise change): A new boolean changefeed option
named `ignore_disable_changefeed_replication` has been added.
When set, the changefeed will not filter events even if CDC
filtering is configured, which is currently available via the
`disable_changefeed_replication` session variable,
`sql.ttl.changefeed_replication.disabled` cluster setting, and the
`ttl_disable_changefeed_replication` table storage parameter.
andyyang890 committed Mar 14, 2024
1 parent 4806184 commit df19639
Showing 7 changed files with 359 additions and 320 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -318,7 +318,7 @@ sql.temp_object_cleaner.wait_interval duration 30m0s how long after creation a t
sql.log.all_statements.enabled boolean false set to true to enable logging of all executed statements application
sql.trace.stmt.enable_threshold duration 0s enables tracing on all statements; statements executing for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold application
sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries) application
-sql.ttl.changefeed_replication.disabled boolean false if true, deletes issued by TTL will not be replicated via changefeeds application
+sql.ttl.changefeed_replication.disabled boolean false if true, deletes issued by TTL will not be replicated via changefeeds (this setting will be ignored by changefeeds that have the ignore_disable_changefeed_replication option set; such changefeeds will continue to replicate all TTL deletes) application
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job application
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -269,7 +269,7 @@
<tr><td><div id="setting-sql-trace-log-statement-execute" class="anchored"><code>sql.log.all_statements.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of all executed statements</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-trace-stmt-enable-threshold" class="anchored"><code>sql.trace.stmt.enable_threshold</code></div></td><td>duration</td><td><code>0s</code></td><td>enables tracing on all statements; statements executing for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-trace-txn-enable-threshold" class="anchored"><code>sql.trace.txn.enable_threshold</code></div></td><td>duration</td><td><code>0s</code></td><td>enables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-sql-ttl-changefeed-replication-disabled" class="anchored"><code>sql.ttl.changefeed_replication.disabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, deletes issued by TTL will not be replicated via changefeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-sql-ttl-changefeed-replication-disabled" class="anchored"><code>sql.ttl.changefeed_replication.disabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, deletes issued by TTL will not be replicated via changefeeds (this setting will be ignored by changefeeds that have the ignore_disable_changefeed_replication option set; such changefeeds will continue to replicate all TTL deletes)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-delete-batch-size" class="anchored"><code>sql.ttl.default_delete_batch_size</code></div></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-delete-rate-limit" class="anchored"><code>sql.ttl.default_delete_rate_limit</code></div></td><td>integer</td><td><code>0</code></td><td>default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-select-batch-size" class="anchored"><code>sql.ttl.default_select_batch_size</code></div></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -486,6 +486,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
+WithFiltering: filters.WithFiltering,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
68 changes: 37 additions & 31 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -76,35 +76,36 @@ const (

// Constants for the options.
const (
-OptAvroSchemaPrefix = `avro_schema_prefix`
-OptConfluentSchemaRegistry = `confluent_schema_registry`
-OptCursor = `cursor`
-OptCustomKeyColumn = `key_column`
-OptEndTime = `end_time`
-OptEnvelope = `envelope`
-OptFormat = `format`
-OptFullTableName = `full_table_name`
-OptKeyInValue = `key_in_value`
-OptTopicInValue = `topic_in_value`
-OptResolvedTimestamps = `resolved`
-OptMinCheckpointFrequency = `min_checkpoint_frequency`
-OptUpdatedTimestamps = `updated`
-OptMVCCTimestamps = `mvcc_timestamp`
-OptDiff = `diff`
-OptCompression = `compression`
-OptSchemaChangeEvents = `schema_change_events`
-OptSchemaChangePolicy = `schema_change_policy`
-OptSplitColumnFamilies = `split_column_families`
-OptExpirePTSAfter = `gc_protect_expires_after`
-OptWebhookAuthHeader = `webhook_auth_header`
-OptWebhookClientTimeout = `webhook_client_timeout`
-OptOnError = `on_error`
-OptMetricsScope = `metrics_label`
-OptUnordered = `unordered`
-OptVirtualColumns = `virtual_columns`
-OptExecutionLocality = `execution_locality`
-OptLaggingRangesThreshold = `lagging_ranges_threshold`
-OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`
+OptAvroSchemaPrefix = `avro_schema_prefix`
+OptConfluentSchemaRegistry = `confluent_schema_registry`
+OptCursor = `cursor`
+OptCustomKeyColumn = `key_column`
+OptEndTime = `end_time`
+OptEnvelope = `envelope`
+OptFormat = `format`
+OptFullTableName = `full_table_name`
+OptKeyInValue = `key_in_value`
+OptTopicInValue = `topic_in_value`
+OptResolvedTimestamps = `resolved`
+OptMinCheckpointFrequency = `min_checkpoint_frequency`
+OptUpdatedTimestamps = `updated`
+OptMVCCTimestamps = `mvcc_timestamp`
+OptDiff = `diff`
+OptCompression = `compression`
+OptSchemaChangeEvents = `schema_change_events`
+OptSchemaChangePolicy = `schema_change_policy`
+OptSplitColumnFamilies = `split_column_families`
+OptExpirePTSAfter = `gc_protect_expires_after`
+OptWebhookAuthHeader = `webhook_auth_header`
+OptWebhookClientTimeout = `webhook_client_timeout`
+OptOnError = `on_error`
+OptMetricsScope = `metrics_label`
+OptUnordered = `unordered`
+OptVirtualColumns = `virtual_columns`
+OptExecutionLocality = `execution_locality`
+OptLaggingRangesThreshold = `lagging_ranges_threshold`
+OptLaggingRangesPollingInterval = `lagging_ranges_polling_interval`
+OptIgnoreDisableChangefeedReplication = `ignore_disable_changefeed_replication`

OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
OptVirtualColumnsNull VirtualColumnVisibility = `null`
@@ -366,6 +367,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptExecutionLocality: stringOption,
OptLaggingRangesThreshold: durationOption,
OptLaggingRangesPollingInterval: durationOption,
+OptIgnoreDisableChangefeedReplication: flagOption,
}

// CommonOptions is options common to all sinks
@@ -379,6 +381,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptCustomKeyColumn,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
+OptIgnoreDisableChangefeedReplication,
)

// SQLValidOptions is options exclusive to SQL sink
@@ -891,14 +894,17 @@ func (s StatementOptions) GetSchemaChangeHandlingOptions() (SchemaChangeHandling
// Filters are aspects of the feed that the backing
// kvfeed or rangefeed want to know about.
type Filters struct {
-WithDiff bool
+WithDiff bool
+WithFiltering bool
}

// GetFilters returns a populated Filters.
func (s StatementOptions) GetFilters() Filters {
_, withDiff := s.m[OptDiff]
+_, withIgnoreDisableChangefeedReplication := s.m[OptIgnoreDisableChangefeedReplication]
return Filters{
-WithDiff: withDiff,
+WithDiff: withDiff,
+WithFiltering: !withIgnoreDisableChangefeedReplication,
}
}
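
The mapping from the new option to the filter flag is an inversion: filtering stays on unless the option is present. The following is a minimal, self-contained sketch of that logic, not the actual `changefeedbase` code. The lowercase names (`filters`, `getFilters`, `optDiff`, `optIgnoreDisableChangefeedReplication`) are hypothetical stand-ins, and a plain `map[string]string` is assumed in place of `StatementOptions.m`.

```go
package main

import "fmt"

// Option names mirror the constants in the diff; the lowercase
// identifiers themselves are hypothetical stand-ins.
const (
	optDiff                               = "diff"
	optIgnoreDisableChangefeedReplication = "ignore_disable_changefeed_replication"
)

// filters is a stand-in for changefeedbase.Filters.
type filters struct {
	WithDiff      bool
	WithFiltering bool
}

// getFilters is a simplified stand-in for StatementOptions.GetFilters.
// opts plays the role of the option map; only key presence matters,
// since both options are flags.
func getFilters(opts map[string]string) filters {
	_, withDiff := opts[optDiff]
	_, ignore := opts[optIgnoreDisableChangefeedReplication]
	return filters{
		WithDiff:      withDiff,
		WithFiltering: !ignore, // filtering is on unless the option is set
	}
}

func main() {
	// No options: filtering stays enabled.
	fmt.Printf("%+v\n", getFilters(map[string]string{}))
	// Option set: filtering is disabled for this changefeed.
	fmt.Printf("%+v\n", getFilters(map[string]string{
		optIgnoreDisableChangefeedReplication: "",
	}))
}
```

With no options the sketch yields `WithFiltering: true`, which matches the previously hard-coded `withFiltering = true` in `kv_feed.go`, so existing changefeeds keep their behavior.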

14 changes: 6 additions & 8 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -86,6 +86,11 @@ type Config struct {
// time, the changefeed job will end with a successful status.
EndTime hlc.Timestamp

+// WithFiltering is propagated via the RangefeedRequest to the rangefeed
+// server, where if true, the server respects the OmitInRangefeeds flag and
+// enables filtering out any transactional writes with that flag set to true.
+WithFiltering bool
+
// Knobs are kvfeed testing knobs.
Knobs TestingKnobs
}
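
The comment on `WithFiltering` describes a server-side check: a write is dropped only when filtering is requested and the write carries the `OmitInRangefeeds` flag. The sketch below illustrates that condition under stated assumptions. The `write` type and the `emit` function are hypothetical and do not correspond to the real rangefeed server code; only the flag names come from the diff.

```go
package main

import "fmt"

// write is a hypothetical stand-in for a transactional write as seen
// by the rangefeed server.
type write struct {
	Key              string
	OmitInRangefeeds bool
}

// emit returns the keys a rangefeed would deliver. A write is skipped
// only if the request asked for filtering AND the write is flagged.
func emit(writes []write, withFiltering bool) []string {
	var out []string
	for _, w := range writes {
		if withFiltering && w.OmitInRangefeeds {
			continue
		}
		out = append(out, w.Key)
	}
	return out
}

func main() {
	writes := []write{
		{Key: "a", OmitInRangefeeds: false},
		{Key: "ttl-delete", OmitInRangefeeds: true},
	}
	fmt.Println(emit(writes, true))  // [a]
	fmt.Println(emit(writes, false)) // [a ttl-delete]
}
```

A changefeed created with `ignore_disable_changefeed_replication` corresponds to the `withFiltering == false` case here, so flagged writes such as TTL deletes are still delivered.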
@@ -113,18 +118,11 @@ func Run(ctx context.Context, cfg Config) error {
return kvevent.NewMemBuffer(cfg.MM.MakeBoundAccount(), &cfg.Settings.SV, cfg.Metrics)
}

-// withFiltering is propagated via the RangefeedRequest to the rangefeed
-// server, where if true, the server respects the OmitInRangefeeds flag and
-// enables filtering out any transactional writes with that flag set to true.
-// OmitInRangefeeds is set to true for all transactional writes when the
-// disable_changefeed_replication session variable is on.
-const withFiltering = true
-
g := ctxgroup.WithContext(ctx)
f := newKVFeed(
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
-cfg.NeedsInitialScan, cfg.WithDiff, withFiltering,
+cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
cfg.SchemaFeed,