From 970ea323f5d3805688c08a40a28ee2951f24113f Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Fri, 23 Feb 2024 15:58:22 -0500 Subject: [PATCH] sql: add ttl_disable_changefeed_replication table storage parameter This patch adds a new `ttl_disable_changefeed_replication` table storage parameter, which can be used to disable changefeed replication for row-level TTL on a per-table basis. Release note (sql change): A new `ttl_disable_changefeed_replication` table storage parameter has been added and can be used to disable changefeed replication for row-level TTL on a per-table basis. --- pkg/cmd/roachtest/tests/cdc_filtering.go | 189 +++++++++++------- pkg/sql/catalog/catpb/catalog.proto | 3 + pkg/sql/catalog/tabledesc/structured.go | 3 + .../testdata/logic_test/row_level_ttl | 17 +- .../tablestorageparam/table_storage_param.go | 17 ++ pkg/sql/ttl/ttlbase/ttl_helpers.go | 7 +- pkg/sql/ttl/ttljob/ttljob.go | 2 +- 7 files changed, 153 insertions(+), 85 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cdc_filtering.go b/pkg/cmd/roachtest/tests/cdc_filtering.go index 9cdebee0c506..4c145885a457 100644 --- a/pkg/cmd/roachtest/tests/cdc_filtering.go +++ b/pkg/cmd/roachtest/tests/cdc_filtering.go @@ -44,13 +44,22 @@ func registerCDCFiltering(r registry.Registry) { Run: runCDCSessionFiltering, }) r.Add(registry.TestSpec{ - Name: "cdc/filtering/ttl", + Name: "cdc/filtering/ttl/cluster", Owner: registry.OwnerCDC, Cluster: r.MakeClusterSpec(3), CompatibleClouds: registry.AllClouds, Suites: registry.Suites(registry.Nightly), RequiresLicense: true, - Run: runCDCTTLFiltering, + Run: runCDCTTLFiltering(ttlFilteringClusterSetting), + }) + r.Add(registry.TestSpec{ + Name: "cdc/filtering/ttl/table", + Owner: registry.OwnerCDC, + Cluster: r.MakeClusterSpec(3), + CompatibleClouds: registry.AllClouds, + Suites: registry.Suites(registry.Nightly), + RequiresLicense: true, + Run: runCDCTTLFiltering(ttlFilteringTableStorageParam), }) } @@ -283,98 +292,124 @@ func checkCDCEvents[S any]( return nil } -func runCDCTTLFiltering(ctx context.Context, t test.Test, c cluster.Cluster) { - t.Status("starting cluster") - c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings()) - conn := c.Conn(ctx, t.L(), 1) - defer conn.Close() +// ttlFilteringType is the type of TTL filtering to use in the test function +// returned by runCDCTTLFiltering. +type ttlFilteringType bool + +const ( + // ttlFilteringClusterSetting denotes TTL filtering enabled via setting the + // sql.ttl.changefeed_replication.disabled cluster setting. + ttlFilteringClusterSetting = false + // ttlFilteringTableStorageParam denotes TTL filtering enabled via setting the + // ttl_disable_changefeed_replication storage parameter on the table. + ttlFilteringTableStorageParam = true +) - // kv.rangefeed.enabled is required for changefeeds to run - _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) - require.NoError(t, err) +func runCDCTTLFiltering( + filteringType ttlFilteringType, +) func(ctx context.Context, t test.Test, c cluster.Cluster) { + return func(ctx context.Context, t test.Test, c cluster.Cluster) { + t.Status("starting cluster") + c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings()) + conn := c.Conn(ctx, t.L(), 1) + defer conn.Close() + + // kv.rangefeed.enabled is required for changefeeds to run + _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + require.NoError(t, err) - t.Status("creating table with TTL") - _, err = conn.ExecContext(ctx, `CREATE TABLE events ( + t.Status("creating table with TTL") + _, err = conn.ExecContext(ctx, `CREATE TABLE events ( id STRING PRIMARY KEY, expired_at TIMESTAMPTZ ) WITH (ttl_expiration_expression = 'expired_at', ttl_job_cron = '* * * * *')`) - require.NoError(t, err) + require.NoError(t, err) - t.Status("creating changefeed") - var jobID int - err = conn.QueryRowContext(ctx, `CREATE CHANGEFEED FOR TABLE events + t.Status("creating changefeed") + var jobID int + err = conn.QueryRowContext(ctx, `CREATE CHANGEFEED FOR TABLE events INTO 'nodelocal://1/events' WITH diff, updated, min_checkpoint_frequency = '1s'`).Scan(&jobID) - require.NoError(t, err) - - const ( - expiredTime = "2000-01-01" - notExpiredTime = "2200-01-01" - ) + require.NoError(t, err) - t.Status("insert initial table data") - _, err = conn.Exec(`INSERT INTO events VALUES ('A', $1), ('B', $2)`, expiredTime, notExpiredTime) - require.NoError(t, err) + const ( + expiredTime = "2000-01-01" + notExpiredTime = "2200-01-01" + ) - t.Status("wait for TTL to run and delete rows") - err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now()) - require.NoError(t, err) + t.Status("insert initial table data") + _, err = conn.Exec(`INSERT INTO events VALUES ('A', $1), ('B', $2)`, expiredTime, notExpiredTime) + require.NoError(t, err) - t.Status("check that rows are deleted") - var countA int - err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'A'`).Scan(&countA) - require.NoError(t, err) - require.Equal(t, countA, 0) + t.Status("wait for TTL to run and delete rows") + err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now()) + require.NoError(t, err) - t.Status("set sql.ttl.changefeed_replication.disabled") - _, err = conn.ExecContext(ctx, `SET CLUSTER SETTING sql.ttl.changefeed_replication.disabled = true`) - require.NoError(t, err) + t.Status("check that rows are deleted") + var countA int + err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'A'`).Scan(&countA) + require.NoError(t, err) + require.Equal(t, countA, 0) - t.Status("update remaining rows to be expired") - _, err = conn.Exec(`UPDATE events SET expired_at = $1 WHERE id = 'B'`, expiredTime) - require.NoError(t, err) + switch filteringType { + case ttlFilteringClusterSetting: + t.Status("set sql.ttl.changefeed_replication.disabled cluster setting to true") + _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING sql.ttl.changefeed_replication.disabled = true`) + require.NoError(t, err) + case ttlFilteringTableStorageParam: + t.Status("set ttl_disable_changefeed_replication storage param on events table to true") + _, err := conn.ExecContext(ctx, `ALTER TABLE events SET (ttl_disable_changefeed_replication = true)`) + require.NoError(t, err) + default: + panic("unknown TTL filtering type") + } - t.Status("wait for TTL to run and delete rows") - err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now().Add(time.Minute)) - require.NoError(t, err) + t.Status("update remaining rows to be expired") + _, err = conn.Exec(`UPDATE events SET expired_at = $1 WHERE id = 'B'`, expiredTime) + require.NoError(t, err) - t.Status("check that rows are deleted") - var countB int - err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'B'`).Scan(&countB) - require.NoError(t, err) - require.Equal(t, countB, 0) + t.Status("wait for TTL to run and delete rows") + err = waitForTTL(ctx, conn, "defaultdb.public.events", timeutil.Now().Add(time.Minute)) + require.NoError(t, err) - expectedEvents := []string{ - // initial - "A@2000-01-01T00:00:00Z", "B@2200-01-01T00:00:00Z", - // TTL deletes A - " (before: A@2000-01-01T00:00:00Z)", - // update B to be expired - "B@2000-01-01T00:00:00Z (before: B@2200-01-01T00:00:00Z)", - // TTL deletes B (no events) - } - type state struct { - ID string `json:"id"` - ExpiredAt string `json:"expired_at"` + t.Status("check that rows are deleted") + var countB int + err = conn.QueryRow(`SELECT count(*) FROM events WHERE id = 'B'`).Scan(&countB) + require.NoError(t, err) + require.Equal(t, countB, 0) + + expectedEvents := []string{ + // initial + "A@2000-01-01T00:00:00Z", "B@2200-01-01T00:00:00Z", + // TTL deletes A + " (before: A@2000-01-01T00:00:00Z)", + // update B to be expired + "B@2000-01-01T00:00:00Z (before: B@2200-01-01T00:00:00Z)", + // TTL deletes B (no events) + } + type state struct { + ID string `json:"id"` + ExpiredAt string `json:"expired_at"` + } + err = checkCDCEvents[state](ctx, t, c, conn, jobID, "events", + // Produce a canonical format that we can assert on. The format is of the + // form: id@exp_at (before: id@exp_at)[, id@exp_at (before: id@exp_at), ...] + func(before *state, after *state) string { + var s string + if after == nil { + s += "" + } else { + s += fmt.Sprintf("%s@%s", after.ID, after.ExpiredAt) + } + if before != nil { + s += fmt.Sprintf(" (before: %s@%s)", before.ID, before.ExpiredAt) + } + return s + }, + expectedEvents, + ) + require.NoError(t, err) } - err = checkCDCEvents[state](ctx, t, c, conn, jobID, "events", - // Produce a canonical format that we can assert on. The format is of the - // form: id@exp_at (before: id@exp_at)[, id@exp_at (before: id@exp_at), ...] - func(before *state, after *state) string { - var s string - if after == nil { - s += "" - } else { - s += fmt.Sprintf("%s@%s", after.ID, after.ExpiredAt) - } - if before != nil { - s += fmt.Sprintf(" (before: %s@%s)", before.ID, before.ExpiredAt) - } - return s - }, - expectedEvents, - ) - require.NoError(t, err) } // waitForTTL waits until the row-level TTL job for a given table has run diff --git a/pkg/sql/catalog/catpb/catalog.proto b/pkg/sql/catalog/catpb/catalog.proto index 9ea429d47895..86d497cc8f07 100644 --- a/pkg/sql/catalog/catpb/catalog.proto +++ b/pkg/sql/catalog/catpb/catalog.proto @@ -197,6 +197,9 @@ message RowLevelTTL { optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"]; // SelectRateLimit is the maximum amount of rows to select per second. optional int64 select_rate_limit = 12 [(gogoproto.nullable)=false]; + // DisableChangefeedReplication disables changefeed replication for the + // deletes performed by the TTL job. + optional bool disable_changefeed_replication = 13 [(gogoproto.nullable) = false]; } // AutoStatsSettings represents settings related to automatic statistics diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go index a42203cc0f89..1e0ffde0deb1 100644 --- a/pkg/sql/catalog/tabledesc/structured.go +++ b/pkg/sql/catalog/tabledesc/structured.go @@ -2419,6 +2419,9 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string { if labelMetrics := ttl.LabelMetrics; labelMetrics { appendStorageParam(`ttl_label_metrics`, fmt.Sprintf(`%t`, labelMetrics)) } + if ttl.DisableChangefeedReplication { + appendStorageParam(`ttl_disable_changefeed_replication`, fmt.Sprintf("%t", ttl.DisableChangefeedReplication)) + } } if exclude := desc.GetExcludeDataFromBackup(); exclude { appendStorageParam(`exclude_data_from_backup`, `true`) diff --git a/pkg/sql/logictest/testdata/logic_test/row_level_ttl b/pkg/sql/logictest/testdata/logic_test/row_level_ttl index d8380ceb5a0a..b1b3cb711653 100644 --- a/pkg/sql/logictest/testdata/logic_test/row_level_ttl +++ b/pkg/sql/logictest/testdata/logic_test/row_level_ttl @@ -233,7 +233,8 @@ CREATE TABLE tbl_reloptions ( ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', - ttl_label_metrics = true + ttl_label_metrics = true, + ttl_disable_changefeed_replication = true ) query T rowsort @@ -248,6 +249,7 @@ ttl_delete_rate_limit=40 ttl_pause=true ttl_row_stats_poll_interval='1m0s' ttl_label_metrics=true +ttl_disable_changefeed_replication=true subtest end @@ -775,7 +777,8 @@ CREATE TABLE tbl_set_ttl_params ( ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', - ttl_label_metrics = true + ttl_label_metrics = true, + ttl_disable_changefeed_replication = true ) query T @@ -785,7 +788,7 @@ CREATE TABLE public.tbl_set_ttl_params ( id INT8 NOT NULL, crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL, CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC) -) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true) +) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true, ttl_disable_changefeed_replication = true) statement ok ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_row_stats_poll_interval = '2m0s') @@ -797,7 +800,7 @@ CREATE TABLE public.tbl_set_ttl_params ( id INT8 NOT NULL, crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL, CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC) -) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true) +) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true, ttl_disable_changefeed_replication = true) statement ok ALTER TABLE tbl_set_ttl_params RESET ( @@ -806,7 +809,9 @@ ALTER TABLE tbl_set_ttl_params RESET ( ttl_select_rate_limit, ttl_delete_rate_limit, ttl_pause, - ttl_row_stats_poll_interval + ttl_row_stats_poll_interval, + ttl_label_metrics, + ttl_disable_changefeed_replication ) query T @@ -816,7 +821,7 @@ CREATE TABLE public.tbl_set_ttl_params ( id INT8 NOT NULL, crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL, CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC) -) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_label_metrics = true) +) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL) subtest end diff --git a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go index a04d47058179..ddf3d129224b 100644 --- a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go +++ b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go @@ -439,6 +439,23 @@ var tableParams = map[string]tableParam{ return nil }, }, + `ttl_disable_changefeed_replication`: { + onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error { + b, err := boolFromDatum(ctx, evalCtx, key, datum) + if err != nil { + return err + } + rowLevelTTL := po.getOrCreateRowLevelTTL() + rowLevelTTL.DisableChangefeedReplication = b + return nil + }, + onReset: func(ctx context.Context, po *Setter, evalCtx *eval.Context, key string) error { + if po.hasRowLevelTTL() { + po.UpdatedRowLevelTTL.DisableChangefeedReplication = false + } + return nil + }, + }, `exclude_data_from_backup`: { onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error { diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go index 5bbd8c183639..435b94c90a9b 100644 --- a/pkg/sql/ttl/ttlbase/ttl_helpers.go +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -153,7 +153,12 @@ func CheckJobEnabled(settingsValues *settings.Values) error { // GetChangefeedReplicationDisabled returns whether changefeed replication // should be disabled for this job based on the relevant cluster setting. -func GetChangefeedReplicationDisabled(settingsValues *settings.Values) bool { +func GetChangefeedReplicationDisabled( + settingsValues *settings.Values, ttl *catpb.RowLevelTTL, +) bool { + if ttl.DisableChangefeedReplication { + return true + } return changefeedReplicationDisabled.Get(settingsValues) } diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 5eb68923a449..94023a1271cf 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -205,7 +205,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re deleteBatchSize := ttlbase.GetDeleteBatchSize(settingsValues, rowLevelTTL) selectRateLimit := ttlbase.GetSelectRateLimit(settingsValues, rowLevelTTL) deleteRateLimit := ttlbase.GetDeleteRateLimit(settingsValues, rowLevelTTL) - disableChangefeedReplication := ttlbase.GetChangefeedReplicationDisabled(settingsValues) + disableChangefeedReplication := ttlbase.GetChangefeedReplicationDisabled(settingsValues, rowLevelTTL) newTTLSpec := func(spans []roachpb.Span) *execinfrapb.TTLSpec { return &execinfrapb.TTLSpec{ JobID: jobID,