diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index 1dbb6e204cb5..28599825f390 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -984,11 +984,24 @@ message AutoSQLStatsCompactionProgress {
 }
 
 message RowLevelTTLDetails {
+
+  // TableID is the ID of the table that the TTL job removes records from.
   uint32 table_id = 1 [
     (gogoproto.customname) = "TableID",
     (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
   ];
+
+  // Cutoff is compared against execinfrapb.TTLSpec.TTLExpr by the
+  // ttlProcessor to determine what records to delete. Records are deleted
+  // if TTLExpr <= Cutoff.
   google.protobuf.Timestamp cutoff = 2 [(gogoproto.nullable)=false, (gogoproto.stdtime) = true];
+
+  // TableVersion is the table descriptor version of the table when the TTL
+  // job started.
+  uint64 table_version = 3 [
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"
+  ];
+
 }
 
 message RowLevelTTLProgress {
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index ec76312fd5da..c7dd5246c6b1 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -842,6 +842,18 @@ func NewMetadataCallbackWriter(
 	return &MetadataCallbackWriter{rowResultWriter: rowResultWriter, fn: metaFn}
 }
 
+// NewMetadataOnlyMetadataCallbackWriter creates a new MetadataCallbackWriter
+// that uses errOnlyResultWriter and only supports receiving
+// execinfrapb.ProducerMetadata.
+func NewMetadataOnlyMetadataCallbackWriter() *MetadataCallbackWriter {
+	return NewMetadataCallbackWriter(
+		&errOnlyResultWriter{},
+		func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
+			return nil
+		},
+	)
+}
+
 // errOnlyResultWriter is a rowResultWriter and batchResultWriter that only
 // supports receiving an error. All other functions that deal with producing
 // results panic.
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 46b961dca2ce..77def3b99754 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -54,7 +54,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
@@ -1528,13 +1527,18 @@ type TTLTestingKnobs struct {
 	// AOSTDuration changes the AOST timestamp duration to add to the
 	// current time.
	AOSTDuration *time.Duration
-	// ReturnStatsError causes stats errors to be returned instead of logged as warnings.
+	// RequireMultipleSpanPartitions is a flag to verify that DistSQL
+	// distributes the work across multiple nodes.
+	RequireMultipleSpanPartitions bool
+	// ReturnStatsError causes stats errors to be returned instead of logged as
+	// warnings.
	ReturnStatsError bool
-	// MockTableDescriptorVersionDuringDelete is a version to mock the table descriptor
-	// as during delete.
-	MockTableDescriptorVersionDuringDelete *descpb.DescriptorVersion
-	// PreSelectDeleteStatement runs before the start of the TTL select-delete loop
-	PreSelectDeleteStatement string
+	// PreDeleteChangeTableVersion is a flag to change the table descriptor
+	// during a delete.
+	PreDeleteChangeTableVersion bool
+	// PreSelectStatement runs before the start of the TTL select-delete
+	// loop.
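+	// It is forwarded to each ttlProcessor via execinfrapb.TTLSpec.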
+ PreSelectStatement string } // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index df62b4d8dac6..b4a2e6d31325 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -105,6 +105,7 @@ proto_library( "processors_changefeeds.proto", "processors_sql.proto", "processors_table_stats.proto", + "processors_ttl.proto", ], strip_import_prefix = "/pkg", visibility = ["//visibility:public"], diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 35e720e93955..84077e735bed 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -580,6 +580,16 @@ func (s *ChangeFrontierSpec) summary() (string, []string) { return "ChangeFrontier", []string{} } +// summary implements the diagramCellType interface. +func (s *TTLSpec) summary() (string, []string) { + details := s.RowLevelTTLDetails + return "TTL", []string{ + fmt.Sprintf("JobID: %d", s.JobID), + fmt.Sprintf("TableID: %d", details.TableID), + fmt.Sprintf("TableVersion: %d", details.TableVersion), + } +} + type diagramCell struct { Title string `json:"title"` Details []string `json:"details"` diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index 91c5bce4850b..0be96fcde427 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -22,6 +22,7 @@ option go_package = "execinfrapb"; import "sql/execinfrapb/data.proto"; import "sql/execinfrapb/processors_base.proto"; import "sql/execinfrapb/processors_sql.proto"; +import "sql/execinfrapb/processors_ttl.proto"; import "sql/execinfrapb/processors_bulk_io.proto"; import "sql/execinfrapb/processors_changefeeds.proto"; import "sql/execinfrapb/processors_table_stats.proto"; @@ -120,6 +121,7 @@ message ProcessorCoreUnion { optional StreamIngestionFrontierSpec streamIngestionFrontier = 36; optional ExportSpec exporter = 37; optional IndexBackfillMergerSpec indexBackfillMerger = 38; + optional TTLSpec ttl = 39; reserved 6, 12, 14, 17, 18, 19, 20; } diff --git a/pkg/sql/execinfrapb/processors_ttl.proto b/pkg/sql/execinfrapb/processors_ttl.proto new file mode 100644 index 000000000000..5a2c1d2e871d --- /dev/null +++ b/pkg/sql/execinfrapb/processors_ttl.proto @@ -0,0 +1,85 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// +// Processor definitions for distributed SQL APIs. See +// docs/RFCS/distributed_sql.md. +// All the concepts here are "physical plan" concepts. + +syntax = "proto2"; +// Beware! This package name must not be changed, even though it doesn't match +// the Go package name, because it defines the Protobuf message names which +// can't be changed without breaking backward compatibility. +package cockroach.sql.distsqlrun; +option go_package = "execinfrapb"; + +import "gogoproto/gogo.proto"; +import "google/protobuf/timestamp.proto"; +import "roachpb/data.proto"; +import "jobs/jobspb/jobs.proto"; + +message TTLSpec { + + // JobID of the job that ran the ttlProcessor. 
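+  // The ttlProcessor loads the job by this ID to report row-count progress.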
+  optional int64 job_id = 1 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "JobID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID"
+  ];
+
+  // RowLevelTTLDetails are the details of the job that ran the ttlProcessor.
+  optional jobs.jobspb.RowLevelTTLDetails row_level_ttl_details = 2 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "RowLevelTTLDetails"
+  ];
+
+  // AOST is the AS OF SYSTEM TIME value the ttlProcessor uses to select records.
+  optional google.protobuf.Timestamp aost = 3 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "AOST",
+    (gogoproto.stdtime) = true
+  ];
+
+  // TTLExpr is compared against jobspb.RowLevelTTLDetails.Cutoff by the
+  // ttlProcessor to determine what records to delete. Records are deleted
+  // if TTLExpr <= Cutoff.
+  optional string ttl_expr = 4 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "TTLExpr",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.Expression"
+  ];
+
+  // Spans determine which records are processed by which nodes in the DistSQL
+  // flow.
+  repeated roachpb.Span spans = 5 [(gogoproto.nullable) = false];
+
+  // RangeConcurrency controls how many ranges a single ttlProcessor processes
+  // in parallel.
+  optional int64 range_concurrency = 6 [(gogoproto.nullable) = false];
+
+  // SelectBatchSize controls the batch size for SELECTs.
+  optional int64 select_batch_size = 7 [(gogoproto.nullable) = false];
+
+  // DeleteBatchSize controls the batch size for DELETEs.
+  optional int64 delete_batch_size = 8 [(gogoproto.nullable) = false];
+
+  // DeleteRateLimit controls how many records can be deleted per second.
+  optional int64 delete_rate_limit = 9 [(gogoproto.nullable) = false];
+
+  // LabelMetrics controls if metrics are labeled with the name of the table being TTLed.
+  optional bool label_metrics = 10 [(gogoproto.nullable) = false];
+
+  // PreDeleteChangeTableVersion is a test flag to change the table
+  // descriptor before a delete.
+  optional bool pre_delete_change_table_version = 11 [(gogoproto.nullable) = false];
+
+  // PreSelectStatement is a test setting to run a SQL statement
+  // before selecting records.
+  optional string pre_select_statement = 12 [(gogoproto.nullable) = false];
+}
diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go
index 5d744a7fb31c..2106e1cb5592 100644
--- a/pkg/sql/rowexec/processors.go
+++ b/pkg/sql/rowexec/processors.go
@@ -359,6 +359,12 @@ func NewProcessor(
 		}
 		return backfill.NewIndexBackfillMerger(ctx, flowCtx, *core.IndexBackfillMerger, outputs[0])
 	}
+	if core.Ttl != nil {
+		if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
+			return nil, err
+		}
+		return NewTTLProcessor(flowCtx, processorID, *core.Ttl, outputs[0])
+	}
 	return nil, errors.Errorf("unsupported processor core %q", core)
 }
 
@@ -391,3 +397,6 @@ var NewChangeFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.Chang
 
 // NewStreamIngestionFrontierProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
 var NewStreamIngestionFrontierProcessor func(*execinfra.FlowCtx, int32, execinfrapb.StreamIngestionFrontierSpec, execinfra.RowSource, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
+
+// NewTTLProcessor is implemented in the ttljob package and injected here via
+// runtime initialization, which avoids a dependency cycle with rowexec.
+var NewTTLProcessor func(*execinfra.FlowCtx, int32, execinfrapb.TTLSpec, execinfra.RowReceiver) (execinfra.Processor, error) diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index b3bf06310b62..7b43ed3e990c 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", @@ -27,12 +28,17 @@ go_library( "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", + "//pkg/sql/execinfra", + "//pkg/sql/execinfrapb", "//pkg/sql/lexbase", + "//pkg/sql/physicalplan", "//pkg/sql/rowenc", + "//pkg/sql/rowexec", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/sqltelemetry", + "//pkg/sql/sqlutil", "//pkg/sql/types", "//pkg/util/ctxgroup", "//pkg/util/log", @@ -68,7 +74,6 @@ go_test( "//pkg/server", "//pkg/sql", "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/lexbase", "//pkg/sql/parser", diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index f2e1ee80e3f6..1520d78d742f 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -15,19 +15,23 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -78,10 +82,6 @@ type rowLevelTTLResumer struct { var _ jobs.Resumer = (*rowLevelTTLResumer)(nil) -type rangeToProcess struct { - startPK, endPK tree.Datums -} - // Resume implements the jobs.Resumer interface. func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) error { jobExecCtx := execCtx.(sql.JobExecContext) @@ -109,9 +109,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err } aost := details.Cutoff.Add(aostDuration) - var tableVersion descpb.DescriptorVersion var rowLevelTTL catpb.RowLevelTTL var relationName string + var entirePKSpan roachpb.Span if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { desc, err := descsCol.GetImmutableTableByID( ctx, @@ -122,7 +122,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err if err != nil { return err } - tableVersion = desc.GetVersion() // If the AOST timestamp is before the latest descriptor timestamp, exit // early as the delete will not work. 
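+		// (aost was computed above as details.Cutoff plus aostDuration, which
+		// tests can adjust through the AOSTDuration knob.)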
 		if desc.GetModificationTime().GoTime().After(aost) {
@@ -146,28 +145,22 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 		if err != nil {
 			return errors.Wrapf(err, "error fetching table relation name for TTL")
 		}
 		relationName = tn.FQString()
+
+		entirePKSpan = desc.PrimaryIndexSpan(execCfg.Codec)
 		return nil
 	}); err != nil {
 		return err
 	}
 
-	group := ctxgroup.WithContext(ctx)
-
-	rangeConcurrency := getRangeConcurrency(settingsValues, rowLevelTTL)
-	selectBatchSize := getSelectBatchSize(settingsValues, rowLevelTTL)
-	deleteBatchSize := getDeleteBatchSize(settingsValues, rowLevelTTL)
-	deleteRateLimit := getDeleteRateLimit(settingsValues, rowLevelTTL)
-
 	ttlExpr := colinfo.DefaultTTLExpirationExpr
 	if rowLevelTTL.HasExpirationExpr() {
 		ttlExpr = "(" + rowLevelTTL.ExpirationExpr + ")"
 	}
 
 	labelMetrics := rowLevelTTL.LabelMetrics
-
-	rowCount, err := func() (int64, error) {
+	group := ctxgroup.WithContext(ctx)
+	err := func() error {
 		statsCloseCh := make(chan struct{})
 		defer close(statsCloseCh)
 		if rowLevelTTL.RowStatsPollInterval != 0 {
@@ -208,38 +201,99 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 			})
 		}
 
-		return t.work(
+		distSQLPlanner := jobExecCtx.DistSQLPlanner()
+		evalCtx := jobExecCtx.ExtendedEvalContext()
+
+		// We don't return the compatible nodes here since PartitionSpans will
+		// filter out incompatible nodes.
+		planCtx, _, err := distSQLPlanner.SetupAllNodesPlanning(ctx, evalCtx, execCfg)
+		if err != nil {
+			return err
+		}
+		spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, []roachpb.Span{entirePKSpan})
+		if err != nil {
+			return err
+		}
+		if knobs.RequireMultipleSpanPartitions && len(spanPartitions) < 2 {
+			return errors.New("multiple span partitions required")
+		}
+
+		sqlInstanceIDToTTLSpec := make(map[base.SQLInstanceID]*execinfrapb.TTLSpec)
+		for _, spanPartition := range spanPartitions {
+			ttlSpec := &execinfrapb.TTLSpec{
+				JobID:                       t.job.ID(),
+				RowLevelTTLDetails:          details,
+				AOST:                        aost,
+				TTLExpr:                     ttlExpr,
+				Spans:                       spanPartition.Spans,
+				RangeConcurrency:            getRangeConcurrency(settingsValues, rowLevelTTL),
+				SelectBatchSize:             getSelectBatchSize(settingsValues, rowLevelTTL),
+				DeleteBatchSize:             getDeleteBatchSize(settingsValues, rowLevelTTL),
+				DeleteRateLimit:             getDeleteRateLimit(settingsValues, rowLevelTTL),
+				LabelMetrics:                rowLevelTTL.LabelMetrics,
+				PreDeleteChangeTableVersion: knobs.PreDeleteChangeTableVersion,
+				PreSelectStatement:          knobs.PreSelectStatement,
+			}
+			sqlInstanceIDToTTLSpec[spanPartition.SQLInstanceID] = ttlSpec
+		}
+
+		// Set up a one-stage plan with one processor per input spec.
+		processorCorePlacements := make([]physicalplan.ProcessorCorePlacement, len(sqlInstanceIDToTTLSpec))
+		i := 0
+		for sqlInstanceID, ttlSpec := range sqlInstanceIDToTTLSpec {
+			processorCorePlacements[i].SQLInstanceID = sqlInstanceID
+			processorCorePlacements[i].Core.Ttl = ttlSpec
+			i++
+		}
+
+		physicalPlan := planCtx.NewPhysicalPlan()
+		// Job progress is updated inside ttlProcessor, so we
+		// have an empty result stream.
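+		// The stage therefore has no output columns and no required ordering.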
+ physicalPlan.AddNoInputStage( + processorCorePlacements, + execinfrapb.PostProcessSpec{}, + []*types.T{}, + execinfrapb.Ordering{}, + ) + physicalPlan.PlanToStreamColMap = []int{} + + distSQLPlanner.FinalizePlan(planCtx, physicalPlan) + + metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter() + + distSQLReceiver := sql.MakeDistSQLReceiver( + ctx, + metadataCallbackWriter, + tree.Rows, + execCfg.RangeDescriptorCache, + nil, /* txn */ + nil, /* clockUpdater */ + evalCtx.Tracing, + execCfg.ContentionRegistry, + nil, /* testingPushCallback */ + ) + defer distSQLReceiver.Release() + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := *evalCtx + cleanup := distSQLPlanner.Run( ctx, - db, - rangeConcurrency, - execCfg, - details, - descsCol, - knobs, - tableVersion, - selectBatchSize, - deleteBatchSize, - deleteRateLimit, - aost, - ttlExpr, + planCtx, + nil, /* txn */ + physicalPlan, + distSQLReceiver, + &evalCtxCopy, + nil, /* finishedSetupFn */ ) + defer cleanup() + + return metadataCallbackWriter.Err() }() if err != nil { return err } - if err := group.Wait(); err != nil { - return err - } - - return db.Txn(ctx, func(_ context.Context, txn *kv.Txn) error { - return t.job.Update(ctx, txn, func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - progress := md.Progress - progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += rowCount - ju.UpdateProgress(progress) - return nil - }) - }) + return group.Wait() } func checkEnabled(settingsValues *settings.Values) error { diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index 2ddeb1b808e8..57b64ca99e61 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -15,20 +15,25 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/quotapool" @@ -36,22 +41,32 @@ import ( "github.com/cockroachdb/errors" ) -func (t rowLevelTTLResumer) work( - ctx context.Context, - db *kv.DB, - rangeConcurrency int64, - execCfg *sql.ExecutorConfig, - details jobspb.RowLevelTTLDetails, - descsCol *descs.Collection, - knobs sql.TTLTestingKnobs, - tableVersion descpb.DescriptorVersion, - selectBatchSize int64, - deleteBatchSize int64, - deleteRateLimit int64, - aost time.Time, - ttlExpr catpb.Expression, -) (int64, error) { +type ttlProcessor struct { + execinfra.ProcessorBase + 
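+	// ttlSpec is the per-node spec assembled by the TTL job resumer.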
ttlSpec execinfrapb.TTLSpec +} + +func (ttl *ttlProcessor) Start(ctx context.Context) { + ctx = ttl.StartInternal(ctx, "ttl") + err := ttl.work(ctx) + ttl.MoveToDraining(err) +} +func (ttl *ttlProcessor) work(ctx context.Context) error { + + ttlSpec := ttl.ttlSpec + flowCtx := ttl.FlowCtx + descsCol := flowCtx.Descriptors + serverCfg := flowCtx.Cfg + db := serverCfg.DB + codec := serverCfg.Codec + details := ttlSpec.RowLevelTTLDetails + ttlExpr := ttlSpec.TTLExpr + rangeConcurrency := ttlSpec.RangeConcurrency + selectBatchSize := ttlSpec.SelectBatchSize + deleteBatchSize := ttlSpec.DeleteBatchSize + + deleteRateLimit := ttlSpec.DeleteRateLimit deleteRateLimiter := quotapool.NewRateLimiter( "ttl-delete", quotapool.Limit(deleteRateLimit), @@ -63,7 +78,6 @@ func (t rowLevelTTLResumer) work( var relationName string var pkColumns []string var pkTypes []*types.T - var rangeSpan, entirePKSpan roachpb.Span var labelMetrics bool if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { desc, err := descsCol.GetImmutableTableByID( @@ -99,43 +113,42 @@ func (t rowLevelTTLResumer) work( } relationName = tn.FQString() - entirePKSpan = desc.PrimaryIndexSpan(execCfg.Codec) - rangeSpan = entirePKSpan return nil }); err != nil { - return rowCount, err + return err } - metrics := execCfg.JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics( + metrics := serverCfg.JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics( labelMetrics, relationName, ) group := ctxgroup.WithContext(ctx) - if err := func() error { + err := func() error { rangeChan := make(chan rangeToProcess, rangeConcurrency) defer close(rangeChan) for i := int64(0); i < rangeConcurrency; i++ { group.GoCtx(func(ctx context.Context) error { - for r := range rangeChan { + for rangeToProcess := range rangeChan { start := timeutil.Now() rangeRowCount, err := runTTLOnRange( ctx, - execCfg, details, + db, + serverCfg.Executor, + serverCfg.Settings, descsCol, - knobs, metrics, - tableVersion, - r.startPK, - r.endPK, + rangeToProcess, pkColumns, relationName, selectBatchSize, deleteBatchSize, deleteRateLimiter, - aost, + ttlSpec.AOST, ttlExpr, + ttlSpec.PreDeleteChangeTableVersion, + ttlSpec.PreSelectStatement, ) // add before returning err in case of partial success atomic.AddInt64(&rowCount, rangeRowCount) @@ -143,7 +156,7 @@ func (t rowLevelTTLResumer) work( if err != nil { // Continue until channel is fully read. // Otherwise, the keys input will be blocked. - for r = range rangeChan { + for rangeToProcess = range rangeChan { } return err } @@ -154,106 +167,124 @@ func (t rowLevelTTLResumer) work( // Iterate over every range to feed work for the goroutine processors. var alloc tree.DatumAlloc - ri := kvcoord.MakeRangeIterator(execCfg.DistSender) - ri.Seek(ctx, roachpb.RKey(entirePKSpan.Key), kvcoord.Ascending) - for done := false; ri.Valid() && !done; ri.Next(ctx) { - // Send range info to each goroutine worker. - rangeDesc := ri.Desc() - var nextRange rangeToProcess - // A single range can contain multiple tables or indexes. - // If this is the case, the rangeDesc.StartKey would be less than entirePKSpan.Key - // or the rangeDesc.EndKey would be greater than the entirePKSpan.EndKey, meaning - // the range contains the start or the end of the range respectively. - // Trying to decode keys outside the PK range will lead to a decoding error. 
- // As such, only populate nextRange.startPK and nextRange.endPK if this is the case - // (by default, a 0 element startPK or endPK means the beginning or end). - if rangeDesc.StartKey.AsRawKey().Compare(entirePKSpan.Key) > 0 { - var err error - nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, execCfg.Codec, pkTypes, &alloc) - if err != nil { - return errors.Wrapf( - err, - "error decoding starting PRIMARY KEY for range ID %d (start key %x, table start key %x)", - rangeDesc.RangeID, - rangeDesc.StartKey.AsRawKey(), - entirePKSpan.Key, - ) + ri := kvcoord.MakeRangeIterator(serverCfg.DistSender) + for _, span := range ttlSpec.Spans { + rangeSpan := span + ri.Seek(ctx, roachpb.RKey(span.Key), kvcoord.Ascending) + for done := false; ri.Valid() && !done; ri.Next(ctx) { + // Send range info to each goroutine worker. + rangeDesc := ri.Desc() + var nextRange rangeToProcess + // A single range can contain multiple tables or indexes. + // If this is the case, the rangeDesc.StartKey would be less than span.Key + // or the rangeDesc.EndKey would be greater than the span.EndKey, meaning + // the range contains the start or the end of the range respectively. + // Trying to decode keys outside the PK range will lead to a decoding error. + // As such, only populate nextRange.startPK and nextRange.endPK if this is the case + // (by default, a 0 element startPK or endPK means the beginning or end). + if rangeDesc.StartKey.AsRawKey().Compare(span.Key) > 0 { + var err error + nextRange.startPK, err = keyToDatums(rangeDesc.StartKey, codec, pkTypes, &alloc) + if err != nil { + return errors.Wrapf( + err, + "error decoding starting PRIMARY KEY for range ID %d (start key %x, table start key %x)", + rangeDesc.RangeID, + rangeDesc.StartKey.AsRawKey(), + span.Key, + ) + } } - } - if rangeDesc.EndKey.AsRawKey().Compare(entirePKSpan.EndKey) < 0 { - rangeSpan.Key = rangeDesc.EndKey.AsRawKey() - var err error - nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, execCfg.Codec, pkTypes, &alloc) - if err != nil { - return errors.Wrapf( - err, - "error decoding ending PRIMARY KEY for range ID %d (end key %x, table end key %x)", - rangeDesc.RangeID, - rangeDesc.EndKey.AsRawKey(), - entirePKSpan.EndKey, - ) + if rangeDesc.EndKey.AsRawKey().Compare(span.EndKey) < 0 { + rangeSpan.Key = rangeDesc.EndKey.AsRawKey() + var err error + nextRange.endPK, err = keyToDatums(rangeDesc.EndKey, codec, pkTypes, &alloc) + if err != nil { + return errors.Wrapf( + err, + "error decoding ending PRIMARY KEY for range ID %d (end key %x, table end key %x)", + rangeDesc.RangeID, + rangeDesc.EndKey.AsRawKey(), + span.EndKey, + ) + } + } else { + done = true } - } else { - done = true + rangeChan <- nextRange } - rangeChan <- nextRange } return nil - }(); err != nil { - return rowCount, err + }() + if err != nil { + return err } - return rowCount, group.Wait() + if err := group.Wait(); err != nil { + return err + } + + job, err := serverCfg.JobRegistry.LoadJob(ctx, ttlSpec.JobID) + if err != nil { + return err + } + return db.Txn(ctx, func(_ context.Context, txn *kv.Txn) error { + return job.Update(ctx, txn, func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + progress := md.Progress + progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += rowCount + ju.UpdateProgress(progress) + return nil + }) + }) } // rangeRowCount should be checked even if the function returns an error because it may have partially succeeded func runTTLOnRange( ctx context.Context, - execCfg *sql.ExecutorConfig, details 
jobspb.RowLevelTTLDetails, + db *kv.DB, + ie sqlutil.InternalExecutor, + settings *cluster.Settings, descsCol *descs.Collection, - knobs sql.TTLTestingKnobs, metrics rowLevelTTLMetrics, - tableVersion descpb.DescriptorVersion, - startPK tree.Datums, - endPK tree.Datums, + rangeToProcess rangeToProcess, pkColumns []string, relationName string, selectBatchSize, deleteBatchSize int64, deleteRateLimiter *quotapool.RateLimiter, aost time.Time, ttlExpr catpb.Expression, + preDeleteChangeTableVersion bool, + preSelectStatement string, ) (rangeRowCount int64, err error) { metrics.NumActiveRanges.Inc(1) defer metrics.NumActiveRanges.Dec(1) - ie := execCfg.InternalExecutor - db := execCfg.DB - // TODO(#76914): look at using a dist sql flow job, utilize any existing index // on crdb_internal_expiration. + tableID := details.TableID + cutoff := details.Cutoff selectBuilder := makeSelectQueryBuilder( - details.TableID, - details.Cutoff, + tableID, + cutoff, pkColumns, relationName, - startPK, - endPK, + rangeToProcess, aost, selectBatchSize, ttlExpr, ) deleteBuilder := makeDeleteQueryBuilder( - details.TableID, - details.Cutoff, + tableID, + cutoff, pkColumns, relationName, deleteBatchSize, ttlExpr, ) - if preSelectDeleteStatement := knobs.PreSelectDeleteStatement; preSelectDeleteStatement != "" { + if preSelectStatement != "" { if _, err := ie.ExecEx( ctx, "pre-select-delete-statement", @@ -261,7 +292,7 @@ func runTTLOnRange( sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - preSelectDeleteStatement, + preSelectStatement, ); err != nil { return rangeRowCount, err } @@ -269,7 +300,7 @@ func runTTLOnRange( for { // Check the job is enabled on every iteration. - if err := checkEnabled(execCfg.SV()); err != nil { + if err := checkEnabled(&settings.SV); err != nil { return rangeRowCount, err } @@ -304,11 +335,7 @@ func runTTLOnRange( if err != nil { return err } - version := desc.GetVersion() - if mockVersion := knobs.MockTableDescriptorVersionDuringDelete; mockVersion != nil { - version = *mockVersion - } - if version != tableVersion { + if preDeleteChangeTableVersion || desc.GetVersion() != details.TableVersion { return errors.Newf( "table has had a schema change since the job has started at %s, aborting", desc.GetModificationTime().GoTime().Format(time.RFC3339), @@ -396,3 +423,35 @@ func keyToDatums( } return datums, nil } + +func (ttl *ttlProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + return nil, ttl.DrainHelper() +} + +func newTTLProcessor( + flowCtx *execinfra.FlowCtx, + processorID int32, + spec execinfrapb.TTLSpec, + output execinfra.RowReceiver, +) (execinfra.Processor, error) { + ttlProcessor := &ttlProcessor{ + ttlSpec: spec, + } + if err := ttlProcessor.Init( + ttlProcessor, + &execinfrapb.PostProcessSpec{}, + []*types.T{}, + flowCtx, + processorID, + output, + nil, /* memMonitor */ + execinfra.ProcStateOpts{}, + ); err != nil { + return nil, err + } + return ttlProcessor, nil +} + +func init() { + rowexec.NewTTLProcessor = newTTLProcessor +} diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index d86e8781d33b..bdd2b10f2e9c 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -18,13 +18,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" 
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -34,7 +34,7 @@ type selectQueryBuilder struct { tableID descpb.ID pkColumns []string selectOpName string - startPK, endPK tree.Datums + rangeToProcess rangeToProcess selectBatchSize int64 aost time.Time ttlExpr catpb.Expression @@ -53,12 +53,16 @@ type selectQueryBuilder struct { endPKColumnNamesSQL string } +type rangeToProcess struct { + startPK, endPK tree.Datums +} + func makeSelectQueryBuilder( tableID descpb.ID, cutoff time.Time, pkColumns []string, relationName string, - startPK, endPK tree.Datums, + rangeToProcess rangeToProcess, aost time.Time, selectBatchSize int64, ttlExpr catpb.Expression, @@ -67,9 +71,11 @@ func makeSelectQueryBuilder( // is reserved for AOST, and len(pkColumns) for both start and end key. cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*2) cachedArgs = append(cachedArgs, cutoff) + endPK := rangeToProcess.endPK for _, d := range endPK { cachedArgs = append(cachedArgs, d) } + startPK := rangeToProcess.startPK for _, d := range startPK { cachedArgs = append(cachedArgs, d) } @@ -78,8 +84,7 @@ func makeSelectQueryBuilder( tableID: tableID, pkColumns: pkColumns, selectOpName: fmt.Sprintf("ttl select %s", relationName), - startPK: startPK, - endPK: endPK, + rangeToProcess: rangeToProcess, aost: aost, selectBatchSize: selectBatchSize, ttlExpr: ttlExpr, @@ -96,9 +101,10 @@ func (b *selectQueryBuilder) buildQuery() string { // Start from $2 as $1 is for the now clause. // The end key of a range is exclusive, so use <. var endFilterClause string - if len(b.endPK) > 0 { + endPK := b.rangeToProcess.endPK + if len(endPK) > 0 { endFilterClause = fmt.Sprintf(" AND (%s) < (", b.endPKColumnNamesSQL) - for i := range b.endPK { + for i := range endPK { if i > 0 { endFilterClause += ", " } @@ -107,6 +113,7 @@ func (b *selectQueryBuilder) buildQuery() string { endFilterClause += ")" } + startPK := b.rangeToProcess.startPK var filterClause string if !b.isFirst { // After the first query, we always want (col1, ...) > (cursor_col_1, ...) @@ -117,19 +124,19 @@ func (b *selectQueryBuilder) buildQuery() string { } // We start from 2 if we don't have an endPK clause, but add len(b.endPK) // if there is. - filterClause += fmt.Sprintf("$%d", 2+len(b.endPK)+i) + filterClause += fmt.Sprintf("$%d", 2+len(endPK)+i) } filterClause += ")" - } else if len(b.startPK) > 0 { + } else if len(startPK) > 0 { // For the the first query, we want (col1, ...) >= (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(b.startPK)])) - for i := range b.startPK { + filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(startPK)])) + for i := range startPK { if i > 0 { filterClause += ", " } // We start from 2 if we don't have an endPK clause, but add len(b.endPK) // if there is. 
- filterClause += fmt.Sprintf("$%d", 2+len(b.endPK)+i) + filterClause += fmt.Sprintf("$%d", 2+len(endPK)+i) } filterClause += ")" } @@ -165,7 +172,7 @@ func (b *selectQueryBuilder) nextQuery() (string, []interface{}) { } func (b *selectQueryBuilder) run( - ctx context.Context, ie *sql.InternalExecutor, + ctx context.Context, ie sqlutil.InternalExecutor, ) ([]tree.Datums, error) { q, args := b.nextQuery() @@ -197,7 +204,7 @@ func (b *selectQueryBuilder) moveCursor(rows []tree.Datums) error { // Move the cursor forward. if len(rows) > 0 { lastRow := rows[len(rows)-1] - b.cachedArgs = b.cachedArgs[:1+len(b.endPK)] + b.cachedArgs = b.cachedArgs[:1+len(b.rangeToProcess.endPK)] if len(lastRow) != len(b.pkColumns) { return errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.pkColumns), len(lastRow)) } @@ -292,7 +299,7 @@ func (b *deleteQueryBuilder) buildQueryAndArgs(rows []tree.Datums) (string, []in } func (b *deleteQueryBuilder) run( - ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, rows []tree.Datums, + ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, rows []tree.Datums, ) (int, error) { q, deleteArgs := b.buildQueryAndArgs(rows) qosLevel := sessiondatapb.TTLLow diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go index a2f34f8b7c2a..fc518f116814 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go @@ -44,8 +44,10 @@ func TestSelectQueryBuilder(t *testing.T) { mockTime, []string{"col1", "col2"}, "relation_name", - tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, - tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + rangeToProcess{ + startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, + endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + }, mockTime, 2, colinfo.TTLDefaultExpirationColumnName, @@ -105,8 +107,7 @@ LIMIT 2`, mockTime, []string{"col1", "col2"}, "table_name", - nil, - nil, + rangeToProcess{}, mockTime, 2, colinfo.TTLDefaultExpirationColumnName, @@ -162,8 +163,10 @@ LIMIT 2`, mockTime, []string{"col1", "col2"}, "table_name", - tree.Datums{tree.NewDInt(100)}, - tree.Datums{tree.NewDInt(181)}, + rangeToProcess{ + startPK: tree.Datums{tree.NewDInt(100)}, + endPK: tree.Datums{tree.NewDInt(181)}, + }, mockTime, 2, colinfo.TTLDefaultExpirationColumnName, @@ -223,8 +226,9 @@ LIMIT 2`, mockTime, []string{"col1", "col2"}, "table_name", - nil, - tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + rangeToProcess{ + endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + }, mockTime, 2, colinfo.TTLDefaultExpirationColumnName, @@ -283,8 +287,9 @@ LIMIT 2`, mockTime, []string{"col1", "col2"}, "table_name", - tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, - nil, + rangeToProcess{ + startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, + }, mockTime, 2, colinfo.TTLDefaultExpirationColumnName, diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index 8caf5d1292b4..8b29ad0b3928 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -28,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -61,7 +60,7 @@ 
type rowLevelTTLTestJobTestHelper struct { } func newRowLevelTTLTestJobTestHelper( - t *testing.T, testingKnobs *sql.TTLTestingKnobs, testMultiTenant bool, + t *testing.T, testingKnobs *sql.TTLTestingKnobs, testMultiTenant bool, numNodes int, ) (*rowLevelTTLTestJobTestHelper, func()) { th := &rowLevelTTLTestJobTestHelper{ env: jobstest.NewJobSchedulerTestEnv( @@ -89,36 +88,39 @@ func newRowLevelTTLTestJobTestHelper( // As `ALTER TABLE ... SPLIT AT ...` is not supported in multi-tenancy, we // do not run those tests. + tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: baseTestingKnobs, + DisableWebSessionAuthentication: true, + }, + }) + ts := tc.Server(0) if testMultiTenant { - tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - DisableWebSessionAuthentication: true, - }, - }) - ts := tc.Server(0) tenantServer, db := serverutils.StartTenant( t, ts, base.TestTenantArgs{ TenantID: serverutils.TestTenantID(), TestingKnobs: baseTestingKnobs, }, ) - require.NotNil(t, th.cfg) th.sqlDB = sqlutils.MakeSQLRunner(db) - th.kvDB = ts.DB() th.server = tenantServer - - return th, func() { - tc.Stopper().Stop(context.Background()) - } + } else { + db := serverutils.OpenDBConn( + t, + ts.ServingSQLAddr(), + "", /* useDatabase */ + false, /* insecure */ + ts.Stopper(), + ) + th.sqlDB = sqlutils.MakeSQLRunner(db) + th.server = ts } - - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{Knobs: baseTestingKnobs}) require.NotNil(t, th.cfg) - th.kvDB = kvDB - th.sqlDB = sqlutils.MakeSQLRunner(db) - th.server = s + + th.kvDB = ts.DB() + return th, func() { - s.Stopper().Stop(context.Background()) + tc.Stopper().Stop(context.Background()) } } @@ -168,7 +170,12 @@ func TestRowLevelTTLNoTestingKnobs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - th, cleanupFunc := newRowLevelTTLTestJobTestHelper(t, nil /* SQLTestingKnobs */, true /* testMultiTenant */) + th, cleanupFunc := newRowLevelTTLTestJobTestHelper( + t, + nil, /* SQLTestingKnobs */ + true, /* testMultiTenant */ + 1, /* numNodes */ + ) defer cleanupFunc() th.sqlDB.Exec(t, `CREATE TABLE t (id INT PRIMARY KEY) WITH (ttl_expire_after = '1 minute')`) @@ -193,13 +200,12 @@ func TestRowLevelTTLInterruptDuringExecution(t *testing.T) { ALTER TABLE t SPLIT AT VALUES (1), (2); INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2, now() - '1 month');` - mockVersion := descpb.DescriptorVersion(0) testCases := []struct { - desc string - expectedTTLError string - aostDuration time.Duration - mockTableDescriptorVersionDuringDelete *descpb.DescriptorVersion - preSelectDeleteStatement string + desc string + expectedTTLError string + aostDuration time.Duration + preDeleteChangeTableVersion bool + preSelectStatement string }{ { desc: "schema change too recent to start TTL job", @@ -213,22 +219,27 @@ INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2, // We cannot use a schema change to change the version in this test as // we overtook the job adoption method, which means schema changes get // blocked and may not run. 
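+			// PreDeleteChangeTableVersion instead makes the ttlProcessor behave
+			// as if the descriptor version changed underneath it.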
- mockTableDescriptorVersionDuringDelete: &mockVersion, + preDeleteChangeTableVersion: true, }, { - desc: "disable cluster setting", - expectedTTLError: `ttl jobs are currently disabled by CLUSTER SETTING sql.ttl.job.enabled`, - preSelectDeleteStatement: `SET CLUSTER SETTING sql.ttl.job.enabled = false`, + desc: "disable cluster setting", + expectedTTLError: `ttl jobs are currently disabled by CLUSTER SETTING sql.ttl.job.enabled`, + preSelectStatement: `SET CLUSTER SETTING sql.ttl.job.enabled = false`, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - th, cleanupFunc := newRowLevelTTLTestJobTestHelper(t, &sql.TTLTestingKnobs{ - AOSTDuration: &tc.aostDuration, - MockTableDescriptorVersionDuringDelete: tc.mockTableDescriptorVersionDuringDelete, - PreSelectDeleteStatement: tc.preSelectDeleteStatement, - }, false /* testMultiTenant */) + th, cleanupFunc := newRowLevelTTLTestJobTestHelper( + t, + &sql.TTLTestingKnobs{ + AOSTDuration: &tc.aostDuration, + PreDeleteChangeTableVersion: tc.preDeleteChangeTableVersion, + PreSelectStatement: tc.preSelectStatement, + }, + false, /* testMultiTenant */ + 1, /* numNodes */ + ) defer cleanupFunc() th.sqlDB.Exec(t, createTable) @@ -276,9 +287,13 @@ INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2, for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { var zeroDuration time.Duration - th, cleanupFunc := newRowLevelTTLTestJobTestHelper(t, &sql.TTLTestingKnobs{ - AOSTDuration: &zeroDuration, - }, true /* testMultiTenant */) + th, cleanupFunc := newRowLevelTTLTestJobTestHelper( + t, + &sql.TTLTestingKnobs{ + AOSTDuration: &zeroDuration, + }, + true, /* testMultiTenant */ + 1 /* numNodes */) defer cleanupFunc() th.sqlDB.ExecMultiple(t, strings.Split(tc.setup, ";")...) 
@@ -320,6 +335,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
 		numNonExpiredRows    int
 		numSplits            int
 		forceNonMultiTenant  bool
+		numNodes             int
 		expirationExpression string
 		addRow               func(th *rowLevelTTLTestJobTestHelper, createTableStmt *tree.CreateTable, ts time.Time)
 	}
@@ -334,6 +350,17 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
 			numExpiredRows:    1001,
 			numNonExpiredRows: 5,
 		},
+		{
+			desc: "one column pk multiple nodes",
+			createTable: `CREATE TABLE tbl (
+	id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+	text TEXT
+) WITH (ttl_expire_after = '30 days')`,
+			numExpiredRows:    1001,
+			numNonExpiredRows: 5,
+			numNodes:          5,
+			numSplits:         10,
+		},
 		{
 			desc: "one column pk, table ranges overlap",
 			createTable: `CREATE TABLE tbl (
@@ -519,13 +546,19 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
 			t.Logf("test case: %#v", tc)
 			var zeroDuration time.Duration
+			numNodes := tc.numNodes
+			if numNodes == 0 {
+				numNodes = 1
+			}
 			th, cleanupFunc := newRowLevelTTLTestJobTestHelper(
 				t,
 				&sql.TTLTestingKnobs{
-					AOSTDuration:     &zeroDuration,
-					ReturnStatsError: true,
+					AOSTDuration:                  &zeroDuration,
+					ReturnStatsError:              true,
+					RequireMultipleSpanPartitions: tc.numNodes > 0, // only when the test case requests multiple nodes
 				},
 				tc.numSplits == 0 && !tc.forceNonMultiTenant, // SPLIT AT does not work with multi-tenant
+				numNodes, /* numNodes */
 			)
 			defer cleanupFunc()
diff --git a/pkg/sql/ttl/ttlschedule/ttlschedule.go b/pkg/sql/ttl/ttlschedule/ttlschedule.go
index 69fb9e86075a..4d9589de875c 100644
--- a/pkg/sql/ttl/ttlschedule/ttlschedule.go
+++ b/pkg/sql/ttl/ttlschedule/ttlschedule.go
@@ -215,9 +215,13 @@ func createRowLevelTTLJob(
 	txn *kv.Txn,
 	descsCol *descs.Collection,
 	jobRegistry *jobs.Registry,
-	ttlDetails catpb.ScheduledRowLevelTTLArgs,
+	ttlArgs catpb.ScheduledRowLevelTTLArgs,
 ) (jobspb.JobID, error) {
-	tn, err := descs.GetTableNameByID(ctx, txn, descsCol, ttlDetails.TableID)
+	tableDesc, err := descsCol.GetImmutableTableByID(ctx, txn, ttlArgs.TableID, tree.ObjectLookupFlagsWithRequired())
+	if err != nil {
+		return 0, err
+	}
+	tn, err := descs.GetTableNameByDesc(ctx, txn, descsCol, tableDesc)
 	if err != nil {
 		return 0, err
 	}
@@ -225,8 +229,9 @@ func createRowLevelTTLJob(
 		Description: fmt.Sprintf("ttl for %s", tn.FQString()),
 		Username:    username.NodeUserName(),
 		Details: jobspb.RowLevelTTLDetails{
-			TableID: ttlDetails.TableID,
-			Cutoff:  timeutil.Now(),
+			TableID:      ttlArgs.TableID,
+			Cutoff:       timeutil.Now(),
+			TableVersion: tableDesc.GetVersion(),
 		},
 		Progress:  jobspb.RowLevelTTLProgress{},
 		CreatedBy: createdByInfo,
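To make the planning step in `rowLevelTTLResumer.Resume` easier to follow, here is a minimal, self-contained sketch of the fan-out shape this change introduces: one `TTLSpec` per SQL instance, carrying only the spans that `PartitionSpans` assigned to that instance. The types below (`Span`, `SpanPartition`, `TTLSpec`) are simplified stand-ins for illustration only, not the real `roachpb`/`execinfrapb`/`physicalplan` definitions.

```go
package main

import "fmt"

// Simplified stand-ins for the DistSQL types used by the resumer; the real
// definitions live in roachpb, sql/physicalplan, and sql/execinfrapb.
type Span struct{ Key, EndKey string }

type SpanPartition struct {
	SQLInstanceID int
	Spans         []Span
}

type TTLSpec struct {
	JobID int64
	Spans []Span
}

// buildSpecs mirrors the loop in rowLevelTTLResumer.Resume: each SQL instance
// gets exactly one spec, scoped to the spans it is responsible for.
func buildSpecs(jobID int64, partitions []SpanPartition) map[int]*TTLSpec {
	specs := make(map[int]*TTLSpec, len(partitions))
	for _, p := range partitions {
		specs[p.SQLInstanceID] = &TTLSpec{JobID: jobID, Spans: p.Spans}
	}
	return specs
}

func main() {
	// Pretend PartitionSpans split the table's primary-index span two ways.
	partitions := []SpanPartition{
		{SQLInstanceID: 1, Spans: []Span{{Key: "/Table/104/1", EndKey: "/Table/104/1/500"}}},
		{SQLInstanceID: 2, Spans: []Span{{Key: "/Table/104/1/500", EndKey: "/Table/104/2"}}},
	}
	for id, spec := range buildSpecs(42, partitions) {
		fmt.Printf("instance %d: %d span(s) for job %d\n", id, len(spec.Spans), spec.JobID)
	}
}
```

Because each processor receives only its own spans, row counts can no longer be aggregated in the resumer; that is why the progress bookkeeping moved into `ttlProcessor.work`, which loads the job by `JobID` and adds its per-node row count to `RowCount` directly.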