diff --git a/docs/RFCS/20220120_row_level_ttl.md b/docs/RFCS/20220120_row_level_ttl.md
index 293db662088e..8204ac456334 100644
--- a/docs/RFCS/20220120_row_level_ttl.md
+++ b/docs/RFCS/20220120_row_level_ttl.md
@@ -140,29 +140,31 @@ message TableDescriptor {
option (gogoproto.equal) = true;
// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
- optional string duration_expr = 1 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
+ optional string duration_expr = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
// SelectBatchSize is the amount of rows that should be fetched at a time
- optional int64 select_batch_size = 2 [(gogoproto.nullable)=false];
+ optional int64 select_batch_size = 2 [(gogoproto.nullable) = false];
// DeleteBatchSize is the amount of rows that should be deleted at a time.
- optional int64 delete_batch_size = 3 [(gogoproto.nullable)=false];
+ optional int64 delete_batch_size = 3 [(gogoproto.nullable) = false];
// DeletionCron signifies how often the TTL deletion job runs in a cron format.
- optional string deletion_cron = 4 [(gogoproto.nullable)=false];
+ optional string deletion_cron = 4 [(gogoproto.nullable) = false];
// ScheduleID is the ID of the row-level TTL job schedules.
- optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
+ optional int64 schedule_id = 5 [(gogoproto.customname) = "ScheduleID", (gogoproto.nullable) = false];
// RangeConcurrency is based on the number of spans and is no longer configurable.
reserved 6;
// DeleteRateLimit is the maximum amount of rows to delete per second.
- optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
+ optional int64 delete_rate_limit = 7 [(gogoproto.nullable) = false];
// Pause is set if the TTL job should not run.
- optional bool pause = 8 [(gogoproto.nullable)=false];
+ optional bool pause = 8 [(gogoproto.nullable) = false];
// RowStatsPollInterval is the interval to report row statistics (number of rows on table, number of expired
// rows on table) during row level TTL. If zero, no statistics are reported.
- optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable)=false, (gogoproto.casttype)="time.Duration"];
+ optional int64 row_stats_poll_interval = 9 [(gogoproto.nullable) = false, (gogoproto.casttype) = "time.Duration"];
// LabelMetrics is true if metrics for the TTL job should add a label containing
// the relation name.
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
- optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
+ optional string expiration_expr = 11 [(gogoproto.nullable) = false, (gogoproto.casttype) = "Expression"];
+ // SelectRateLimit is the maximum amount of rows to select per second.
+ optional int64 select_rate_limit = 12 [(gogoproto.nullable) = false];
}
// ...
@@ -182,6 +184,7 @@ the following options to control the TTL job:
| `ttl_expiration_expression` | If set, uses the expression specified as the TTL expiration. Defaults to just using the `crdb_internal_expiration` column. |
| `ttl_select_batch_size` | How many rows to fetch from the range that have expired at a given time. Defaults to 500. Must be at least `1`. |
| `ttl_delete_batch_size` | How many rows to delete at a time. Defaults to 100. Must be at least `1`. |
+| `ttl_select_rate_limit` | Maximum number of rows to be selected per second (acts as the rate limit). Defaults to 0 (signifying none). |
| `ttl_delete_rate_limit` | Maximum number of rows to be deleted per second (acts as the rate limit). Defaults to 0 (signifying none). |
| `ttl_row_stats_poll_interval` | Whilst the TTL job is running, counts rows and expired rows on the table to report as prometheus metrics. By default unset, meaning no stats are fetched. |
| `ttl_pause` | Stops the TTL job from executing. |
diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index f33bddb97958..b2f96406ea9c 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -305,6 +305,7 @@ sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions;
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job application
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job application
+sql.ttl.default_select_rate_limit integer 0 default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. application
sql.ttl.job.enabled boolean true whether the TTL job is enabled application
sql.txn.read_committed_syntax.enabled boolean false set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands application
sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index a7f4a257477d..b2fcc38fc943 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -255,6 +255,7 @@
sql.ttl.default_delete_batch_size
| integer | 100 | default amount of rows to delete in a single query during a TTL job | Serverless/Dedicated/Self-Hosted |
sql.ttl.default_delete_rate_limit
| integer | 0 | default delete rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. | Serverless/Dedicated/Self-Hosted |
sql.ttl.default_select_batch_size
| integer | 500 | default amount of rows to select in a single query during a TTL job | Serverless/Dedicated/Self-Hosted |
+sql.ttl.default_select_rate_limit
| integer | 0 | default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit. | Serverless/Dedicated/Self-Hosted |
sql.ttl.job.enabled
| boolean | true | whether the TTL job is enabled | Serverless/Dedicated/Self-Hosted |
sql.txn.read_committed_syntax.enabled
| boolean | false | set to true to allow transactions to use the READ COMMITTED isolation level if specified by BEGIN/SET commands | Serverless/Dedicated/Self-Hosted |
sql.txn_fingerprint_id_cache.capacity
| integer | 100 | the maximum number of txn fingerprint IDs stored | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/sql/catalog/catpb/catalog.proto b/pkg/sql/catalog/catpb/catalog.proto
index 11271cc864da..7e99c958b586 100644
--- a/pkg/sql/catalog/catpb/catalog.proto
+++ b/pkg/sql/catalog/catpb/catalog.proto
@@ -195,6 +195,8 @@ message RowLevelTTL {
optional bool label_metrics = 10 [(gogoproto.nullable) = false];
// ExpirationExpr is the custom assigned expression for calculating when the TTL should apply to a row.
optional string expiration_expr = 11 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
+ // SelectRateLimit is the maximum amount of rows to select per second.
+ optional int64 select_rate_limit = 12 [(gogoproto.nullable)=false];
}
// AutoStatsSettings represents settings related to automatic statistics
diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go
index 36fe052e7ac8..e5c19cb977db 100644
--- a/pkg/sql/catalog/tabledesc/structured.go
+++ b/pkg/sql/catalog/tabledesc/structured.go
@@ -2389,6 +2389,9 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
if bs := ttl.DeleteBatchSize; bs != 0 {
appendStorageParam(`ttl_delete_batch_size`, fmt.Sprintf(`%d`, bs))
}
+ if rl := ttl.SelectRateLimit; rl != 0 {
+ appendStorageParam(`ttl_select_rate_limit`, fmt.Sprintf(`%d`, rl))
+ }
if rl := ttl.DeleteRateLimit; rl != 0 {
appendStorageParam(`ttl_delete_rate_limit`, fmt.Sprintf(`%d`, rl))
}
diff --git a/pkg/sql/catalog/tabledesc/ttl.go b/pkg/sql/catalog/tabledesc/ttl.go
index 90002cb46871..d9f96ff05765 100644
--- a/pkg/sql/catalog/tabledesc/ttl.go
+++ b/pkg/sql/catalog/tabledesc/ttl.go
@@ -51,6 +51,11 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
+ if ttl.SelectRateLimit != 0 {
+ if err := ValidateTTLRateLimit("ttl_select_rate_limit", ttl.SelectRateLimit); err != nil {
+ return err
+ }
+ }
if ttl.DeleteRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_delete_rate_limit", ttl.DeleteRateLimit); err != nil {
return err
diff --git a/pkg/sql/execinfrapb/processors_ttl.proto b/pkg/sql/execinfrapb/processors_ttl.proto
index fdb1848c1b55..cbb2855c6a79 100644
--- a/pkg/sql/execinfrapb/processors_ttl.proto
+++ b/pkg/sql/execinfrapb/processors_ttl.proto
@@ -87,4 +87,7 @@ message TTLSpec {
(gogoproto.customname) = "AOSTDuration",
(gogoproto.stdduration) = true
];
+
+ // SelectRateLimit controls how many records can be selected per second.
+ optional int64 select_rate_limit = 14 [(gogoproto.nullable) = false];
}
diff --git a/pkg/sql/logictest/testdata/logic_test/row_level_ttl b/pkg/sql/logictest/testdata/logic_test/row_level_ttl
index 599b0d41ab6e..208055250d01 100644
--- a/pkg/sql/logictest/testdata/logic_test/row_level_ttl
+++ b/pkg/sql/logictest/testdata/logic_test/row_level_ttl
@@ -217,12 +217,29 @@ subtest reloptions
statement ok
CREATE TABLE tbl_reloptions (
id INT PRIMARY KEY
-) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
+) WITH (
+ ttl_expire_after = '10 minutes',
+ ttl_select_batch_size = 10,
+ ttl_delete_batch_size = 20,
+ ttl_select_rate_limit = 30,
+ ttl_delete_rate_limit = 40,
+ ttl_pause = true,
+ ttl_row_stats_poll_interval = '1 minute',
+ ttl_label_metrics = true
+)
-query T
-SELECT reloptions FROM pg_class WHERE relname = 'tbl_reloptions'
+query T rowsort
+SELECT unnest(reloptions) FROM pg_class WHERE relname = 'tbl_reloptions'
----
-{ttl='on',ttl_expire_after='00:10:00':::INTERVAL,ttl_select_batch_size=10,ttl_delete_batch_size=20,ttl_delete_rate_limit=30,ttl_pause=true,ttl_row_stats_poll_interval='1m0s',ttl_label_metrics=true}
+ttl='on'
+ttl_expire_after='00:10:00':::INTERVAL
+ttl_select_batch_size=10
+ttl_delete_batch_size=20
+ttl_select_rate_limit=30
+ttl_delete_rate_limit=40
+ttl_pause=true
+ttl_row_stats_poll_interval='1m0s'
+ttl_label_metrics=true
subtest end
@@ -724,6 +741,9 @@ ALTER TABLE tbl_ttl_params_positive SET (ttl_select_batch_size = -1)
statement error "ttl_delete_batch_size" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_batch_size = -1)
+statement error "ttl_select_rate_limit" must be at least 1
+ALTER TABLE tbl_ttl_params_positive SET (ttl_select_rate_limit = -1)
+
statement error "ttl_delete_rate_limit" must be at least 1
ALTER TABLE tbl_ttl_params_positive SET (ttl_delete_rate_limit = -1)
@@ -737,7 +757,16 @@ subtest set_ttl_params
statement ok
CREATE TABLE tbl_set_ttl_params (
id INT PRIMARY KEY
-) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 10, ttl_delete_batch_size=20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
+) WITH (
+ ttl_expire_after = '10 minutes',
+ ttl_select_batch_size = 10,
+ ttl_delete_batch_size = 20,
+ ttl_select_rate_limit = 30,
+ ttl_delete_rate_limit = 40,
+ ttl_pause = true,
+ ttl_row_stats_poll_interval = '1 minute',
+ ttl_label_metrics = true
+)
query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
@@ -746,10 +775,10 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
-) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_delete_rate_limit = 30, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
+) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 10, ttl_delete_batch_size = 20, ttl_select_rate_limit = 30, ttl_delete_rate_limit = 40, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
statement ok
-ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_row_stats_poll_interval = '2m0s')
+ALTER TABLE tbl_set_ttl_params SET (ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_row_stats_poll_interval = '2m0s')
query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
@@ -758,10 +787,17 @@ CREATE TABLE public.tbl_set_ttl_params (
id INT8 NOT NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_set_ttl_params_pkey PRIMARY KEY (id ASC)
-) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_delete_rate_limit = 130, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)
+) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 110, ttl_delete_batch_size = 120, ttl_select_rate_limit = 130, ttl_delete_rate_limit = 140, ttl_pause = true, ttl_row_stats_poll_interval = '2m0s', ttl_label_metrics = true)
statement ok
-ALTER TABLE tbl_set_ttl_params RESET (ttl_select_batch_size, ttl_delete_batch_size, ttl_delete_rate_limit, ttl_pause, ttl_row_stats_poll_interval)
+ALTER TABLE tbl_set_ttl_params RESET (
+ ttl_select_batch_size,
+ ttl_delete_batch_size,
+ ttl_select_rate_limit,
+ ttl_delete_rate_limit,
+ ttl_pause,
+ ttl_row_stats_poll_interval
+)
query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl_set_ttl_params]
diff --git a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
index 2a1b23892a77..a04d47058179 100644
--- a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
+++ b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
@@ -325,6 +325,26 @@ var tableParams = map[string]tableParam{
return nil
},
},
+ `ttl_select_rate_limit`: {
+ onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
+ val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
+ if err != nil {
+ return err
+ }
+ if err := tabledesc.ValidateTTLRateLimit(key, val); err != nil {
+ return err
+ }
+ rowLevelTTL := po.getOrCreateRowLevelTTL()
+ rowLevelTTL.SelectRateLimit = val
+ return nil
+ },
+ onReset: func(_ context.Context, po *Setter, evalCtx *eval.Context, key string) error {
+ if po.hasRowLevelTTL() {
+ po.UpdatedRowLevelTTL.SelectRateLimit = 0
+ }
+ return nil
+ },
+ },
`ttl_delete_rate_limit`: {
onSet: func(ctx context.Context, po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
val, err := paramparse.DatumAsInt(ctx, evalCtx, key, datum)
diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go
index 4fac728adbac..d9041b59113a 100644
--- a/pkg/sql/ttl/ttlbase/ttl_helpers.go
+++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go
@@ -48,6 +48,14 @@ var (
settings.PositiveInt,
settings.WithPublic,
)
+ defaultSelectRateLimit = settings.RegisterIntSetting(
+ settings.ApplicationLevel,
+ "sql.ttl.default_select_rate_limit",
+ "default select rate limit (rows per second) per node for each TTL job. Use 0 to signify no rate limit.",
+ 0,
+ settings.NonNegativeInt,
+ settings.WithPublic,
+ )
defaultDeleteRateLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.ttl.default_delete_rate_limit",
@@ -93,6 +101,20 @@ func GetDeleteBatchSize(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
return bs
}
+// GetSelectRateLimit returns the table storage param value if specified or
+// falls back to the cluster setting.
+func GetSelectRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
+ rl := ttl.SelectRateLimit
+ if rl == 0 {
+ rl = defaultSelectRateLimit.Get(sv)
+ }
+ // Put the maximum tokens possible if there is no rate limit.
+ if rl == 0 {
+ rl = math.MaxInt64
+ }
+ return rl
+}
+
// GetDeleteRateLimit returns the table storage param value if specified or
// falls back to the cluster setting.
func GetDeleteRateLimit(sv *settings.Values, ttl *catpb.RowLevelTTL) int64 {
diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel
index f573e5404b1e..e660bc6fa45b 100644
--- a/pkg/sql/ttl/ttljob/BUILD.bazel
+++ b/pkg/sql/ttl/ttljob/BUILD.bazel
@@ -100,7 +100,10 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
+ "//pkg/util/metric",
+ "//pkg/util/metric/aggmetric",
"//pkg/util/protoutil",
+ "//pkg/util/quotapool",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go
index 81d795e41787..4d4621a89603 100644
--- a/pkg/sql/ttl/ttljob/ttljob.go
+++ b/pkg/sql/ttl/ttljob/ttljob.go
@@ -38,6 +38,10 @@ import (
"github.com/cockroachdb/errors"
)
+// rowLevelTTLResumer implements the TTL job. The job can run on any node, but
+// the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
+// nodes. DistSQL divides work into spans that each ttlProcessor scans in a
+// SELECT/DELETE loop.
type rowLevelTTLResumer struct {
job *jobs.Job
st *cluster.Settings
@@ -185,6 +189,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
jobID := t.job.ID()
selectBatchSize := ttlbase.GetSelectBatchSize(settingsValues, rowLevelTTL)
deleteBatchSize := ttlbase.GetDeleteBatchSize(settingsValues, rowLevelTTL)
+ selectRateLimit := ttlbase.GetSelectRateLimit(settingsValues, rowLevelTTL)
deleteRateLimit := ttlbase.GetDeleteRateLimit(settingsValues, rowLevelTTL)
newTTLSpec := func(spans []roachpb.Span) *execinfrapb.TTLSpec {
return &execinfrapb.TTLSpec{
@@ -194,6 +199,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
Spans: spans,
SelectBatchSize: selectBatchSize,
DeleteBatchSize: deleteBatchSize,
+ SelectRateLimit: selectRateLimit,
DeleteRateLimit: deleteRateLimit,
LabelMetrics: rowLevelTTL.LabelMetrics,
PreDeleteChangeTableVersion: knobs.PreDeleteChangeTableVersion,
diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go
index c156e0d8f717..f2257a6e5f2e 100644
--- a/pkg/sql/ttl/ttljob/ttljob_processor.go
+++ b/pkg/sql/ttl/ttljob/ttljob_processor.go
@@ -13,6 +13,7 @@ package ttljob
import (
"bytes"
"context"
+ "math"
"runtime"
"sync/atomic"
"time"
@@ -44,11 +45,18 @@ import (
"github.com/cockroachdb/errors"
)
+// ttlProcessor manages the work managed by a single node for a job run by
+// rowLevelTTLResumer. SpanToQueryBounds converts a DistSQL span into
+// QueryBounds. The QueryBounds are passed to SelectQueryBuilder and
+// DeleteQueryBuilder which manage the state for the SELECT/DELETE loop
+// that is run by runTTLOnQueryBounds.
type ttlProcessor struct {
execinfra.ProcessorBase
ttlSpec execinfrapb.TTLSpec
}
+var _ execinfra.RowSource = (*ttlProcessor)(nil)
+
func (t *ttlProcessor) Start(ctx context.Context) {
ctx = t.StartInternal(ctx, "ttl")
err := t.work(ctx)
@@ -65,6 +73,21 @@ func (t *ttlProcessor) work(ctx context.Context) error {
codec := serverCfg.Codec
details := ttlSpec.RowLevelTTLDetails
tableID := details.TableID
+ cutoff := details.Cutoff
+ ttlExpr := ttlSpec.TTLExpr
+
+ selectRateLimit := ttlSpec.SelectRateLimit
+ // Default 0 value to "unlimited" in case job started on node <= v23.2.
+ // todo(sql-foundations): Remove this in 25.1 for consistency with
+ // deleteRateLimit.
+ if selectRateLimit == 0 {
+ selectRateLimit = math.MaxInt64
+ }
+ selectRateLimiter := quotapool.NewRateLimiter(
+ "ttl-select",
+ quotapool.Limit(selectRateLimit),
+ selectRateLimit,
+ )
deleteRateLimit := ttlSpec.DeleteRateLimit
deleteRateLimiter := quotapool.NewRateLimiter(
@@ -73,14 +96,13 @@ func (t *ttlProcessor) work(ctx context.Context) error {
deleteRateLimit,
)
- processorRowCount := int64(0)
-
var (
- relationName string
- pkColNames []string
- pkColTypes []*types.T
- pkColDirs []catenumpb.IndexColumn_Direction
- labelMetrics bool
+ relationName string
+ pkColNames []string
+ pkColTypes []*types.T
+ pkColDirs []catenumpb.IndexColumn_Direction
+ labelMetrics bool
+ processorRowCount int64
)
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID)
@@ -139,18 +161,39 @@ func (t *ttlProcessor) work(ctx context.Context) error {
group.GoCtx(func(ctx context.Context) error {
for bounds := range boundsChan {
start := timeutil.Now()
+ selectBuilder := MakeSelectQueryBuilder(
+ SelectQueryParams{
+ RelationName: relationName,
+ PKColNames: pkColNames,
+ PKColDirs: pkColDirs,
+ Bounds: bounds,
+ AOSTDuration: ttlSpec.AOSTDuration,
+ SelectBatchSize: ttlSpec.SelectBatchSize,
+ TTLExpr: ttlExpr,
+ SelectDuration: metrics.SelectDuration,
+ SelectRateLimiter: selectRateLimiter,
+ },
+ cutoff,
+ )
+ deleteBuilder := MakeDeleteQueryBuilder(
+ DeleteQueryParams{
+ RelationName: relationName,
+ PKColNames: pkColNames,
+ DeleteBatchSize: ttlSpec.DeleteBatchSize,
+ TTLExpr: ttlExpr,
+ DeleteDuration: metrics.DeleteDuration,
+ DeleteRateLimiter: deleteRateLimiter,
+ },
+ cutoff,
+ )
spanRowCount, err := t.runTTLOnQueryBounds(
ctx,
metrics,
- bounds,
- pkColNames,
- pkColDirs,
- relationName,
- deleteRateLimiter,
+ selectBuilder,
+ deleteBuilder,
)
// add before returning err in case of partial success
atomic.AddInt64(&processorRowCount, spanRowCount)
- metrics.SpanTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
@@ -158,6 +201,7 @@ func (t *ttlProcessor) work(ctx context.Context) error {
}
return err
}
+ metrics.SpanTotalDuration.RecordValue(int64(timeutil.Since(start)))
}
return nil
})
@@ -223,15 +267,14 @@ func (t *ttlProcessor) work(ctx context.Context) error {
)
}
-// spanRowCount should be checked even if the function returns an error because it may have partially succeeded
+// runTTLOnQueryBounds runs the SELECT/DELETE loop for a single DistSQL span.
+// spanRowCount should be checked even if the function returns an error
+// because it may have partially succeeded.
func (t *ttlProcessor) runTTLOnQueryBounds(
ctx context.Context,
metrics rowLevelTTLMetrics,
- bounds QueryBounds,
- pkColNames []string,
- pkColDirs []catenumpb.IndexColumn_Direction,
- relationName string,
- deleteRateLimiter *quotapool.RateLimiter,
+ selectBuilder SelectQueryBuilder,
+ deleteBuilder DeleteQueryBuilder,
) (spanRowCount int64, err error) {
metrics.NumActiveSpans.Inc(1)
defer metrics.NumActiveSpans.Dec(1)
@@ -240,32 +283,10 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
ttlSpec := t.ttlSpec
details := ttlSpec.RowLevelTTLDetails
- cutoff := details.Cutoff
- ttlExpr := ttlSpec.TTLExpr
flowCtx := t.FlowCtx
serverCfg := flowCtx.Cfg
ie := serverCfg.DB.Executor()
- selectBatchSize := ttlSpec.SelectBatchSize
- selectBuilder := MakeSelectQueryBuilder(
- cutoff,
- pkColNames,
- pkColDirs,
- relationName,
- bounds,
- ttlSpec.AOSTDuration,
- selectBatchSize,
- ttlExpr,
- )
- deleteBatchSize := ttlSpec.DeleteBatchSize
- deleteBuilder := MakeDeleteQueryBuilder(
- cutoff,
- pkColNames,
- relationName,
- deleteBatchSize,
- ttlExpr,
- )
-
preSelectStatement := ttlSpec.PreSelectStatement
if preSelectStatement != "" {
if _, err := ie.ExecEx(
@@ -288,16 +309,16 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
// Step 1. Fetch some rows we want to delete using a historical
// SELECT query.
- start := timeutil.Now()
expiredRowsPKs, hasNext, err := selectBuilder.Run(ctx, ie)
- metrics.SelectDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
return spanRowCount, errors.Wrapf(err, "error selecting rows to delete")
}
+
numExpiredRows := int64(len(expiredRowsPKs))
metrics.RowSelections.Inc(numExpiredRows)
// Step 2. Delete the rows which have expired.
+ deleteBatchSize := deleteBuilder.DeleteBatchSize
for startRowIdx := int64(0); startRowIdx < numExpiredRows; startRowIdx += deleteBatchSize {
until := startRowIdx + deleteBatchSize
if until > numExpiredRows {
@@ -319,18 +340,10 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
desc.GetModificationTime().GoTime().Format(time.RFC3339),
)
}
- tokens, err := deleteRateLimiter.Acquire(ctx, int64(len(deleteBatch)))
- if err != nil {
- return err
- }
- defer tokens.Consume()
-
- start := timeutil.Now()
batchRowCount, err = deleteBuilder.Run(ctx, txn, deleteBatch)
if err != nil {
return err
}
- metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return nil
}
if err := serverCfg.DB.Txn(
diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go
index 71be6e5b0c56..dc28b8ca373e 100644
--- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go
+++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go
@@ -23,6 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase"
+ "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
+ "github.com/cockroachdb/cockroach/pkg/util/quotapool"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -47,18 +50,23 @@ type QueryBounds struct {
End tree.Datums
}
+type SelectQueryParams struct {
+ RelationName string
+ PKColNames []string
+ PKColDirs []catenumpb.IndexColumn_Direction
+ Bounds QueryBounds
+ AOSTDuration time.Duration
+ SelectBatchSize int64
+ TTLExpr catpb.Expression
+ SelectDuration *aggmetric.Histogram
+ SelectRateLimiter *quotapool.RateLimiter
+}
+
// SelectQueryBuilder is responsible for maintaining state around the SELECT
// portion of the TTL job.
type SelectQueryBuilder struct {
- relationName string
- pkColNames []string
- pkColDirs []catenumpb.IndexColumn_Direction
- selectOpName string
- bounds QueryBounds
- selectBatchSize int64
- aostDuration time.Duration
- ttlExpr catpb.Expression
-
+ SelectQueryParams
+ selectOpName string
// isFirst is true if we have not invoked a query using the builder yet.
isFirst bool
// cachedQuery is the cached query, which stays the same from the second
@@ -69,61 +77,43 @@ type SelectQueryBuilder struct {
cachedArgs []interface{}
}
-func MakeSelectQueryBuilder(
- cutoff time.Time,
- pkColNames []string,
- pkColDirs []catenumpb.IndexColumn_Direction,
- relationName string,
- bounds QueryBounds,
- aostDuration time.Duration,
- selectBatchSize int64,
- ttlExpr catpb.Expression,
-) SelectQueryBuilder {
- numPkCols := len(pkColNames)
+func MakeSelectQueryBuilder(params SelectQueryParams, cutoff time.Time) SelectQueryBuilder {
+ numPkCols := len(params.PKColNames)
if numPkCols == 0 {
- panic("pkColNames is empty")
+ panic("PKColNames is empty")
}
- if numPkCols != len(pkColDirs) {
- panic("different number of pkColNames and pkColDirs")
+ if numPkCols != len(params.PKColDirs) {
+ panic("different number of PKColNames and PKColDirs")
}
- // We will have a maximum of 1 + len(pkColNames)*2 columns, where one
- // is reserved for AOST, and len(pkColNames) for both start and end key.
+ // We will have a maximum of 1 + len(PKColNames)*2 columns, where one
+ // is reserved for AOST, and len(PKColNames) for both start and end key.
cachedArgs := make([]interface{}, 0, 1+numPkCols*2)
cachedArgs = append(cachedArgs, cutoff)
- endPK := bounds.End
- for _, d := range endPK {
+ for _, d := range params.Bounds.End {
cachedArgs = append(cachedArgs, d)
}
- startPK := bounds.Start
- for _, d := range startPK {
+ for _, d := range params.Bounds.Start {
cachedArgs = append(cachedArgs, d)
}
return SelectQueryBuilder{
- relationName: relationName,
- pkColNames: pkColNames,
- pkColDirs: pkColDirs,
- selectOpName: fmt.Sprintf("ttl select %s", relationName),
- bounds: bounds,
- aostDuration: aostDuration,
- selectBatchSize: selectBatchSize,
- ttlExpr: ttlExpr,
-
- cachedArgs: cachedArgs,
- isFirst: true,
+ SelectQueryParams: params,
+ selectOpName: fmt.Sprintf("ttl select %s", params.RelationName),
+ cachedArgs: cachedArgs,
+ isFirst: true,
}
}
func (b *SelectQueryBuilder) buildQuery() string {
return ttlbase.BuildSelectQuery(
- b.relationName,
- b.pkColNames,
- b.pkColDirs,
- b.aostDuration,
- b.ttlExpr,
- len(b.bounds.Start),
- len(b.bounds.End),
- b.selectBatchSize,
+ b.RelationName,
+ b.PKColNames,
+ b.PKColDirs,
+ b.AOSTDuration,
+ b.TTLExpr,
+ len(b.Bounds.Start),
+ len(b.Bounds.End),
+ b.SelectBatchSize,
b.isFirst,
)
}
@@ -144,6 +134,13 @@ func (b *SelectQueryBuilder) Run(
query = b.cachedQuery
}
+ tokens, err := b.SelectRateLimiter.Acquire(ctx, b.SelectBatchSize)
+ if err != nil {
+ return nil, false, err
+ }
+ defer tokens.Consume()
+
+ start := timeutil.Now()
// Use a nil txn so that the AOST clause is handled correctly. Currently,
// the internal executor will treat a passed-in txn as an explicit txn, so
// the AOST clause on the SELECT query would not be interpreted correctly.
@@ -161,69 +158,66 @@ func (b *SelectQueryBuilder) Run(
if err != nil {
return nil, false, err
}
+ b.SelectDuration.RecordValue(int64(timeutil.Since(start)))
numRows := int64(len(rows))
if numRows > 0 {
// Move the cursor forward if SELECT returns rows.
lastRow := rows[numRows-1]
- if len(lastRow) != len(b.pkColNames) {
- return nil, false, errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.pkColNames), len(lastRow))
+ if len(lastRow) != len(b.PKColNames) {
+ return nil, false, errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.PKColNames), len(lastRow))
}
- b.cachedArgs = b.cachedArgs[:len(b.cachedArgs)-len(b.bounds.Start)]
+ b.cachedArgs = b.cachedArgs[:len(b.cachedArgs)-len(b.Bounds.Start)]
for _, d := range lastRow {
b.cachedArgs = append(b.cachedArgs, d)
}
- b.bounds.Start = lastRow
+ b.Bounds.Start = lastRow
}
- return rows, numRows == b.selectBatchSize, nil
+ return rows, numRows == b.SelectBatchSize, nil
+}
+
+type DeleteQueryParams struct {
+ RelationName string
+ PKColNames []string
+ DeleteBatchSize int64
+ TTLExpr catpb.Expression
+ DeleteDuration *aggmetric.Histogram
+ DeleteRateLimiter *quotapool.RateLimiter
}
// DeleteQueryBuilder is responsible for maintaining state around the DELETE
// portion of the TTL job.
type DeleteQueryBuilder struct {
- relationName string
- pkColNames []string
- deleteBatchSize int64
- deleteOpName string
- ttlExpr catpb.Expression
-
+ DeleteQueryParams
+ deleteOpName string
// cachedQuery is the cached query, which stays the same as long as we are
- // deleting up to deleteBatchSize elements.
+ // deleting up to DeleteBatchSize elements.
cachedQuery string
// cachedArgs keeps a cache of args to use in the run query.
// The cache is of form [cutoff, flattened PKs...].
cachedArgs []interface{}
}
-func MakeDeleteQueryBuilder(
- cutoff time.Time,
- pkColNames []string,
- relationName string,
- deleteBatchSize int64,
- ttlExpr catpb.Expression,
-) DeleteQueryBuilder {
- if len(pkColNames) == 0 {
- panic("pkColNames is empty")
+func MakeDeleteQueryBuilder(params DeleteQueryParams, cutoff time.Time) DeleteQueryBuilder {
+ if len(params.PKColNames) == 0 {
+ panic("PKColNames is empty")
}
- cachedArgs := make([]interface{}, 0, 1+int64(len(pkColNames))*deleteBatchSize)
+ cachedArgs := make([]interface{}, 0, 1+int64(len(params.PKColNames))*params.DeleteBatchSize)
cachedArgs = append(cachedArgs, cutoff)
return DeleteQueryBuilder{
- relationName: relationName,
- pkColNames: pkColNames,
- deleteBatchSize: deleteBatchSize,
- deleteOpName: fmt.Sprintf("ttl delete %s", relationName),
- ttlExpr: ttlExpr,
- cachedArgs: cachedArgs,
+ DeleteQueryParams: params,
+ deleteOpName: fmt.Sprintf("ttl delete %s", params.RelationName),
+ cachedArgs: cachedArgs,
}
}
func (b *DeleteQueryBuilder) buildQuery(numRows int) string {
return ttlbase.BuildDeleteQuery(
- b.relationName,
- b.pkColNames,
- b.ttlExpr,
+ b.RelationName,
+ b.PKColNames,
+ b.TTLExpr,
numRows,
)
}
@@ -233,7 +227,7 @@ func (b *DeleteQueryBuilder) Run(
) (int64, error) {
numRows := len(rows)
var query string
- if int64(numRows) == b.deleteBatchSize {
+ if int64(numRows) == b.DeleteBatchSize {
if b.cachedQuery == "" {
b.cachedQuery = b.buildQuery(numRows)
}
@@ -249,6 +243,13 @@ func (b *DeleteQueryBuilder) Run(
}
}
+ tokens, err := b.DeleteRateLimiter.Acquire(ctx, int64(numRows))
+ if err != nil {
+ return 0, err
+ }
+ defer tokens.Consume()
+
+ start := timeutil.Now()
rowCount, err := txn.ExecEx(
ctx,
b.deleteOpName,
@@ -260,5 +261,9 @@ func (b *DeleteQueryBuilder) Run(
query,
deleteArgs...,
)
- return int64(rowCount), err
+ if err != nil {
+ return 0, err
+ }
+ b.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
+ return int64(rowCount), nil
}
diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
index 64e739b7717c..52dd0025d2bb 100644
--- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
+++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
@@ -13,6 +13,7 @@ package ttljob_test
import (
"context"
"fmt"
+ "math"
"strconv"
"strings"
"testing"
@@ -28,6 +29,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
+ "github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/stretchr/testify/require"
)
@@ -298,7 +302,7 @@ func TestSelectQueryBuilder(t *testing.T) {
testServer := testCluster.Server(0)
ie := testServer.InternalExecutor().(*sql.InternalExecutor)
- // Generate pkColNames.
+ // Generate PKColNames.
pkColDirs := tc.pkColDirs
numPKCols := len(pkColDirs)
pkColNames := ttlbase.GenPKColNames(numPKCols)
@@ -333,14 +337,22 @@ func TestSelectQueryBuilder(t *testing.T) {
// Setup SelectQueryBuilder.
queryBuilder := ttljob.MakeSelectQueryBuilder(
+ ttljob.SelectQueryParams{
+ RelationName: relationName,
+ PKColNames: pkColNames,
+ PKColDirs: pkColDirs,
+ Bounds: tc.bounds,
+ AOSTDuration: 0,
+ SelectBatchSize: 2,
+ TTLExpr: ttlColName,
+ SelectDuration: testHistogram(),
+ SelectRateLimiter: quotapool.NewRateLimiter(
+ "",
+ quotapool.Inf(),
+ math.MaxInt64,
+ ),
+ },
cutoff,
- pkColNames,
- pkColDirs,
- relationName,
- tc.bounds,
- 0,
- 2,
- ttlColName,
)
// Verify queryBuilder iterations.
@@ -418,7 +430,7 @@ func TestDeleteQueryBuilder(t *testing.T) {
ie := testServer.InternalExecutor().(*sql.InternalExecutor)
db := testServer.InternalDB().(*sql.InternalDB)
- // Generate pkColNames.
+ // Generate PKColNames.
numPKCols := tc.numPKCols
pkColNames := ttlbase.GenPKColNames(numPKCols)
@@ -446,11 +458,19 @@ func TestDeleteQueryBuilder(t *testing.T) {
// Setup DeleteQueryBuilder.
queryBuilder := ttljob.MakeDeleteQueryBuilder(
+ ttljob.DeleteQueryParams{
+ RelationName: relationName,
+ PKColNames: pkColNames,
+ DeleteBatchSize: 2,
+ TTLExpr: ttlColName,
+ DeleteDuration: testHistogram(),
+ DeleteRateLimiter: quotapool.NewRateLimiter(
+ "",
+ quotapool.Inf(),
+ math.MaxInt64,
+ ),
+ },
cutoff,
- pkColNames,
- relationName,
- 2, /* deleteBatchSize */
- ttlColName,
)
// Verify rows are deleted.
@@ -470,3 +490,9 @@ func TestDeleteQueryBuilder(t *testing.T) {
})
}
}
+
+func testHistogram() *aggmetric.Histogram {
+ return aggmetric.MakeBuilder().Histogram(metric.HistogramOptions{
+ SigFigs: 1,
+ }).AddChild()
+}
diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go
index 6a236ef3af7d..bfa262b8b6af 100644
--- a/pkg/sql/ttl/ttljob/ttljob_test.go
+++ b/pkg/sql/ttl/ttljob/ttljob_test.go
@@ -685,7 +685,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
"quote-kw-col" TIMESTAMPTZ,
text TEXT,
PRIMARY KEY (id, other_col, "quote-kw-col")
-) WITH (ttl_expire_after = '30 days', ttl_delete_rate_limit = 350)`,
+) WITH (ttl_expire_after = '30 days', ttl_select_rate_limit = 350, ttl_delete_rate_limit = 350)`,
numExpiredRows: 1001,
numNonExpiredRows: 5,
},