diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index d82b3e6659c0..32811f0d7404 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -74,6 +74,7 @@ go_test( "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/security/securityassets", diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index 343ee6262303..157a8f51b92a 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -314,7 +314,9 @@ func (t *ttlProcessor) runTTLOnQueryBounds( until = numExpiredRows } deleteBatch := expiredRowsPKs[startRowIdx:until] + var batchRowCount int64 do := func(ctx context.Context, txn isql.Txn) error { + txn.KV().SetDebugName("ttljob-delete-batch") // If we detected a schema change here, the DELETE will not succeed // (the SELECT still will because of the AOST). Early exit here. desc, err := flowCtx.Descriptors.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.TableID) @@ -334,14 +336,11 @@ func (t *ttlProcessor) runTTLOnQueryBounds( defer tokens.Consume() start := timeutil.Now() - batchRowCount, err := deleteBuilder.Run(ctx, txn, deleteBatch) + batchRowCount, err = deleteBuilder.Run(ctx, txn, deleteBatch) if err != nil { return err } - metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) - metrics.RowDeletions.Inc(batchRowCount) - spanRowCount += batchRowCount return nil } if err := serverCfg.DB.Txn( @@ -349,6 +348,8 @@ func (t *ttlProcessor) runTTLOnQueryBounds( ); err != nil { return spanRowCount, errors.Wrapf(err, "error during row deletion") } + metrics.RowDeletions.Inc(batchRowCount) + spanRowCount += batchRowCount } // Step 3. Early exit if necessary. diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index 6b7a48e3ad08..637db0ee9c73 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql" @@ -76,7 +77,11 @@ func newRowLevelTTLTestJobTestHelper( ), } + requestFilter, _ := testutils.TestingRequestFilterRetryTxnWithPrefix(t, "ttljob-", 1) baseTestingKnobs := base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: requestFilter, + }, JobsTestingKnobs: &jobs.TestingKnobs{ JobSchedulerEnv: th.env, TakeOverJobsScheduling: func(fn func(ctx context.Context, maxSchedules int64) error) { @@ -151,7 +156,7 @@ func (h *rowLevelTTLTestJobTestHelper) waitForScheduledJob( require.NoError(t, h.executeSchedules()) query := fmt.Sprintf( - `SELECT status, error FROM [SHOW JOBS] + `SELECT status, error FROM [SHOW JOBS] WHERE job_id IN ( SELECT id FROM %s WHERE created_by_id IN (SELECT schedule_id FROM %s WHERE executor_type = 'scheduled-row-level-ttl-executor')