Skip to content

Commit

Permalink
Merge #76487
Browse files Browse the repository at this point in the history
76487: ttljob: wire up range_concurrency and cleanup query construction r=rafiss a=otan

See individual commits for details.

Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Feb 17, 2022
2 parents a83b43c + 7d45a85 commit ce9a343
Show file tree
Hide file tree
Showing 14 changed files with 1,381 additions and 427 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ sql.trace.session_eventlog.enabled boolean false set to true to enable session t
sql.trace.stmt.enable_threshold duration 0s duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.
sql.trace.txn.enable_threshold duration 0s duration beyond which all transactions are traced (set to 0 to disable). This setting is coarser grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries).
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
<tr><td><code>sql.trace.stmt.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.</td></tr>
<tr><td><code>sql.trace.txn.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all transactions are traced (set to 0 to disable). This setting is coarser grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries).</td></tr>
<tr><td><code>sql.ttl.default_delete_batch_size</code></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
<tr><td><code>timeseries.storage.resolution_10s.ttl</code></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,6 @@ message RowLevelTTL {
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
// RangeConcurrency is the number of ranges to process at a time.
optional int64 range_concurrency = 6 [(gogoproto.nullable)=false];
}
17 changes: 17 additions & 0 deletions pkg/sql/catalog/tabledesc/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
if ttl.RangeConcurrency != 0 {
if err := ValidateTTLRangeConcurrency("ttl_range_concurrency", ttl.RangeConcurrency); err != nil {
return err
}
}
return nil
}

Expand All @@ -58,6 +63,18 @@ func ValidateTTLBatchSize(key string, val int64) error {
return nil
}

// ValidateTTLRangeConcurrency validates the range concurrency of a TTL.
// It returns nil when val is at least 1; otherwise it returns an
// InvalidParameterValue error whose message names the offending key.
func ValidateTTLRangeConcurrency(key string, val int64) error {
	// Anything from 1 upwards is acceptable.
	if val >= 1 {
		return nil
	}
	return pgerror.Newf(
		pgcode.InvalidParameterValue,
		`"%s" must be at least 1`,
		key,
	)
}

// ValidateTTLCronExpr validates the cron expression of TTL.
func ValidateTTLCronExpr(key string, str string) error {
if _, err := cron.ParseStandard(str); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
Expand Down Expand Up @@ -1477,6 +1478,9 @@ type TTLTestingKnobs struct {
// AOSTDuration changes the AOST timestamp duration to add to the
// current time.
AOSTDuration *time.Duration
// MockDescriptorVersionDuringDelete is the descriptor version that the
// descriptor is mocked as having during delete.
MockDescriptorVersionDuringDelete *descpb.DescriptorVersion
}

// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
Expand Down
31 changes: 17 additions & 14 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,18 @@ CREATE TABLE tbl (
id INT PRIMARY KEY,
text TEXT,
FAMILY (id, text)
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50)
) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50, ttl_range_concurrency = 2)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
----
CREATE TABLE public.tbl (
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50)
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_range_concurrency = 2)

statement ok
ALTER TABLE tbl SET (ttl_delete_batch_size = 100)
Expand All @@ -341,21 +341,24 @@ query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
----
CREATE TABLE public.tbl (
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_delete_batch_size = 100)
id INT8 NOT NULL,
text STRING NULL,
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_select_batch_size = 50, ttl_delete_batch_size = 100, ttl_range_concurrency = 2)

statement error "ttl_select_batch_size" must be at least 1
ALTER TABLE tbl SET (ttl_select_batch_size = -1)

statement error "ttl_delete_batch_size" must be at least 1
ALTER TABLE tbl SET (ttl_delete_batch_size = -1)

statement error "ttl_range_concurrency" must be at least 1
ALTER TABLE tbl SET (ttl_range_concurrency = -1)

statement ok
ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size)
ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size, ttl_range_concurrency)

query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/paramparse/paramobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,28 @@ var tableParams = map[string]tableParam{
return nil
},
},
`ttl_range_concurrency`: {
	// onSet converts the supplied datum to an integer, validates it as a
	// range concurrency, and stores it on the table's row-level TTL settings.
	onSet: func(ctx context.Context, po *TableStorageParamObserver, semaCtx *tree.SemaContext, evalCtx *tree.EvalContext, key string, datum tree.Datum) error {
		// Create the TTL settings on first use so there is somewhere to
		// record the value.
		if po.tableDesc.RowLevelTTL == nil {
			po.tableDesc.RowLevelTTL = &catpb.RowLevelTTL{}
		}
		rangeConcurrency, err := DatumAsInt(evalCtx, key, datum)
		if err != nil {
			return err
		}
		err = tabledesc.ValidateTTLRangeConcurrency(key, rangeConcurrency)
		if err != nil {
			return err
		}
		po.tableDesc.RowLevelTTL.RangeConcurrency = rangeConcurrency
		return nil
	},
	// onReset zeroes the stored range concurrency. It is a no-op when the
	// table has no row-level TTL settings.
	onReset: func(po *TableStorageParamObserver, evalCtx *tree.EvalContext, key string) error {
		ttl := po.tableDesc.RowLevelTTL
		if ttl == nil {
			return nil
		}
		ttl.RangeConcurrency = 0
		return nil
	},
},
`ttl_job_cron`: {
onSet: func(ctx context.Context, po *TableStorageParamObserver, semaCtx *tree.SemaContext, evalCtx *tree.EvalContext, key string, datum tree.Datum) error {
if po.tableDesc.RowLevelTTL == nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/show_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func ShowCreateTable(
if cron := ttl.DeletionCron; cron != "" {
storageParams = append(storageParams, fmt.Sprintf(`ttl_job_cron = '%s'`, cron))
}
if rc := ttl.RangeConcurrency; rc != 0 {
storageParams = append(storageParams, fmt.Sprintf(`ttl_range_concurrency = %d`, rc))
}
}
if exclude := desc.GetExcludeDataFromBackup(); exclude {
storageParams = append(storageParams, `exclude_data_from_backup = true`)
Expand Down
18 changes: 16 additions & 2 deletions pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,30 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ttljob",
srcs = ["ttljob.go"],
srcs = [
"ttljob.go",
"ttljob_query_builder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/lexbase",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
Expand All @@ -24,21 +35,24 @@ go_test(
name = "ttljob_test",
srcs = [
"main_test.go",
"ttljob_integration_test.go",
"ttljob_query_builder_test.go",
"ttljob_test.go",
],
embed = [":ttljob"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/scheduledjobs",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
"//pkg/sql/randgen",
Expand Down
Loading

0 comments on commit ce9a343

Please sign in to comment.