diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 9746f69c3012..d33e60a4032d 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -530,6 +530,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 		MaxRetries: 5,
 	}
 
+	if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before_flow"); err != nil {
+		return err
+	}
+
 	// We want to retry a backup if there are transient failures (i.e. worker nodes
 	// dying), so if we receive a retryable error, re-plan and retry the backup.
 	var res roachpb.RowCount
diff --git a/pkg/ccl/backupccl/backup_tenant_test.go b/pkg/ccl/backupccl/backup_tenant_test.go
index 13ab267e5757..aa34b0b8ffc2 100644
--- a/pkg/ccl/backupccl/backup_tenant_test.go
+++ b/pkg/ccl/backupccl/backup_tenant_test.go
@@ -6,18 +6,21 @@
 //
 // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
 
-package backupccl_test
+package backupccl
 
 import (
 	"context"
+	"fmt"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
 	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -41,6 +44,7 @@ func TestBackupTenantImportingTable(t *testing.T) {
 		TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
 	})
 	defer tSQL.Close()
+	runner := sqlutils.MakeSQLRunner(tSQL)
 
 	if _, err := tSQL.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';"); err != nil {
 		t.Fatal(err)
@@ -54,23 +58,11 @@ func TestBackupTenantImportingTable(t *testing.T) {
 	if _, err := tSQL.Exec("IMPORT INTO x CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')"); !testutils.IsError(err, "pause") {
 		t.Fatal(err)
 	}
-	var jobID int
-	if err := tSQL.QueryRow(`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`).Scan(&jobID); err != nil {
-		t.Fatal(err)
-	}
-	tc.Servers[0].JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
-	// wait for it to pause
-
-	testutils.SucceedsSoon(t, func() error {
-		var status string
-		if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
-			t.Fatal(err)
-		}
-		if status == string(jobs.StatusPaused) {
-			return nil
-		}
-		return errors.Newf("%s", status)
-	})
+	var jobID jobspb.JobID
+	err := tSQL.QueryRow(fmt.Sprintf(
+		`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`)).Scan(&jobID)
+	require.NoError(t, err)
+	jobutils.WaitForJobToPause(t, runner, jobID)
 
 	// tenant now has a fully ingested, paused import, so back them up.
 	const dst = "userfile:///t"
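The `backup.before_flow` pausepoint added to backup_job.go above uses the job registry's generic pausepoint mechanism: `CheckPausepoint(name)` returns a pause-requested error whenever the `jobs.debug.pausepoints` cluster setting contains `name`, and returning that error from `Resume` parks the job in the paused state. A minimal sketch of the same hook in some other resumer; the `exampleResumer` type and the `myjob.before_flow` name are illustrative, not part of this change:

	func (r *exampleResumer) Resume(ctx context.Context, execCtx interface{}) error {
		p := execCtx.(sql.JobExecContext)
		// Pauses the job iff `SET CLUSTER SETTING jobs.debug.pausepoints =
		// 'myjob.before_flow'` is active, mirroring backup.before_flow above.
		if err := p.ExecCfg().JobRegistry.CheckPausepoint("myjob.before_flow"); err != nil {
			return err
		}
		// ... plan and run the job's flow ...
		return nil
	}

Tests can then pause the job deterministically: set the pausepoint, run the statement expecting a "pause" error, and wait with jobutils.WaitForJobToPause, exactly as TestBackupTenantImportingTable does above.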
const dst = "userfile:///t" diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 767a78d3e1fa..ba83320aede0 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1790,7 +1791,7 @@ func createAndWaitForJob( t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`, timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes, ).Scan(&jobID) - jobutils.WaitForJob(t, db, jobID) + jobutils.WaitForJobToSucceed(t, db, jobID) } // TestBackupRestoreResume tests whether backup and restore jobs are properly @@ -2015,7 +2016,7 @@ func TestBackupRestoreControlJob(t *testing.T) { t.Fatalf("%d: expected 'job paused' error, but got %+v", i, err) } sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID)) - jobutils.WaitForJob(t, sqlDB, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) } sqlDB.CheckQueryResults(t, @@ -2051,7 +2052,7 @@ func TestBackupRestoreControlJob(t *testing.T) { sqlDB.CheckQueryResults(t, fmt.Sprintf("SHOW BACKUP '%s'", noOfflineDir), [][]string{}) } sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID)) - jobutils.WaitForJob(t, sqlDB, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) } sqlDB.CheckQueryResults(t, `SELECT count(*) FROM pause.bank`, @@ -2071,7 +2072,7 @@ func TestBackupRestoreControlJob(t *testing.T) { if err != nil { t.Fatalf("error while running backup %+v", err) } - jobutils.WaitForJob(t, sqlDB, backupJobID) + jobutils.WaitForJobToSucceed(t, sqlDB, backupJobID) sqlDB.Exec(t, `DROP DATABASE data`) @@ -9051,7 +9052,7 @@ func TestBackupWorkerFailure(t *testing.T) { } // But the job should be restarted and succeed eventually. - jobutils.WaitForJob(t, sqlDB, jobID) + jobutils.WaitForJobToSucceed(t, sqlDB, jobID) // Drop database and restore to ensure that the backup was successful. sqlDB.Exec(t, `DROP DATABASE data`) @@ -9400,7 +9401,7 @@ DROP INDEX foo@bar; close(allowGC) // Wait for the GC to complete. 
-	jobutils.WaitForJob(t, sqlRunner, gcJobID)
+	jobutils.WaitForJobToSucceed(t, sqlRunner, gcJobID)
 	waitForTableSplit(t, conn, "foo", "test")
 
 	// This backup should succeed since the spans being backed up have a default
@@ -9642,7 +9643,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 	}
 	args.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
 	args.ServerArgs.ExternalIODir = localExternalDir
-	tc := testcluster.StartTestCluster(t, 3, args)
+	tc := testcluster.StartTestCluster(t, 1, args)
 	defer tc.Stopper().Stop(ctx)
 
 	tc.WaitForNodeLiveness(t)
@@ -9684,21 +9685,6 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 			require.NoError(t, err)
 		}
 	}
-	const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
-	processedRegexp := regexp.MustCompile(processedPattern)
-
-	gcSoon := func() {
-		testutils.SucceedsSoon(t, func() error {
-			upsertUntilBackpressure()
-			s, repl := getStoreAndReplica(t, tc, conn, "foo", "defaultdb")
-			trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
-			require.NoError(t, err)
-			if !processedRegexp.MatchString(trace.String()) {
-				return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
-			}
-			return nil
-		})
-	}
 
 	waitForTableSplit(t, conn, "foo", "defaultdb")
 	waitForReplicaFieldToBeSet(t, tc, conn, "foo", "defaultdb", func(r *kvserver.Replica) (bool, error) {
@@ -9710,7 +9696,15 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 	var tsBefore string
 	require.NoError(t, conn.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsBefore))
 
-	gcSoon()
+	upsertUntilBackpressure()
+	runGCAndCheckTrace(ctx, t, tc, conn, false /* skipShouldQueue */, "foo", "defaultdb", func(traceStr string) error {
+		const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
+		processedRegexp := regexp.MustCompile(processedPattern)
+		if !processedRegexp.MatchString(traceStr) {
+			return errors.Errorf("%q does not match %q", traceStr, processedRegexp)
+		}
+		return nil
+	})
 
 	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
 	testutils.IsError(err, "must be after replica GC threshold")
@@ -9728,6 +9722,116 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 	require.NoError(t, err)
 }
 
+// TestExcludeDataFromBackupDoesNotHoldupGC tests that a table marked as
+// `exclude_data_from_backup` and with a protected timestamp record covering it
+// does not hold up GC, since its data is not going to be backed up.
+func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	dir, dirCleanupFn := testutils.TempDir(t)
+	defer dirCleanupFn()
+	params := base.TestClusterArgs{}
+	params.ServerArgs.ExternalIODir = dir
+	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		DisableGCQueue:            true,
+		DisableLastProcessedCheck: true,
+	}
+	params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
+		EnableProtectedTimestampForMultiTenant: true}
+	params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
+	tc := testcluster.StartTestCluster(t, 1, params)
+	defer tc.Stopper().Stop(ctx)
+
+	tc.WaitForNodeLiveness(t)
+	require.NoError(t, tc.WaitForFullReplication())
+
+	conn := tc.ServerConn(0)
+	runner := sqlutils.MakeSQLRunner(conn)
+	runner.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
+	// Speed up the test.
+	runner.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
+	runner.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'`)
+
+	runner.Exec(t, `CREATE DATABASE test;`)
+	runner.Exec(t, `CREATE TABLE test.foo (k INT PRIMARY KEY, v BYTES)`)
+
+	// Exclude the table from backup so that it does not hold up GC.
+	runner.Exec(t, `ALTER TABLE test.foo SET (exclude_data_from_backup = true)`)
+
+	const tableRangeMaxBytes = 1 << 18
+	runner.Exec(t, "ALTER TABLE test.foo CONFIGURE ZONE USING "+
+		"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
+
+	rRand, _ := randutil.NewTestRand()
+	upsertUntilBackpressure := func() {
+		for {
+			_, err := conn.Exec("UPSERT INTO test.foo VALUES (1, $1)",
+				randutil.RandBytes(rRand, 1<<15))
+			if testutils.IsError(err, "backpressure") {
+				break
+			}
+			require.NoError(t, err)
+		}
+	}
+
+	// Wait for the span config fields to apply.
+	waitForTableSplit(t, conn, "foo", "test")
+	waitForReplicaFieldToBeSet(t, tc, conn, "foo", "test", func(r *kvserver.Replica) (bool, error) {
+		if !r.ExcludeDataFromBackup() {
+			return false, errors.New("waiting for exclude_data_from_backup to be applied")
+		}
+		conf := r.SpanConfig()
+		if conf.TTL() != 1*time.Second {
+			return false, errors.New("waiting for gc.ttlseconds to be applied")
+		}
+		if r.GetMaxBytes() != tableRangeMaxBytes {
+			return false, errors.New("waiting for range_max_bytes to be applied")
+		}
+		return true, nil
+	})
+
+	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
+	if _, err := conn.Exec(`BACKUP DATABASE test INTO $1`, localFoo); !testutils.IsError(err, "pause") {
+		t.Fatal(err)
+	}
+	// We pause the backup resumer before it plans its flow so this timestamp
+	// should be very close to the timestamp protected by the record written by
+	// the backup.
+	afterBackup := tc.Server(0).Clock().Now()
+	var jobID jobspb.JobID
+	err := conn.QueryRow(fmt.Sprintf(
+		`SELECT job_id FROM [show jobs] WHERE job_type = 'BACKUP'`)).Scan(&jobID)
+	require.NoError(t, err)
+	jobutils.WaitForJobToPause(t, runner, jobID)
+
+	// Ensure that the replica sees the ProtectionPolicies.
+	waitForReplicaFieldToBeSet(t, tc, conn, "foo", "test", func(r *kvserver.Replica) (bool, error) {
+		if len(r.SpanConfig().GCPolicy.ProtectionPolicies) == 0 {
+			return false, errors.New("no protection policy applied to replica")
+		}
+		return true, nil
+	})
+
+	// Now that the backup has written a PTS record protecting the database, we
+	// check that the replica corresponding to `test.foo` continues to GC data
+	// since it has been marked as `exclude_data_from_backup`.
+	upsertUntilBackpressure()
+	runGCAndCheckTrace(ctx, t, tc, conn, false /* skipShouldQueue */, "foo", "test", func(traceStr string) error {
+		const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
+		processedRegexp := regexp.MustCompile(processedPattern)
+		if !processedRegexp.MatchString(traceStr) {
+			return errors.Errorf("%q does not match %q", traceStr, processedRegexp)
+		}
+		thresh := thresholdFromTrace(t, traceStr)
+		require.Truef(t, afterBackup.Less(thresh), "%v >= %v", afterBackup, thresh)
+		return nil
+	})
+}
+
 // TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to
 // restore users from a backup into current cluster and regrant roles.
 func TestBackupRestoreSystemUsers(t *testing.T) {
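The new test asserts that the mvccGC queue trace shows a GC threshold past `afterBackup`, using two helpers added to pkg/ccl/backupccl/testutils.go below. A standalone sketch of the parsing step those helpers perform, assuming a trace line of the form `Threshold:<hlc timestamp>`; the sample trace string here is made up:

	package main

	import (
		"fmt"
		"regexp"

		"github.com/cockroachdb/cockroach/pkg/util/hlc"
	)

	func main() {
		// The "$threshold" template resolves against the named capture group
		// (?P<threshold>...), so the group name must be present in the pattern.
		re := regexp.MustCompile(`(?s).*Threshold:(?P<threshold>[^\s]*)`)
		trace := "... processing replica ... Threshold:1646000000.000000000,0 done"
		threshStr := string(re.ExpandString(nil, "$threshold", trace,
			re.FindStringSubmatchIndex(trace)))
		thresh, err := hlc.ParseTimestamp(threshStr)
		if err != nil {
			panic(err)
		}
		fmt.Println(thresh) // 1646000000.000000000,0
	}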
diff --git a/pkg/ccl/backupccl/testutils.go b/pkg/ccl/backupccl/testutils.go
index a8f26a144145..d6ac830be313 100644
--- a/pkg/ccl/backupccl/testutils.go
+++ b/pkg/ccl/backupccl/testutils.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"regexp"
 	"strings"
 	"testing"
 
@@ -33,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/workload/bank"
 	"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
@@ -485,3 +487,44 @@ func waitForReplicaFieldToBeSet(
 		return nil
 	})
 }
+
+func thresholdFromTrace(t *testing.T, traceString string) hlc.Timestamp {
+	t.Helper()
+	thresholdRE := regexp.MustCompile(`(?s).*Threshold:(?P<threshold>[^\s]*)`)
+	threshStr := string(thresholdRE.ExpandString(nil, "$threshold",
+		traceString, thresholdRE.FindStringSubmatchIndex(traceString)))
+	thresh, err := hlc.ParseTimestamp(threshStr)
+	require.NoError(t, err)
+	return thresh
+}
+
+// runGCAndCheckTrace manually enqueues the replica corresponding to
+// `databaseName.tableName`, and runs `checkGCTrace` until it succeeds.
+func runGCAndCheckTrace(
+	ctx context.Context,
+	t *testing.T,
+	tc *testcluster.TestCluster,
+	conn *gosql.DB,
+	skipShouldQueue bool,
+	tableName, databaseName string,
+	checkGCTrace func(traceStr string) error,
+) {
+	t.Helper()
+	var startKey roachpb.Key
+	err := conn.QueryRow("SELECT start_key"+
+		" FROM crdb_internal.ranges_no_leases"+
+		" WHERE table_name = $1"+
+		" AND database_name = $2"+
+		" ORDER BY start_key ASC", tableName, databaseName).Scan(&startKey)
+	require.NoError(t, err)
+	r := tc.LookupRangeOrFatal(t, startKey)
+	l, _, err := tc.FindRangeLease(r, nil)
+	require.NoError(t, err)
+	lhServer := tc.Server(int(l.Replica.NodeID) - 1)
+	s, repl := getFirstStoreReplica(t, lhServer, startKey)
+	testutils.SucceedsSoon(t, func() error {
+		trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
+		require.NoError(t, err)
+		return checkGCTrace(trace.String())
+	})
+}
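Distilled, the pattern the new GC test drives through these two helpers: manually enqueue the table's range into the mvccGC queue, confirm from the returned trace that the replica was processed, then check the recovered threshold against a previously captured timestamp. A sketch of a caller, assuming a test with `ctx`, `tc`, `conn`, and a captured `ts hlc.Timestamp` in scope:

	runGCAndCheckTrace(ctx, t, tc, conn, false /* skipShouldQueue */, "foo", "test",
		func(traceStr string) error {
			if !regexp.MustCompile(`(?s)processing replica`).MatchString(traceStr) {
				return errors.New("replica not processed yet")
			}
			// GC must have advanced the threshold past the captured timestamp.
			thresh := thresholdFromTrace(t, traceStr)
			require.Truef(t, ts.Less(thresh), "%v >= %v", ts, thresh)
			return nil
		})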
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
index 36243ad1ad1a..ff7a0064b326 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -129,7 +129,7 @@ INSERT INTO d.t2 VALUES (2);
 		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
 		ingestionJobID, cutoverTime)
 
-	jobutils.WaitForJob(t, destSQL, jobspb.JobID(ingestionJobID))
+	jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
 
 	query := "SELECT * FROM d.t1"
 	sourceData := sourceSQL.QueryStr(t, query)
diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index c101783d9f7f..66cae3cd7b45 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -145,7 +145,7 @@ func TestPartitionedTenantStreamingEndToEnd(t *testing.T) {
 	// Cut over the ingestion job and the job will stop eventually.
 	destSysSQL.Exec(t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
 		ingestionJobID, cutoverTime)
-	jobutils.WaitForJob(t, destSysSQL, jobspb.JobID(ingestionJobID))
+	jobutils.WaitForJobToSucceed(t, destSysSQL, jobspb.JobID(ingestionJobID))
 
 	// TODO(casper): Make producer job exit normally in the cutover scenario.
 	sourceSysSQL.CheckQueryResultsRetry(t,
 		fmt.Sprintf("SELECT status, error FROM [SHOW JOBS] WHERE job_id = %d", streamProducerJobID),
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index f1e486366224..2ca290de445c 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -931,7 +931,7 @@ func (r *Replica) ExcludeDataFromBackup() bool {
 	return r.mu.conf.ExcludeDataFromBackup
 }
 
-func (r *Replica) exludeReplicaFromBackupRLocked() bool {
+func (r *Replica) excludeReplicaFromBackupRLocked() bool {
 	return r.mu.conf.ExcludeDataFromBackup
 }
 
@@ -1540,7 +1540,7 @@ func (r *Replica) checkTSAboveGCThresholdRLocked(
 		return &roachpb.BatchTimestampBeforeGCError{
 			Timestamp:              ts,
 			Threshold:              threshold,
-			DataExcludedFromBackup: r.exludeReplicaFromBackupRLocked(),
+			DataExcludedFromBackup: r.excludeReplicaFromBackupRLocked(),
 		}
 	}
 
diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go
index 6261522678ad..f2aa3e231a45 100644
--- a/pkg/sql/importer/import_stmt_test.go
+++ b/pkg/sql/importer/import_stmt_test.go
@@ -5062,7 +5062,7 @@ func TestImportWorkerFailure(t *testing.T) {
 	}
 
 	// But the job should be restarted and succeed eventually.
-	jobutils.WaitForJob(t, sqlDB, jobID)
+	jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
 
 	sqlDB.CheckQueryResults(t,
 		`SELECT * FROM t ORDER BY i`,
 		sqlDB.QueryStr(t, `SELECT * FROM generate_series(0, $1)`, count-1),
diff --git a/pkg/sql/stats/create_stats_job_test.go b/pkg/sql/stats/create_stats_job_test.go
index a98917a9ed1d..06602b3c6f07 100644
--- a/pkg/sql/stats/create_stats_job_test.go
+++ b/pkg/sql/stats/create_stats_job_test.go
@@ -111,7 +111,7 @@ func TestCreateStatsControlJob(t *testing.T) {
 			_, err := sqlDB.DB.ExecContext(context.Background(), `RESUME JOB $1`, jobID)
 			return err
 		})
-		jobutils.WaitForJob(t, sqlDB, jobID)
+		jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
 
 		// Now the job should have succeeded in producing stats.
 		sqlDB.CheckQueryResults(t,
@@ -232,7 +232,7 @@ func TestAtMostOneRunningCreateStats(t *testing.T) {
 
 	// Verify that the first job completed successfully.
 	sqlDB.Exec(t, fmt.Sprintf("RESUME JOB %d", jobID))
-	jobutils.WaitForJob(t, sqlDB, jobID)
+	jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
 	<-errCh
 }
 
diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go
index b67e548e3843..823088faabba 100644
--- a/pkg/testutils/jobutils/jobs_verification.go
+++ b/pkg/testutils/jobutils/jobs_verification.go
@@ -34,8 +34,21 @@ import (
 	"github.com/lib/pq"
 )
 
-// WaitForJob waits for the specified job ID to terminate.
-func WaitForJob(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
+// WaitForJobToSucceed waits for the specified job ID to succeed.
+func WaitForJobToSucceed(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
+	t.Helper()
+	waitForJobToHaveStatus(t, db, jobID, jobs.StatusSucceeded)
+}
+
+// WaitForJobToPause waits for the specified job ID to be paused.
+func WaitForJobToPause(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
+	t.Helper()
+	waitForJobToHaveStatus(t, db, jobID, jobs.StatusPaused)
+}
+
+func waitForJobToHaveStatus(
+	t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID, expectedStatus jobs.Status,
+) {
 	t.Helper()
 	if err := retry.ForDuration(time.Minute*2, func() error {
 		var status string
@@ -50,7 +63,7 @@ func WaitForJob(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) {
 			}
 			t.Fatalf("job failed")
 		}
-		if e, a := jobs.StatusSucceeded, jobs.Status(status); e != a {
+		if e, a := expectedStatus, jobs.Status(status); e != a {
 			return errors.Errorf("expected job status %s, but got %s", e, a)
 		}
 		return nil
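With WaitForJob split into WaitForJobToSucceed and WaitForJobToPause, the common pause-then-resume test flow reads as follows. A sketch against a sqlutils.SQLRunner; the pausepoint name, `dest`, and the runner itself are assumed from the surrounding test:

	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
	// The statement fails with a "pause" error once the resumer hits the pausepoint.
	runner.ExpectErr(t, "pause", `BACKUP DATABASE test INTO $1`, dest)
	var jobID jobspb.JobID
	runner.QueryRow(t, `SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'BACKUP'`).Scan(&jobID)
	jobutils.WaitForJobToPause(t, runner, jobID)

	// Clear the pausepoint, resume, and wait for a clean finish.
	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
	runner.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID))
	jobutils.WaitForJobToSucceed(t, runner, jobID)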