ttljob: implement a delete-based rate limit
This commit introduces a delete-based rate limit for TTL jobs, expressed in
rows deleted per second. It can be configured with the
`ttl_delete_rate_limit` storage parameter and defaults to the value of the
`sql.ttl.default_delete_rate_limit` cluster setting (a value of zero means no
rate limit).

Release note: None
otan committed Feb 18, 2022
1 parent ce9a343 commit abd060e
Showing 10 changed files with 113 additions and 14 deletions.
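
Before the file-by-file diff, a condensed sketch of the mechanism being wired in: the table-level `ttl_delete_rate_limit` storage parameter takes precedence over the `sql.ttl.default_delete_rate_limit` cluster setting, a resolved value of zero is treated as effectively unlimited, and each delete batch acquires tokens from a `quotapool.RateLimiter` before its DELETE runs. The quotapool calls mirror those added to pkg/sql/ttl/ttljob/ttljob.go below; the standalone wrapper (resolveDeleteRateLimit, main, and the hard-coded values) is illustrative only, not the job's actual plumbing.

package main

import (
	"context"
	"fmt"
	"math"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

// resolveDeleteRateLimit mirrors getDeleteRateLimit in ttljob.go: the table's
// ttl_delete_rate_limit storage parameter wins if set, otherwise the
// sql.ttl.default_delete_rate_limit cluster setting applies, and zero is
// translated into an effectively unlimited rate.
func resolveDeleteRateLimit(tableLimit, clusterDefault int64) int64 {
	val := tableLimit
	if val == 0 {
		val = clusterDefault
	}
	if val == 0 {
		return math.MaxInt64
	}
	return val
}

func main() {
	// Table parameter set to 350 rows/second; cluster default left at zero.
	limit := resolveDeleteRateLimit(350, 0)

	// Same construction as in Resume: rate and burst are both the resolved limit.
	limiter := quotapool.NewRateLimiter(
		"ttl-delete",
		quotapool.Limit(limit),
		limit,
	)

	ctx := context.Background()
	const batchSize = 100 // stand-in for len(deleteBatch) in runTTLOnRange

	// Before issuing the DELETE for a batch, block until enough tokens are
	// available; consume them once the batch has been processed.
	tokens, err := limiter.Acquire(ctx, batchSize)
	if err != nil {
		panic(err)
	}
	// ... run the DELETE for this batch here ...
	tokens.Consume()

	fmt.Printf("deleted %d rows under a %d row/s limit\n", batchSize, limit)
}
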
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -169,6 +169,7 @@ sql.trace.session_eventlog.enabled boolean false set to true to enable session t
sql.trace.stmt.enable_threshold duration 0s duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.
sql.trace.txn.enable_threshold duration 0s duration beyond which all transactions are traced (set to 0 to disable). This setting is coarser grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries).
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -182,6 +182,7 @@
<tr><td><code>sql.trace.stmt.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.</td></tr>
<tr><td><code>sql.trace.txn.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all transactions are traced (set to 0 to disable). This setting is coarser grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries).</td></tr>
<tr><td><code>sql.ttl.default_delete_batch_size</code></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.default_delete_rate_limit</code></td><td>integer</td><td><code>0</code></td><td>default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.</td></tr>
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
2 changes: 2 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
@@ -206,4 +206,6 @@ message RowLevelTTL {
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
// RangeConcurrency is the number of ranges to process at a time.
optional int64 range_concurrency = 6 [(gogoproto.nullable)=false];
// DeleteRateLimit is the maximum amount of rows to delete per second.
optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
}
17 changes: 17 additions & 0 deletions pkg/sql/catalog/tabledesc/ttl.go
@@ -48,6 +48,11 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
if ttl.DeleteRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_delete_rate_limit", ttl.DeleteRateLimit); err != nil {
return err
}
}
return nil
}

@@ -87,3 +92,15 @@ func ValidateTTLCronExpr(key string, str string) error {
}
return nil
}

// ValidateTTLRateLimit validates the rate limit parameters of TTL.
func ValidateTTLRateLimit(key string, val int64) error {
if val <= 0 {
return pgerror.Newf(
pgcode.InvalidParameterValue,
`"%s" must be at least 1`,
key,
)
}
return nil
}
31 changes: 17 additions & 14 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
@@ -321,18 +321,18 @@ CREATE TABLE tbl (
id INT PRIMARY KEY,
text TEXT,
FAMILY (id, text)
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50, ttl_range_concurrency = 2)
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
----
CREATE TABLE public.tbl (
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_range_concurrency = 2)
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100)

statement ok
ALTER TABLE tbl SET (ttl_delete_batch_size = 100)
@@ -341,12 +341,12 @@ query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
----
CREATE TABLE public.tbl (
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_delete_batch_size = 100, ttl_range_concurrency = 2)
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_delete_batch_size = 100, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100)

statement error "ttl_select_batch_size" must be at least 1
ALTER TABLE tbl SET (ttl_select_batch_size = -1)
@@ -357,8 +357,11 @@ ALTER TABLE tbl SET (ttl_delete_batch_size = -1)
statement error "ttl_range_concurrency" must be at least 1
ALTER TABLE tbl SET (ttl_range_concurrency = -1)

statement error "ttl_delete_rate_limit" must be at least 1
ALTER TABLE tbl SET (ttl_delete_rate_limit = -1)

statement ok
ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size, ttl_range_concurrency)
ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size, ttl_range_concurrency, ttl_delete_rate_limit)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
22 changes: 22 additions & 0 deletions pkg/sql/paramparse/paramobserver.go
@@ -328,6 +328,28 @@ var tableParams = map[string]tableParam{
return nil
},
},
`ttl_delete_rate_limit`: {
onSet: func(ctx context.Context, po *TableStorageParamObserver, semaCtx *tree.SemaContext, evalCtx *tree.EvalContext, key string, datum tree.Datum) error {
if po.tableDesc.RowLevelTTL == nil {
po.tableDesc.RowLevelTTL = &catpb.RowLevelTTL{}
}
val, err := DatumAsInt(evalCtx, key, datum)
if err != nil {
return err
}
if err := tabledesc.ValidateTTLRateLimit(key, val); err != nil {
return err
}
po.tableDesc.RowLevelTTL.DeleteRateLimit = val
return nil
},
onReset: func(po *TableStorageParamObserver, evalCtx *tree.EvalContext, key string) error {
if po.tableDesc.RowLevelTTL != nil {
po.tableDesc.RowLevelTTL.DeleteRateLimit = 0
}
return nil
},
},
`ttl_job_cron`: {
onSet: func(ctx context.Context, po *TableStorageParamObserver, semaCtx *tree.SemaContext, evalCtx *tree.EvalContext, key string, datum tree.Datum) error {
if po.tableDesc.RowLevelTTL == nil {
3 changes: 3 additions & 0 deletions pkg/sql/show_create.go
@@ -197,6 +197,9 @@ func ShowCreateTable(
if rc := ttl.RangeConcurrency; rc != 0 {
storageParams = append(storageParams, fmt.Sprintf(`ttl_range_concurrency = %d`, rc))
}
if rc := ttl.DeleteRateLimit; rc != 0 {
storageParams = append(storageParams, fmt.Sprintf(`ttl_delete_rate_limit = %d`, rc))
}
}
if exclude := desc.GetExcludeDataFromBackup(); exclude {
storageParams = append(storageParams, `exclude_data_from_backup = true`)
1 change: 1 addition & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/quotapool",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
37 changes: 37 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob.go
@@ -12,6 +12,7 @@ package ttljob

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -30,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -56,6 +58,13 @@ var (
1,
settings.PositiveInt,
).WithPublic()
defaultDeleteRateLimit = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.ttl.default_delete_rate_limit",
"default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.",
0,
settings.NonNegativeInt,
).WithPublic()
)

type rowLevelTTLResumer struct {
@@ -149,6 +158,12 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
rangeConcurrency := getRangeConcurrency(p.ExecCfg().SV(), ttlSettings)
selectBatchSize := getSelectBatchSize(p.ExecCfg().SV(), ttlSettings)
deleteBatchSize := getDeleteBatchSize(p.ExecCfg().SV(), ttlSettings)
deleteRateLimit := getDeleteRateLimit(p.ExecCfg().SV(), ttlSettings)
deleteRateLimiter := quotapool.NewRateLimiter(
"ttl-delete",
quotapool.Limit(deleteRateLimit),
deleteRateLimit,
)

ch := make(chan rangeToProcess, rangeConcurrency)
for i := 0; i < rangeConcurrency; i++ {
@@ -165,6 +180,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
pkColumns,
selectBatchSize,
deleteBatchSize,
deleteRateLimiter,
*aost,
); err != nil {
// Continue until channel is fully read.
@@ -226,6 +242,20 @@ func getRangeConcurrency(sv *settings.Values, ttl catpb.RowLevelTTL) int {
return int(defaultRangeConcurrency.Get(sv))
}

func getDeleteRateLimit(sv *settings.Values, ttl catpb.RowLevelTTL) int64 {
val := func() int64 {
if bs := ttl.DeleteRateLimit; bs != 0 {
return bs
}
return defaultDeleteRateLimit.Get(sv)
}()
// Put the maximum tokens possible if there is no rate limit.
if val == 0 {
return math.MaxInt64
}
return val
}

func runTTLOnRange(
ctx context.Context,
execCfg *sql.ExecutorConfig,
@@ -236,6 +266,7 @@ func runTTLOnRange(
endPK tree.Datums,
pkColumns []string,
selectBatchSize, deleteBatchSize int,
deleteRateLimiter *quotapool.RateLimiter,
aost tree.DTimestampTZ,
) error {
ie := execCfg.InternalExecutor
@@ -303,6 +334,12 @@ func runTTLOnRange(
)
}

tokens, err := deleteRateLimiter.Acquire(ctx, int64(len(deleteBatch)))
if err != nil {
return err
}
defer tokens.Consume()

// TODO(#75428): configure admission priority
return deleteBuilder.run(ctx, ie, txn, deleteBatch)
}); err != nil {
12 changes: 12 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_test.go
@@ -251,6 +251,18 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
numExpiredRows: 1001,
numNonExpiredRows: 5,
},
{
desc: "three column pk with rate limit",
createTable: `CREATE TABLE tbl (
id UUID DEFAULT gen_random_uuid(),
other_col INT,
"quote-kw-col" TIMESTAMPTZ,
text TEXT,
PRIMARY KEY (id, other_col, "quote-kw-col")
) WITH (ttl_expire_after = '30 days', ttl_delete_rate_limit = 350)`,
numExpiredRows: 1001,
numNonExpiredRows: 5,
},
{
desc: "three column pk, concurrentSchemaChange",
createTable: `CREATE TABLE tbl (
