ttljob: paginate ranges when doing TTL #76726

Merged (1 commit) on Feb 22, 2022
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -173,6 +173,7 @@ sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete i
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -186,6 +186,7 @@
<tr><td><code>sql.ttl.default_delete_rate_limit</code></td><td>integer</td><td><code>0</code></td><td>default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.</td></tr>
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.range_batch_size</code></td><td>integer</td><td><code>100</code></td><td>amount of ranges to fetch at a time for a table during the TTL job</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
<tr><td><code>timeseries.storage.resolution_10s.ttl</code></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td></tr>
<tr><td><code>timeseries.storage.resolution_30m.ttl</code></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td></tr>
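The new sql.ttl.range_batch_size setting is registered as tenant-writable and public, so it can be tuned at runtime with SET CLUSTER SETTING. Below is a minimal sketch of adjusting it from Go; the connection string and the choice of the lib/pq driver are assumptions for illustration, not part of this change.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed driver; any Postgres-wire-compatible driver works
)

func main() {
	// Assumed local, insecure cluster; adjust the connection string for your deployment.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Fetch fewer meta2 records per batch during TTL jobs (the default is 100).
	if _, err := db.Exec("SET CLUSTER SETTING sql.ttl.range_batch_size = 50"); err != nil {
		log.Fatal(err)
	}
}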
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
@@ -13,7 +13,6 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
77 changes: 58 additions & 19 deletions pkg/sql/ttl/ttljob/ttljob.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -70,6 +69,14 @@
0,
settings.NonNegativeInt,
).WithPublic()

rangeBatchSize = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.ttl.range_batch_size",
"amount of ranges to fetch at a time for a table during the TTL job",
100,
settings.PositiveInt,
).WithPublic()
)

type rowLevelTTLResumer struct {
Expand Down Expand Up @@ -228,8 +235,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var pkColumns []string
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var ranges []kv.KeyValue
var name string
var rangeSpan roachpb.Span
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := descs.GetImmutableTableByID(
ctx,
@@ -263,12 +270,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if ttl == nil {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
}
ttlSettings = *ttl

ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
if err != nil {
return err
}

_, dbDesc, err := descs.GetImmutableDatabaseByID(
ctx,
@@ -299,6 +300,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
tree.Name(desc.GetName()),
)
name = tn.FQString()
rangeSpan = desc.TableSpan(p.ExecCfg().Codec)
ttlSettings = *ttl
return nil
}); err != nil {
return err
@@ -362,20 +365,56 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
close(ch)
retErr = errors.CombineErrors(retErr, g.Wait())
}()
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
done := false

batchSize := rangeBatchSize.Get(p.ExecCfg().SV())
for !done {
var ranges []kv.KeyValue

// Scan ranges up to rangeBatchSize.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
metaStart := keys.RangeMetaKey(keys.MustAddr(rangeSpan.Key).Next())
metaEnd := keys.RangeMetaKey(keys.MustAddr(rangeSpan.EndKey))

kvs, err := txn.Scan(ctx, metaStart, metaEnd, batchSize)
if err != nil {
return err
}
if len(kvs) < int(batchSize) {
done = true
if len(kvs) == 0 || !kvs[len(kvs)-1].Key.Equal(metaEnd.AsRawKey()) {
// Normally we need to scan one more KV because the ranges are addressed by
// the end key.
extraKV, err := txn.Scan(ctx, metaEnd, keys.Meta2Prefix.PrefixEnd(), 1 /* one result */)
if err != nil {
return err
}
kvs = append(kvs, extraKV[0])
}
}
ranges = kvs
return nil
}); err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err

// Send these to each goroutine worker.
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
ch <- nextRange
}
ch <- nextRange
}
return nil
}(); err != nil {
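Taken together, the change above swaps the single kvclient.ScanMetaKVs call over the whole table span for a loop that scans the range metadata in batches of sql.ttl.range_batch_size, advances the remaining span to the end key of the last descriptor it saw, and fetches one extra meta record when a batch comes back short (range descriptors are addressed by their end keys). The following self-contained sketch shows the same pagination pattern against a toy in-memory index; rangeMeta, fakeMeta, and paginateRanges are illustrative stand-ins, not CockroachDB APIs.

package main

import (
	"fmt"
	"sort"
)

// rangeMeta is a toy stand-in for a meta2 record: a range addressed by its end key.
type rangeMeta struct {
	startKey, endKey string
}

// fakeMeta is a sorted, in-memory stand-in for the meta2 index.
type fakeMeta []rangeMeta

// scan returns up to limit ranges whose end key falls in (start, end], mimicking
// how range descriptors are looked up by their end keys.
func (m fakeMeta) scan(start, end string, limit int) []rangeMeta {
	i := sort.Search(len(m), func(i int) bool { return m[i].endKey > start })
	var out []rangeMeta
	for ; i < len(m) && len(out) < limit && m[i].endKey <= end; i++ {
		out = append(out, m[i])
	}
	return out
}

// paginateRanges walks every range overlapping [start, end) in batches of batchSize,
// mirroring the loop added in ttljob.go: scan a batch, hand each range to the worker
// channel, and restart the next scan from the end key of the last range processed.
func paginateRanges(m fakeMeta, start, end string, batchSize int, work chan<- rangeMeta) {
	done := false
	for !done {
		batch := m.scan(start, end, batchSize)
		if len(batch) < batchSize {
			// A short batch means the table span is exhausted. The real code also
			// fetches one extra meta KV here because ranges are addressed by end
			// key; the toy scan above already includes that final range.
			done = true
		}
		for _, r := range batch {
			start = r.endKey // resume after the last processed range
			work <- r
		}
	}
}

func main() {
	meta := fakeMeta{
		{"a", "c"}, {"c", "f"}, {"f", "k"}, {"k", "p"}, {"p", "z"},
	}
	work := make(chan rangeMeta, len(meta))
	paginateRanges(meta, "a", "z", 2 /* batchSize */, work)
	close(work)
	for r := range work {
		fmt.Printf("process range [%s, %s)\n", r.startKey, r.endKey)
	}
}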
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_test.go
@@ -329,7 +329,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
})
defer cleanupFunc()

rangeBatchSize := 1 + rng.Intn(3)
Collaborator: if the default is 100, should the test be closer to that?

Contributor Author: we used a smaller number here because we have fewer ranges (a maximum of ~8).

t.Logf("range batch size: %d", rangeBatchSize)
Collaborator: random thought - we should have one place to randomize + log all the TTL settings

Contributor Author: there is more or less this above -- it's just that this is the only one randomised via a cluster setting


th.sqlDB.Exec(t, tc.createTable)
th.sqlDB.Exec(t, `SET CLUSTER SETTING sql.ttl.range_batch_size = $1`, rangeBatchSize)

// Extract the columns from CREATE TABLE.
stmt, err := parser.ParseOne(tc.createTable)
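Following up on the review thread above about having one place to randomize and log all the TTL settings: a rough sketch of what such a helper might look like. randomTTLClusterSettings and the chosen value ranges are hypothetical, and the use of sqlutils.SQLRunner assumes the same runner the test already holds in th.sqlDB; this illustrates the reviewer's suggestion rather than code from this PR.

package ttljob_test

import (
	"fmt"
	"math/rand"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
)

// randomTTLClusterSettings is a hypothetical helper that picks randomized values
// for the TTL-related cluster settings in one place and logs each choice, as
// suggested in the review thread. The value ranges are illustrative only.
func randomTTLClusterSettings(t *testing.T, rng *rand.Rand, sqlDB *sqlutils.SQLRunner) {
	values := map[string]int{
		"sql.ttl.range_batch_size": 1 + rng.Intn(3),
		// The other sql.ttl.* knobs randomized inline in the test could move here too.
	}
	for name, v := range values {
		t.Logf("setting %s = %d", name, v)
		sqlDB.Exec(t, fmt.Sprintf("SET CLUSTER SETTING %s = %d", name, v))
	}
}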