Skip to content

Commit

Permalink
Merge #112119
Browse files Browse the repository at this point in the history
112119: ttljob: introduce sql.ttl.default_select_rate_limit cluster setting, ttl_select_rate_limit storage param r=ecwall a=ecwall

Fixes #110742
    
Add sql.ttl.default_select_rate_limit cluster setting and ttl_select_rate_limit table storage param to set TTL select rate limit. This sets the number of records per table per second per node that can be selected by the TTL job.

Co-authored-by: Evan Wall <[email protected]>
  • Loading branch information
craig[bot] and ecwall committed Oct 17, 2023
2 parents 45f6344 + d9ec826 commit cddcddd
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 164 deletions.
21 changes: 12 additions & 9 deletions docs/RFCS/20220120_row_level_ttl.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,29 +140,31 @@ message TableDescriptor {
option (gogoproto.equal) = true;
// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
optional string duration_expr = 1 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
optional string duration_expr = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
// SelectBatchSize is the amount of rows that should be fetched at a time
optional int64 select_batch_size = 2 [(gogoproto.nullable)=false];
optional int64 select_batch_size = 2 [(gogoproto.nullable) = false];
// DeleteBatchSize is the amount of rows that should be deleted at a time.
optional int64 delete_batch_size = 3 [(gogoproto.nullable)=false];
optional int64 delete_batch_size = 3 [(gogoproto.nullable) = false];
// DeletionCron signifies how often the TTL deletion job runs in a cron format.
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
optional string deletion_cron = 4 [(gogoproto.nullable) = false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
optional int64 schedule_id = 5 [(gogoproto.customname) = "ScheduleID", (gogoproto.nullable) = false];
// RangeConcurrency is based on the number of spans and is no longer configurable.
reserved 6;
// DeleteRateLimit is the maximum amount of rows to delete per second.
optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
optional int64 delete_rate_limit = 7 [(gogoproto.nullable) = false];
// Pause is set if the TTL job should not run.
optional bool pause = 8 [(gogoproto.nullable)=false];
optional bool pause = 8 [(gogoproto.nullable) = false];
// RowStatsPollInterval is the interval to report row statistics (number of rows on table, number of expired
// rows on table) during row level TTL. If zero, no statistics are reported.
optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable)=false, (gogoproto.casttype)="time.Duration"];
optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable) = false, (gogoproto.casttype) = "time.Duration"];
// LabelMetrics is true if metrics for the TTL job should add a label containing
// the relation name.
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
optional string expiration_expr = 11 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
// SelectRateLimit is the maximum amount of rows to select per second.
optional int64 select_rate_limit = 12 [(gogoproto.nullable) = false];
}
// ...
Expand All @@ -182,6 +184,7 @@ the following options to control the TTL job:
| `ttl_expiration_expression` | If set, uses the expression specified as the TTL expiration. Defaults to just using the `crdb_internal_expiration` column. |
| `ttl_select_batch_size` | How many rows to fetch from the range that have expired at a given time. Defaults to 500. Must be at least `1`. |
| `ttl_delete_batch_size` | How many rows to delete at a time. Defaults to 100. Must be at least `1`. |
| `ttl_select_rate_limit` | Maximum number of rows to be selected per second per node (acts as the rate limit). Defaults to 0 (signifying no rate limit). |
| `ttl_delete_rate_limit` | Maximum number of rows to be deleted per second per node (acts as the rate limit). Defaults to 0 (signifying no rate limit). |
| `ttl_row_stats_poll_interval` | Whilst the TTL job is running, counts rows and expired rows on the table to report as Prometheus metrics. By default unset, meaning no stats are fetched. |
| `ttl_pause` | Stops the TTL job from executing. |
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions;
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job application
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job application
sql.ttl.default_select_rate_limit integer 0 default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.job.enabled boolean true whether the TTL job is enabled application
sql.txn.read_committed_syntax.enabled boolean false set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands application
sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored application
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@
<tr><td><div id="setting-sql-ttl-default-delete-batch-size" class="anchored"><code>sql.ttl.default_delete_batch_size</code></div></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-delete-rate-limit" class="anchored"><code>sql.ttl.default_delete_rate_limit</code></div></td><td>integer</td><td><code>0</code></td><td>default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-select-batch-size" class="anchored"><code>sql.ttl.default_select_batch_size</code></div></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-default-select-rate-limit" class="anchored"><code>sql.ttl.default_select_rate_limit</code></div></td><td>integer</td><td><code>0</code></td><td>default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-ttl-job-enabled" class="anchored"><code>sql.ttl.job.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>whether the TTL job is enabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-txn-read-committed-syntax-enabled" class="anchored"><code>sql.txn.read_committed_syntax.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-txn-fingerprint-id-cache-capacity" class="anchored"><code>sql.txn_fingerprint_id_cache.capacity</code></div></td><td>integer</td><td><code>100</code></td><td>the maximum number of txn fingerprint IDs stored</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ message RowLevelTTL {
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
// SelectRateLimit is the maximum amount of rows to select per second.
optional int64 select_rate_limit = 12 [(gogoproto.nullable)=false];
}

// AutoStatsSettings represents settings related to automatic statistics
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,9 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
if bs := ttl.DeleteBatchSize; bs != 0 {
appendStorageParam(`ttl_delete_batch_size`, fmt.Sprintf(`%d`, bs))
}
if rl := ttl.SelectRateLimit; rl != 0 {
appendStorageParam(`ttl_select_rate_limit`, fmt.Sprintf(`%d`, rl))
}
if rl := ttl.DeleteRateLimit; rl != 0 {
appendStorageParam(`ttl_delete_rate_limit`, fmt.Sprintf(`%d`, rl))
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/tabledesc/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
if ttl.SelectRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_select_rate_limit", ttl.SelectRateLimit); err != nil {
return err
}
}
if ttl.DeleteRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_delete_rate_limit", ttl.DeleteRateLimit); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/processors_ttl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,7 @@ message TTLSpec {
(gogoproto.customname) = "AOSTDuration",
(gogoproto.stdduration) = true
];

// SelectRateLimit controls how many records can be selected per second.
optional int64 select_rate_limit = 14 [(gogoproto.nullable) = false];
}
54 changes: 45 additions & 9 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,29 @@ subtest reloptions
statement ok
CREATE TABLE tbl_reloptions (
id INT PRIMARY KEY
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
) WITH (
ttl_expire_after = '10 minutes',
ttl_select_batch_size = 10,
ttl_delete_batch_size = 20,
ttl_select_rate_limit = 30,
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
)

query T
SELECT reloptions FROM pg_class WHERE relname = 'tbl_reloptions'
query T rowsort
SELECT unnest(reloptions) FROM pg_class WHERE relname = 'tbl_reloptions'
----
{ttl='on',ttl_expire_after='00:10:00':::INTERVAL,ttl_select_batch_size=10,ttl_delete_batch_size=20,ttl_delete_rate_limit=30,ttl_pause=true,ttl_row_stats_poll_interval='1m0s',ttl_label_metrics=true}
ttl='on'
ttl_expire_after='00:10:00':::INTERVAL
ttl_select_batch_size=10
ttl_delete_batch_size=20
ttl_select_rate_limit=30
ttl_delete_rate_limit=40
ttl_pause=true
ttl_row_stats_poll_interval='1m0s'
ttl_label_metrics=true

subtest end

Expand Down Expand Up @@ -724,6 +741,9 @@ ALTER TABLE tbl_ttl_params_positive SET (ttl_select_batch_size = -1)
statement error "ttl_delete_batch_size" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_batch_size = -1)

statement error "ttl_select_rate_limit" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_select_rate_limit = -1)

statement error "ttl_delete_rate_limit" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_rate_limit = -1)

Expand All @@ -737,7 +757,16 @@ subtest set_ttl_params
statement ok
CREATE TABLE tbl_set_ttl_params (
id INT PRIMARY KEY
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
) WITH (
ttl_expire_after = '10 minutes',
ttl_select_batch_size = 10,
ttl_delete_batch_size = 20,
ttl_select_rate_limit = 30,
ttl_delete_rate_limit = 40,
ttl_pause = true,
ttl_row_stats_poll_interval = '1 minute',
ttl_label_metrics = true
)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand All @@ -746,10 +775,10 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)

statement ok
ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_row_stats_poll_interval = '2m0s')
ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_row_stats_poll_interval = '2m0s')

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand All @@ -758,10 +787,17 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)
) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)

statement ok
ALTER TABLE tbl_set_ttl_params RESET (ttl_select_batch_size, ttl_delete_batch_size, ttl_delete_rate_limit, ttl_pause, ttl_row_stats_poll_interval)
ALTER TABLE tbl_set_ttl_params RESET (
ttl_select_batch_size,
ttl_delete_batch_size,
ttl_select_rate_limit,
ttl_delete_rate_limit,
ttl_pause,
ttl_row_stats_poll_interval
)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/storageparam/tablestorageparam/table_storage_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,26 @@ var tableParams = map[string]tableParam{
return nil
},
},
// ttl_select_rate_limit is the per-table override for the TTL select rate
// limit. The value is kept in the row-level TTL config's SelectRateLimit
// field, where 0 means "not set".
`ttl_select_rate_limit`: {
// onSet parses the datum as an integer, validates it with
// tabledesc.ValidateTTLRateLimit, and stores it on the setter's row-level
// TTL config (obtained via getOrCreateRowLevelTTL).
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
if err != nil {
return err
}
if err := tabledesc.ValidateTTLRateLimit(key, val); err != nil {
return err
}
rowLevelTTL := po.getOrCreateRowLevelTTL()
rowLevelTTL.SelectRateLimit = val
return nil
},
// onReset clears the override by writing 0 back to SelectRateLimit. It is
// a no-op unless po.hasRowLevelTTL() reports true.
onReset: func(_ context.Context, po *Setter, evalCtx *eval.Context, key string) error {
if po.hasRowLevelTTL() {
po.UpdatedRowLevelTTL.SelectRateLimit = 0
}
return nil
},
},
`ttl_delete_rate_limit`: {
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/ttl/ttlbase/ttl_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ var (
settings.PositiveInt,
settings.WithPublic,
)
// defaultSelectRateLimit is the cluster setting consulted by
// GetSelectRateLimit when a table's SelectRateLimit is 0 (i.e. the
// ttl_select_rate_limit storage param is not set). Its default of 0 means no
// rate limit; NonNegativeInt is passed as the setting's validation option.
defaultSelectRateLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.ttl.default_select_rate_limit",
"default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.",
0,
settings.NonNegativeInt,
settings.WithPublic,
)
defaultDeleteRateLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.ttl.default_delete_rate_limit",
Expand Down Expand Up @@ -93,6 +101,20 @@ func GetDeleteBatchSize(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
return bs
}

// GetSelectRateLimit resolves the effective TTL select rate limit for a table.
//
// Resolution order:
//  1. the table's SelectRateLimit, when it is non-zero;
//  2. the sql.ttl.default_select_rate_limit cluster setting read from sv,
//     when that is non-zero;
//  3. math.MaxInt64, so that "no rate limit" is expressed as the largest
//     possible token count rather than as zero.
func GetSelectRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
	// A non-zero table-level value always wins.
	if tableLimit := ttl.SelectRateLimit; tableLimit != 0 {
		return tableLimit
	}
	// Otherwise defer to the cluster-wide default, if one is configured.
	if clusterLimit := defaultSelectRateLimit.Get(sv); clusterLimit != 0 {
		return clusterLimit
	}
	// Neither source sets a limit: hand out the maximum tokens possible.
	return math.MaxInt64
}

// GetDeleteRateLimit returns the table storage param value if specified or
// falls back to the cluster setting.
func GetDeleteRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
"github.com/cockroachdb/errors"
)

// rowLevelTTLResumer implements the TTL job. The job can run on any node, but
// the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
// nodes. DistSQL divides work into spans that each ttlProcessor scans in a
// SELECT/DELETE loop.
type rowLevelTTLResumer struct {
job *jobs.Job
st *cluster.Settings
Expand Down Expand Up @@ -185,6 +189,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
jobID := t.job.ID()
selectBatchSize := ttlbase.GetSelectBatchSize(settingsValues, rowLevelTTL)
deleteBatchSize := ttlbase.GetDeleteBatchSize(settingsValues, rowLevelTTL)
selectRateLimit := ttlbase.GetSelectRateLimit(settingsValues, rowLevelTTL)
deleteRateLimit := ttlbase.GetDeleteRateLimit(settingsValues, rowLevelTTL)
newTTLSpec := func(spans []roachpb.Span) *execinfrapb.TTLSpec {
return &execinfrapb.TTLSpec{
Expand All @@ -194,6 +199,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
Spans: spans,
SelectBatchSize: selectBatchSize,
DeleteBatchSize: deleteBatchSize,
SelectRateLimit: selectRateLimit,
DeleteRateLimit: deleteRateLimit,
LabelMetrics: rowLevelTTL.LabelMetrics,
PreDeleteChangeTableVersion: knobs.PreDeleteChangeTableVersion,
Expand Down
Loading

0 comments on commit cddcddd

Please sign in to comment.