Skip to content

Commit

Permalink
ttljob: paginate ranges when doing TTL
Browse files Browse the repository at this point in the history
This commit batches ranges instead of scanning them in all at once. The
range batch size is controlled by the cluster setting
`sql.ttl.range_batch_size`. I don't anticipate this needing to be
controlled per table.

Release note: None
  • Loading branch information
otan committed Feb 17, 2022
1 parent ce9a343 commit d16ada0
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 19 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ sql.trace.txn.enable_threshold duration 0s duration beyond which all transaction
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job
sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table
timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere
timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.
timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
<tr><td><code>sql.ttl.default_delete_batch_size</code></td><td>integer</td><td><code>100</code></td><td>default amount of rows to delete in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.default_range_concurrency</code></td><td>integer</td><td><code>1</code></td><td>default amount of ranges to process at once during a TTL delete</td></tr>
<tr><td><code>sql.ttl.default_select_batch_size</code></td><td>integer</td><td><code>500</code></td><td>default amount of rows to select in a single query during a TTL job</td></tr>
<tr><td><code>sql.ttl.range_batch_size</code></td><td>integer</td><td><code>100</code></td><td>amount of ranges to fetch at a time for a table</td></tr>
<tr><td><code>timeseries.storage.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td></tr>
<tr><td><code>timeseries.storage.resolution_10s.ttl</code></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td></tr>
<tr><td><code>timeseries.storage.resolution_30m.ttl</code></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td></tr>
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
74 changes: 56 additions & 18 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -56,6 +55,13 @@ var (
1,
settings.PositiveInt,
).WithPublic()
rangeBatchSize = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.ttl.range_batch_size",
"amount of ranges to fetch at a time for a table",
100,
settings.PositiveInt,
).WithPublic()
)

type rowLevelTTLResumer struct {
Expand Down Expand Up @@ -93,7 +99,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var pkColumns []string
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var ranges []kv.KeyValue
var rangeSpan roachpb.Span
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(
ctx,
Expand Down Expand Up @@ -127,11 +133,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if ttl == nil {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
}

ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
if err != nil {
return err
}
rangeSpan = desc.TableSpan(p.ExecCfg().Codec)
ttlSettings = *ttl
return nil
}); err != nil {
Expand Down Expand Up @@ -183,20 +185,56 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
close(ch)
retErr = errors.CombineErrors(retErr, g.Wait())
}()
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
done := false

batchSize := rangeBatchSize.Get(p.ExecCfg().SV())
for !done {
var ranges []kv.KeyValue

// Scan ranges up to rangeBatchSize.
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
metaStart := keys.RangeMetaKey(keys.MustAddr(rangeSpan.Key).Next())
metaEnd := keys.RangeMetaKey(keys.MustAddr(rangeSpan.EndKey))

kvs, err := txn.Scan(ctx, metaStart, metaEnd, batchSize)
if err != nil {
return err
}
if len(kvs) < int(batchSize) {
done = true
if len(kvs) == 0 || !kvs[len(kvs)-1].Key.Equal(metaEnd.AsRawKey()) {
// Normally we need to scan one more KV because the ranges are addressed by
// the end key.
extraKV, err := txn.Scan(ctx, metaEnd, keys.Meta2Prefix.PrefixEnd(), 1 /* one result */)
if err != nil {
return err
}
kvs = append(kvs, extraKV[0])
}
}
ranges = kvs
return nil
}); err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err

// Send these to each goroutine worker.
for _, r := range ranges {
if err := r.ValueProto(&rangeDesc); err != nil {
return err
}
rangeSpan.Key = rangeDesc.EndKey.AsRawKey()
var nextRange rangeToProcess
nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc)
if err != nil {
return err
}
ch <- nextRange
}
ch <- nextRange
}
return nil
}(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
})
defer cleanupFunc()

rangeBatchSize := 1 + rng.Intn(3)
t.Logf("range batch size: %d", rangeBatchSize)

th.sqlDB.Exec(t, tc.createTable)
th.sqlDB.Exec(t, `SET CLUSTER SETTING sql.ttl.range_batch_size = $1`, rangeBatchSize)

// Extract the columns from CREATE TABLE.
stmt, err := parser.ParseOne(tc.createTable)
Expand Down

0 comments on commit d16ada0

Please sign in to comment.