From d16ada00c4b86b1a48934547c71c921cd0baed5a Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Thu, 17 Feb 2022 22:35:02 +1100 Subject: [PATCH] ttljob: paginate ranges when doing TTL This commit batches ranges instead of scanning them all in at once. The range batch size is controlled by the cluster setting `sql.ttl.range_batch_size`. I don't anticipate this needing to be controlled per table. Release note: None --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/sql/ttl/ttljob/BUILD.bazel | 1 - pkg/sql/ttl/ttljob/ttljob.go | 74 ++++++++++++++----- pkg/sql/ttl/ttljob/ttljob_test.go | 4 + 5 files changed, 62 insertions(+), 19 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 063c69ddee42..8e5eb5986f9e 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -171,6 +171,7 @@ sql.trace.txn.enable_threshold duration 0s duration beyond which all transaction sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job +sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. 
Data older than this is subject to deletion. diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 8d535d4ea4a8..75180456f1d9 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -184,6 +184,7 @@ sql.ttl.default_delete_batch_sizeinteger100default amount of rows to delete in a single query during a TTL job sql.ttl.default_range_concurrencyinteger1default amount of ranges to process at once during a TTL delete sql.ttl.default_select_batch_sizeinteger500default amount of rows to select in a single query during a TTL job +sql.ttl.range_batch_sizeinteger100amount of ranges to fetch at a time for a table timeseries.storage.enabledbooleantrueif set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttlduration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttlduration2160h0m0sthe maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. 
diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index abc7ca2df127..211a521e1586 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 859a79b845e5..0f1cca06b54a 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -56,6 +55,13 @@ var ( 1, settings.PositiveInt, ).WithPublic() + rangeBatchSize = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.ttl.range_batch_size", + "amount of ranges to fetch at a time for a table", + 100, + settings.PositiveInt, + ).WithPublic() ) type rowLevelTTLResumer struct { @@ -93,7 +99,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err var pkColumns []string var pkTypes []*types.T var pkDirs []descpb.IndexDescriptor_Direction - var ranges []kv.KeyValue + var rangeSpan roachpb.Span if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID( ctx, @@ -127,11 +133,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err if ttl == nil { return errors.Newf("unable to find TTL on table %s", desc.GetName()) } - - ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec)) - if err != nil { - return err - } + rangeSpan = desc.TableSpan(p.ExecCfg().Codec) ttlSettings = *ttl return nil }); err != nil { 
@@ -183,20 +185,56 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err close(ch) retErr = errors.CombineErrors(retErr, g.Wait()) }() - for _, r := range ranges { - if err := r.ValueProto(&rangeDesc); err != nil { - return err - } - var nextRange rangeToProcess - nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) - if err != nil { + done := false + + batchSize := rangeBatchSize.Get(p.ExecCfg().SV()) + for !done { + var ranges []kv.KeyValue + + // Scan ranges up to rangeBatchSize. + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + metaStart := keys.RangeMetaKey(keys.MustAddr(rangeSpan.Key).Next()) + metaEnd := keys.RangeMetaKey(keys.MustAddr(rangeSpan.EndKey)) + + kvs, err := txn.Scan(ctx, metaStart, metaEnd, batchSize) + if err != nil { + return err + } + if len(kvs) < int(batchSize) { + done = true + if len(kvs) == 0 || !kvs[len(kvs)-1].Key.Equal(metaEnd.AsRawKey()) { + // Normally we need to scan one more KV because the ranges are addressed by + // the end key. + extraKV, err := txn.Scan(ctx, metaEnd, keys.Meta2Prefix.PrefixEnd(), 1 /* one result */) + if err != nil { + return err + } + kvs = append(kvs, extraKV[0]) + } + } + ranges = kvs + return nil + }); err != nil { return err } - nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) - if err != nil { - return err + + // Send these to each goroutine worker. 
+ for _, r := range ranges { + if err := r.ValueProto(&rangeDesc); err != nil { + return err + } + rangeSpan.Key = rangeDesc.EndKey.AsRawKey() + var nextRange rangeToProcess + nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) + if err != nil { + return err + } + nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, p.ExecCfg().Codec, pkTypes, pkDirs, &alloc) + if err != nil { + return err + } + ch <- nextRange } - ch <- nextRange } return nil }(); err != nil { diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index 0de832595250..572a1db3bfe3 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -317,7 +317,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { }) defer cleanupFunc() + rangeBatchSize := 1 + rng.Intn(3) + t.Logf("range batch size: %d", rangeBatchSize) + th.sqlDB.Exec(t, tc.createTable) + th.sqlDB.Exec(t, `SET CLUSTER SETTING sql.ttl.range_batch_size = $1`, rangeBatchSize) // Extract the columns from CREATE TABLE. stmt, err := parser.ParseOne(tc.createTable)