113851: backupccl: add tenant operations to backup nemesis r=dt a=stevendanna

This adds:

  - CREATE TENANT
  - DROP TENANT
  - ALTER TENANT .. RENAME
  - CREATE TENANT FROM REPLICATION

to our tenant backup nemesis test. Additionally, it modifies the test to occasionally use cluster-level backups instead of tenant-level backups.
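
For illustration, each of these operations plugs into the nemesis runner as a named closure that now receives both a tenant connection and a host-cluster connection; a minimal sketch of the shape, using names from the diff below:

	{name: "CREATE TENANT",
		impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
			// Tenant DDL runs against the host cluster, not the tenant itself.
			tenantName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
			if _, err := hostDB.Exec("CREATE TENANT $1", tenantName); err != nil {
				return err
			}
			n.addTenant(tenantName) // tracked so DROP/RENAME can pick it up later
			return nil
		},
	},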

Fixes #108745

Release note: None

114876: streamingccl: allow CUTOVER TO LATEST before initial scan finishes r=stevendanna a=msbutler

This patch allows the user to execute ALTER TENANT x COMPLETE REPLICATION TO LATEST before the initial scan completes. After this command, the cutover time is set to the replication start time.
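
In code terms, the cutover-time selection for TO LATEST becomes roughly the following (a sketch mirroring the alter_replication_job.go hunk below):

	replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
	if replicatedTime.IsEmpty() {
		// The initial scan has not finished, so no replicated time exists yet;
		// fall back to the replication start time instead of returning an error.
		cutoverTime = details.ReplicationStartTime
	} else {
		cutoverTime = replicatedTime
	}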

Fixes: #114734

Epic: none

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
3 people committed Nov 27, 2023
3 parents 2fe0f11 + 7bbcf40 + d457b0c commit 078c69a
Showing 4 changed files with 219 additions and 41 deletions.
222 changes: 184 additions & 38 deletions pkg/ccl/backupccl/tenant_backup_nemesis_test.go
@@ -14,6 +14,7 @@ import (
"fmt"
"math/rand"
"net/url"
"sync/atomic"
"testing"
"time"

@@ -145,7 +146,8 @@ func TestTenantBackupNemesis(t *testing.T) {
	defer hostClusterCleanupFn()

	tenant10, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
-		TenantID: roachpb.MustMakeTenantID(10),
+		TenantID:   roachpb.MustMakeTenantID(10),
+		TenantName: "tenant-10",
		TestingKnobs: base.TestingKnobs{
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
		},
@@ -170,8 +172,6 @@ func TestTenantBackupNemesis(t *testing.T) {
	_, err = workloadsql.Setup(ctx, tenant10Conn, bankData, l)
	require.NoError(t, err)

-	backupLoc := "nodelocal://1/tenant-backup"
-
	backupDone := make(chan struct{})
	g := ctxgroup.WithContext(ctx)
	g.GoCtx(func(ctx context.Context) error {
@@ -204,8 +204,22 @@ func TestTenantBackupNemesis(t *testing.T) {
		}
	})

+	hostURL, cleanup, err := sqlutils.PGUrlE(
+		tc.SystemLayer(0).AdvSQLAddr(), "backup-nemesis", url.User(username.RootUser))
+	require.NoError(t, err)
+	defer cleanup()
+
+	// Effectively reserve tenant 11 for restore. We care about
+	// the ID here because the IDs are built into the test
+	// certificates.
+	hostSQLDB.Exec(t, "SELECT crdb_internal.create_tenant(12)")
+	// Disabled until #113852 is fixed or the feature is removed.
+	hostSQLDB.Exec(t, "SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
+	_, err = tenant10Conn.Exec("SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
+	require.NoError(t, err)
+
	rng, _ := randutil.NewPseudoRand()
-	nemesisRunner := newBackupNemesis(t, rng, tenant10Conn)
+	nemesisRunner := newBackupNemesis(t, rng, tenant10Conn, hostSQLDB.DB.(*gosql.DB), hostURL.String())
	nemesisRunner.Start(ctx)

	var tablesToCheck []string
@@ -214,11 +228,10 @@ func TestTenantBackupNemesis(t *testing.T) {
		defer close(backupDone)
		defer nemesisRunner.Stop()

-		t.Logf("backup-nemesis: full backup started")
-		hostSQLDB.Exec(t, fmt.Sprintf("BACKUP TENANT 10 INTO '%s'", backupLoc))
-		t.Logf("backup-nemesis: full backup finished")
-
-		numIncrementals := 30
+		if err := nemesisRunner.FullBackup(); err != nil {
+			return err
+		}
+		numIncrementals := 20
		if util.RaceEnabled {
			numIncrementals = 10
		}
@@ -227,38 +240,38 @@ func TestTenantBackupNemesis(t *testing.T) {
			// don't try to coordinate between the backups
			// and the nemesis runner.
			tablesToCheck = nemesisRunner.TablesToCheck()
-			hostSQLDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&aost)
-			backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", backupLoc, aost)
-			t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
-			hostSQLDB.Exec(t, backupQuery)
-			t.Logf("backup-nemesis: incremental backup finished")
+			var err error
+			aost, err = nemesisRunner.IncrementalBackup()
+			if err != nil {
+				return err
+			}
		}
		return nil
	})

	require.NoError(t, g.Wait())
-	restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster_name = 'cluster-11'", backupLoc, aost)
-	t.Logf("backup-nemesis: restoring tenant 10 into 11: %s", restoreQuery)
+	restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster = '11', virtual_cluster_name = 'restored-tenant-10'", nemesisRunner.BackupLocation(), aost)
+	t.Logf("backup-nemesis: restoring tenant 10 into restored-tenant-10: %s", restoreQuery)
	hostSQLDB.Exec(t, restoreQuery)

	// Validation
	//
	// We check bank.bank which has had the workload running against it
	// and any table from a completed nemesis.
-	tenant11, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
-		TenantName:          "cluster-11",
+	restoredTenant, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
+		TenantName:          "restored-tenant-10",
		DisableCreateTenant: true,
	})
	require.NoError(t, err)

-	tenant11Conn := tenant11.SQLConn(t, serverutils.DBName("bank"))
+	restoreTenantConn := restoredTenant.SQLConn(t, serverutils.DBName("bank"))

	tenant10SQLDB := sqlutils.MakeSQLRunner(tenant10Conn)
-	tenant11SQLDB := sqlutils.MakeSQLRunner(tenant11Conn)
+	restoredTenantSQLDB := sqlutils.MakeSQLRunner(restoreTenantConn)

-	validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, "bank.bank", aost)
+	validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, "bank.bank", aost)
	for _, name := range tablesToCheck {
-		validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
+		validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
	}
}

@@ -297,35 +310,110 @@ func assertEqualQueries(
}

type randomBackupNemesis struct {
-	t  *testing.T
-	db *gosql.DB
+	t        *testing.T
+	tenantDB *gosql.DB
+	hostDB   *gosql.DB
+	hostURL  string

	grp    ctxgroup.Group
	cancel context.CancelFunc
	rng    *rand.Rand

-	nemeses []nemesis
+	useClusterBackup bool
+	// We keep some state for the replication nemesis to avoid
+	// having hundreds of running replication streams slowing down
+	// the test.
+	replCount atomic.Int32
+	nemeses   []nemesis

	mu struct {
		syncutil.Mutex
		tablesToCheck []string
+
+		otherTenants []string
	}
}

type nemesis struct {
	name string
-	impl func(context.Context, *randomBackupNemesis, *gosql.DB) error
+	impl func(context.Context, *randomBackupNemesis, *gosql.DB, *gosql.DB) error
}

-func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupNemesis {
+func newBackupNemesis(
+	t *testing.T, rng *rand.Rand, tenantDB *gosql.DB, hostDB *gosql.DB, hostURL string,
+) *randomBackupNemesis {
	return &randomBackupNemesis{
-		t:   t,
-		rng: rng,
-		db:  db,
+		t:        t,
+		rng:      rng,
+		tenantDB: tenantDB,
+		hostDB:   hostDB,
+		hostURL:  hostURL,
+		// Prefer cluster backups most of the time since those
+		// are what we use more often at the moment.
+		useClusterBackup: rng.Float64() > 0.2,

		nemeses: []nemesis{
+			{name: "CREATE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenantName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("CREATE TENANT $1", tenantName); err != nil {
+						return err
+					}
+					n.addTenant(tenantName)
+					return nil
+				},
+			},
+			{name: "DELETE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenant := n.popTenant()
+					if tenant == "" {
+						n.t.Log("no tenants to delete")
+						return nil
+					}
+					if _, err := hostDB.Exec("DROP TENANT $1", tenant); err != nil {
+						return err
+					}
+					return nil
+				},
+			},
+			{name: "RENAME TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					tenant := n.popTenant()
+					if tenant == "" {
+						n.t.Log("no tenants to rename")
+						return nil
+					}
+					newName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("ALTER TENANT $1 RENAME TO $2", tenant, newName); err != nil {
+						return err
+					}
+					n.addTenant(newName)
+					return nil
+				},
+			},
+			{name: "REPLICATE TENANT",
+				impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
+					if n.replCount.Load() > 3 {
+						n.t.Log("already hit replication limit")
+						return nil
+					}
+					if _, err := hostDB.Exec("SET CLUSTER SETTING physical_replication.enabled = true"); err != nil {
+						return err
+					}
+					if _, err := hostDB.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
+						return err
+					}
+					newName := fmt.Sprintf("repl-tenant-%d", n.rng.Int63())
+					if _, err := hostDB.Exec("CREATE TENANT $1 FROM REPLICATION OF $2 ON $3", newName, "tenant-10", n.hostURL); err != nil {
+						return err
+					}
+					n.addTenant(newName)
+					n.replCount.Add(1)
+					return nil
+				},
+			},
			{name: "CANCELED IMPORT INTO",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
					if _, err := db.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest'"); err != nil {
						return err
					}
@@ -359,7 +447,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
				},
			},
			{name: "IMPORT INTO",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
					tableName, err := n.makeRandomBankTable("import_into")
					if err != nil {
						return err
@@ -369,7 +457,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
				},
			},
			{name: "CREATE INDEX",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
					tableName, err := n.makeRandomBankTable("create_index")
					if err != nil {
						return err
},
				},
			},
			{name: "CREATE UNIQUE INDEX (will fail)",
-				impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
+				impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
					tableName, err := n.makeRandomBankTable("create_unique_index")
					if err != nil {
						return err
@@ -429,10 +517,68 @@ func (r *randomBackupNemesis) TablesToCheck() []string {
	return ret
}

+func (r *randomBackupNemesis) BackupLocation() string {
+	if r.useClusterBackup {
+		return "nodelocal://1/cluster-backup"
+	}
+	return "nodelocal://1/tenant-backup"
+}
+
+func (r *randomBackupNemesis) FullBackup() error {
+	backupQuery := "BACKUP TENANT 10 INTO $1"
+	if r.useClusterBackup {
+		backupQuery = "BACKUP INTO $1 WITH include_all_secondary_tenants"
+	}
+	r.t.Logf("backup-nemesis: full backup started: %s", backupQuery)
+	if _, err := r.hostDB.Exec(backupQuery, r.BackupLocation()); err != nil {
+		return err
+	}
+	r.t.Log("backup-nemesis: full backup finished")
+	return nil
+}
+
+func (r *randomBackupNemesis) IncrementalBackup() (string, error) {
+	var aost string
+	if err := r.hostDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&aost); err != nil {
+		return aost, err
+	}
+	backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", r.BackupLocation(), aost)
+	if r.useClusterBackup {
+		backupQuery = fmt.Sprintf("BACKUP INTO LATEST IN '%s' AS OF SYSTEM TIME %s WITH include_all_secondary_tenants", r.BackupLocation(), aost)
+	}
+
+	r.t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
+	if _, err := r.hostDB.Exec(backupQuery); err != nil {
+		return aost, err
+	}
+	r.t.Logf("backup-nemesis: incremental backup finished")
+	return aost, nil
+}
+
func (r *randomBackupNemesis) addTable(name string) {
	r.mu.Lock()
+	defer r.mu.Unlock()
	r.mu.tablesToCheck = append(r.mu.tablesToCheck, name)
-	r.mu.Unlock()
}

+func (r *randomBackupNemesis) addTenant(name string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mu.otherTenants = append(r.mu.otherTenants, name)
+}
+
+func (r *randomBackupNemesis) popTenant() string {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	var tenant string
+	if len(r.mu.otherTenants) < 1 {
+		return tenant
+	}
+	tenant, r.mu.otherTenants = r.mu.otherTenants[0], r.mu.otherTenants[1:]
+	return tenant
+}

@@ -444,7 +590,7 @@ func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {
		}
		n := r.nemeses[r.rng.Intn(len(r.nemeses))]
		r.t.Logf("backup-nemesis: %s started", n.name)
-		if err := n.impl(ctx, r, r.db); err != nil {
+		if err := n.impl(ctx, r, r.tenantDB, r.hostDB); err != nil {
			r.t.Logf("backup-nemesis: %s failed: %s", n.name, err)
			return err
		}
@@ -454,11 +600,11 @@ func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {

func (r *randomBackupNemesis) makeRandomBankTable(prefix string) (string, error) {
	tableName := fmt.Sprintf("%s_%s", prefix, uuid.FastMakeV4().String())
-	if _, err := r.db.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
+	if _, err := r.tenantDB.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
		return "", err
	}
	importStmt := fmt.Sprintf(`IMPORT INTO "%s" CSV DATA ('workload:///csv/bank/bank?payload-bytes=100&row-end=1&row-start=0&rows=1000&seed=1&version=1.0.0')`, tableName)
-	if _, err := r.db.Exec(importStmt); err != nil {
+	if _, err := r.tenantDB.Exec(importStmt); err != nil {
		return tableName, err
	}
	return tableName, nil
6 changes: 3 additions & 3 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -268,10 +268,10 @@ func alterTenantJobCutover(
	if alterTenantStmt.Cutover.Latest {
		replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
		if replicatedTime.IsEmpty() {
-			return hlc.Timestamp{},
-				errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
+			cutoverTime = details.ReplicationStartTime
+		} else {
+			cutoverTime = replicatedTime
		}
-		cutoverTime = replicatedTime
	}

	// TODO(ssd): We could use the replication manager here, but
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -276,6 +276,10 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
		return s.handleResumeError(ctx, jobExecCtx, err)
	}

+	if err := jobExecCtx.ExecCfg().JobRegistry.CheckPausepoint("stream_ingestion.before_ingestion"); err != nil {
+		return err
+	}
+
	// Start ingesting KVs from the replication stream.
	err = ingestWithRetries(ctx, jobExecCtx, s)
	if err != nil {
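
The new pausepoint uses the same jobs.debug.pausepoints machinery the nemesis test already exercises for 'import.after_ingest'. A hypothetical test could arm it as follows (sketch only; hostDB as a *gosql.DB connected to the host cluster is an assumption):

	// Sketch: pause stream ingestion jobs before any KVs are ingested.
	if _, err := hostDB.Exec(
		"SET CLUSTER SETTING jobs.debug.pausepoints = 'stream_ingestion.before_ingestion'",
	); err != nil {
		t.Fatal(err)
	}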