diff --git a/pkg/ccl/backupccl/tenant_backup_nemesis_test.go b/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
index 097c3d4ecd4e..7021bc18744e 100644
--- a/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
+++ b/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
@@ -14,6 +14,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/url"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -145,7 +146,8 @@ func TestTenantBackupNemesis(t *testing.T) {
 	defer hostClusterCleanupFn()
 
 	tenant10, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
-		TenantID: roachpb.MustMakeTenantID(10),
+		TenantID:   roachpb.MustMakeTenantID(10),
+		TenantName: "tenant-10",
 		TestingKnobs: base.TestingKnobs{
 			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
 		},
@@ -170,8 +172,6 @@ func TestTenantBackupNemesis(t *testing.T) {
 	_, err = workloadsql.Setup(ctx, tenant10Conn, bankData, l)
 	require.NoError(t, err)
 
-	backupLoc := "nodelocal://1/tenant-backup"
-
 	backupDone := make(chan struct{})
 	g := ctxgroup.WithContext(ctx)
 	g.GoCtx(func(ctx context.Context) error {
@@ -204,8 +204,22 @@ func TestTenantBackupNemesis(t *testing.T) {
 		}
 	})
 
+	hostURL, cleanup, err := sqlutils.PGUrlE(
+		tc.SystemLayer(0).AdvSQLAddr(), "backup-nemesis", url.User(username.RootUser))
+	require.NoError(t, err)
+	defer cleanup()
+
+	// Effectively reserve tenant 11 for restore. We care about
+	// the ID here because the IDs are built into the test
+	// certificates.
+	hostSQLDB.Exec(t, "SELECT crdb_internal.create_tenant(12)")
+	// Disabled until #113852 is fixed or the feature is removed.
+	hostSQLDB.Exec(t, "SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
+	_, err = tenant10Conn.Exec("SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
+	require.NoError(t, err)
+
 	rng, _ := randutil.NewPseudoRand()
-	nemesisRunner := newBackupNemesis(t, rng, tenant10Conn)
+	nemesisRunner := newBackupNemesis(t, rng, tenant10Conn, hostSQLDB.DB.(*gosql.DB), hostURL.String())
 	nemesisRunner.Start(ctx)
 
 	var tablesToCheck []string
@@ -214,11 +228,10 @@ func TestTenantBackupNemesis(t *testing.T) {
 		defer close(backupDone)
 		defer nemesisRunner.Stop()
 
-		t.Logf("backup-nemesis: full backup started")
-		hostSQLDB.Exec(t, fmt.Sprintf("BACKUP TENANT 10 INTO '%s'", backupLoc))
-		t.Logf("backup-nemesis: full backup finished")
-
-		numIncrementals := 30
+		if err := nemesisRunner.FullBackup(); err != nil {
+			return err
+		}
+		numIncrementals := 20
 		if util.RaceEnabled {
 			numIncrementals = 10
 		}
@@ -227,38 +240,38 @@
 			// don't try to coordinate between the backups
 			// and the nemesis runner.
 			tablesToCheck = nemesisRunner.TablesToCheck()
-			hostSQLDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&aost)
-			backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", backupLoc, aost)
-			t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
-			hostSQLDB.Exec(t, backupQuery)
-			t.Logf("backup-nemesis: incremental backup finished")
+			var err error
+			aost, err = nemesisRunner.IncrementalBackup()
+			if err != nil {
+				return err
+			}
 		}
 		return nil
 	})
 	require.NoError(t, g.Wait())
 
-	restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster_name = 'cluster-11'", backupLoc, aost)
-	t.Logf("backup-nemesis: restoring tenant 10 into 11: %s", restoreQuery)
+	restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster = '11', virtual_cluster_name = 'restored-tenant-10'", nemesisRunner.BackupLocation(), aost)
+	t.Logf("backup-nemesis: restoring tenant 10 into restored-tenant-10: %s", restoreQuery)
 	hostSQLDB.Exec(t, restoreQuery)
 
 	// Validation
 	//
 	// We check bank.bank which has had the workload running against it
 	// and any table from a completed nemesis.
-	tenant11, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
-		TenantName:          "cluster-11",
+	restoredTenant, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
+		TenantName:          "restored-tenant-10",
 		DisableCreateTenant: true,
 	})
 	require.NoError(t, err)
-	tenant11Conn := tenant11.SQLConn(t, serverutils.DBName("bank"))
+	restoreTenantConn := restoredTenant.SQLConn(t, serverutils.DBName("bank"))
 
 	tenant10SQLDB := sqlutils.MakeSQLRunner(tenant10Conn)
-	tenant11SQLDB := sqlutils.MakeSQLRunner(tenant11Conn)
+	restoredTenantSQLDB := sqlutils.MakeSQLRunner(restoreTenantConn)
 
-	validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, "bank.bank", aost)
+	validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, "bank.bank", aost)
 	for _, name := range tablesToCheck {
-		validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
+		validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
 	}
 }
 
@@ -297,35 +310,110 @@ func assertEqualQueries(
 }
 
 type randomBackupNemesis struct {
-	t  *testing.T
-	db *gosql.DB
+	t        *testing.T
+	tenantDB *gosql.DB
+	hostDB   *gosql.DB
+	hostURL  string
 
 	grp    ctxgroup.Group
 	cancel context.CancelFunc
 	rng    *rand.Rand
 
-	nemeses []nemesis
+	useClusterBackup bool
+	// We keep some state for the replication nemesis to avoid
+	// having hundreds of running replication streams slowing down
+	// the test.
+	replCount atomic.Int32
+	nemeses   []nemesis
 
 	mu struct {
 		syncutil.Mutex
 		tablesToCheck []string
+
+		otherTenants []string
 	}
 }
 
 type nemesis struct {
 	name string
-	impl func(context.Context, *randomBackupNemesis, *gosql.DB) error
+	impl func(context.Context, *randomBackupNemesis, *gosql.DB, *gosql.DB) error
 }
 
-func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupNemesis {
+func newBackupNemesis(
+	t *testing.T, rng *rand.Rand, tenantDB *gosql.DB, hostDB *gosql.DB, hostURL string,
+) *randomBackupNemesis {
 	return &randomBackupNemesis{
-		t:   t,
-		rng: rng,
-		db:  db,
+		t:        t,
+		rng:      rng,
+		tenantDB: tenantDB,
+		hostDB:   hostDB,
+		hostURL:  hostURL,
+		// Prefer cluster backups most of the time since those
+		// are what we use more often at the moment.
+		useClusterBackup: rng.Float64() > 0.2,
 		nemeses: []nemesis{
+			{name: "CREATE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenantName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("CREATE TENANT $1", tenantName); err != nil {
+						return err
+					}
+					n.addTenant(tenantName)
+					return nil
+				},
+			},
+			{name: "DELETE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenant := n.popTenant()
+					if tenant == "" {
+						n.t.Log("no tenants to delete")
+						return nil
+					}
+					if _, err := hostDB.Exec("DROP TENANT $1", tenant); err != nil {
+						return err
+					}
+					return nil
+				},
+			},
+			{name: "RENAME TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenant := n.popTenant()
+					if tenant == "" {
+						n.t.Log("no tenants to rename")
+						return nil
+					}
+					newName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("ALTER TENANT $1 RENAME TO $2", tenant, newName); err != nil {
+						return err
+					}
+					n.addTenant(newName)
+					return nil
+				},
+			},
+			{name: "REPLICATE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					if n.replCount.Load() > 3 {
+						n.t.Log("already hit replication limit")
+						return nil
+					}
+					if _, err := hostDB.Exec("SET CLUSTER SETTING physical_replication.enabled = true"); err != nil {
+						return err
+					}
+					if _, err := hostDB.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
+						return err
+					}
+					newName := fmt.Sprintf("repl-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("CREATE TENANT $1 FROM REPLICATION OF $2 ON $3", newName, "tenant-10", n.hostURL); err != nil {
+						return err
+					}
+					n.addTenant(newName)
+					n.replCount.Add(1)
+					return nil
+				},
+			},
 			{name: "CANCELED IMPORT INTO",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
 					if _, err := db.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest'"); err != nil {
 						return err
 					}
@@ -359,7 +447,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
 				},
 			},
 			{name: "IMPORT INTO",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
 					tableName, err := n.makeRandomBankTable("import_into")
 					if err != nil {
 						return err
@@ -369,7 +457,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
 				},
 			},
 			{name: "CREATE INDEX",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
 					tableName, err := n.makeRandomBankTable("create_index")
 					if err != nil {
 						return err
@@ -382,7 +470,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
 				},
 			},
 			{name: "CREATE UNIQUE INDEX (will fail)",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
 					tableName, err := n.makeRandomBankTable("create_unique_index")
 					if err != nil {
 						return err
@@ -429,10 +517,68 @@ func (r *randomBackupNemesis) TablesToCheck() []string {
 	return ret
 }
 
+func (r *randomBackupNemesis) BackupLocation() string {
+	if r.useClusterBackup {
+		return "nodelocal://1/cluster-backup"
+	}
+	return "nodelocal://1/tenant-backup"
+}
+
+func (r *randomBackupNemesis) FullBackup() error {
+	backupQuery := "BACKUP TENANT 10 INTO $1"
+	if r.useClusterBackup {
+		backupQuery = "BACKUP INTO $1 WITH include_all_secondary_tenants"
+	}
+	r.t.Logf("backup-nemesis: full backup started: %s", backupQuery)
+	if _, err := r.hostDB.Exec(backupQuery, r.BackupLocation()); err != nil {
+		return err
+	}
+	r.t.Log("backup-nemesis: full backup finished")
+	return nil
+}
+
+func (r *randomBackupNemesis) IncrementalBackup() (string, error) {
+	var aost string
+	if err := r.hostDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&aost); err != nil {
+		return aost, err
+	}
+	backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", r.BackupLocation(), aost)
+	if r.useClusterBackup {
+		backupQuery = fmt.Sprintf("BACKUP INTO LATEST IN '%s' AS OF SYSTEM TIME %s WITH include_all_secondary_tenants", r.BackupLocation(), aost)
+	}
+
+	r.t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
+	if _, err := r.hostDB.Exec(backupQuery); err != nil {
+		return aost, err
+	}
+	r.t.Logf("backup-nemesis: incremental backup finished")
+	return aost, nil
+
+}
+
 func (r *randomBackupNemesis) addTable(name string) {
 	r.mu.Lock()
+	defer r.mu.Unlock()
 	r.mu.tablesToCheck = append(r.mu.tablesToCheck, name)
-	r.mu.Unlock()
+
+}
+
+func (r *randomBackupNemesis) addTenant(name string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mu.otherTenants = append(r.mu.otherTenants, name)
+
+}
+
+func (r *randomBackupNemesis) popTenant() string {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	var tenant string
+	if len(r.mu.otherTenants) < 1 {
+		return tenant
+	}
+	tenant, r.mu.otherTenants = r.mu.otherTenants[0], r.mu.otherTenants[1:]
+	return tenant
 }
 
 func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {
@@ -444,7 +590,7 @@
 		}
 		n := r.nemeses[r.rng.Intn(len(r.nemeses))]
 		r.t.Logf("backup-nemesis: %s started", n.name)
-		if err := n.impl(ctx, r, r.db); err != nil {
+		if err := n.impl(ctx, r, r.tenantDB, r.hostDB); err != nil {
 			r.t.Logf("backup-nemesis: %s failed: %s", n.name, err)
 			return err
 		}
@@ -454,11 +600,11 @@
 
 func (r *randomBackupNemesis) makeRandomBankTable(prefix string) (string, error) {
 	tableName := fmt.Sprintf("%s_%s", prefix, uuid.FastMakeV4().String())
-	if _, err := r.db.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
+	if _, err := r.tenantDB.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
 		return "", err
 	}
 	importStmt := fmt.Sprintf(`IMPORT INTO "%s" CSV DATA ('workload:///csv/bank/bank?payload-bytes=100&row-end=1&row-start=0&rows=1000&seed=1&version=1.0.0')`, tableName)
-	if _, err := r.db.Exec(importStmt); err != nil {
+	if _, err := r.tenantDB.Exec(importStmt); err != nil {
 		return tableName, err
 	}
 	return tableName, nil
diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
index 9cfe7eab8c95..09d8c5f6d49d 100644
--- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
+++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -268,10 +268,10 @@ func alterTenantJobCutover(
 	if alterTenantStmt.Cutover.Latest {
 		replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
 		if replicatedTime.IsEmpty() {
-			return hlc.Timestamp{},
-				errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
+			cutoverTime = details.ReplicationStartTime
+		} else {
+			cutoverTime = replicatedTime
 		}
-		cutoverTime = replicatedTime
 	}
 
 	// TODO(ssd): We could use the replication manager here, but
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index 6e23b719969c..7eaf5a2d2ddc 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -276,6 +276,10 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
 		return s.handleResumeError(ctx, jobExecCtx, err)
 	}
 
+	if err := jobExecCtx.ExecCfg().JobRegistry.CheckPausepoint("stream_ingestion.before_ingestion"); err != nil {
+		return err
+	}
+
 	// Start ingesting KVs from the replication stream.
 	err = ingestWithRetries(ctx, jobExecCtx, s)
 	if err != nil {
diff --git a/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover b/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover
new file mode 100644
index 000000000000..62e998bea81a
--- /dev/null
+++ b/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover
@@ -0,0 +1,28 @@
+# This test ensures that 1) the user can set a cutover time before the initial scan completes, and
+# 2) the user cannot set a cutover time before the replication start time.
+
+create-replication-clusters
+----
+
+exec-sql as=destination-system
+SET CLUSTER SETTING jobs.debug.pausepoints = 'stream_ingestion.before_ingestion';
+----
+
+let $pre as=source-system
+SELECT clock_timestamp()::timestamp::string
+----
+
+start-replication-stream
+----
+
+job as=destination-system wait-for-state=paused
+----
+
+query-sql as=destination-system regex-error=(.*before earliest safe cutover.*)
+ALTER TENANT "destination" COMPLETE REPLICATION TO SYSTEM TIME '$pre'
+----
+
+exec-sql as=destination-system
+ALTER TENANT "destination" COMPLETE REPLICATION TO LATEST
+----