Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: add tenant operations to backup nemesis #113851

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 184 additions & 38 deletions pkg/ccl/backupccl/tenant_backup_nemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"math/rand"
"net/url"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -145,7 +146,8 @@ func TestTenantBackupNemesis(t *testing.T) {
defer hostClusterCleanupFn()

tenant10, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(10),
TenantID: roachpb.MustMakeTenantID(10),
TenantName: "tenant-10",
TestingKnobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
Expand All @@ -170,8 +172,6 @@ func TestTenantBackupNemesis(t *testing.T) {
_, err = workloadsql.Setup(ctx, tenant10Conn, bankData, l)
require.NoError(t, err)

backupLoc := "nodelocal://1/tenant-backup"

backupDone := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -204,8 +204,22 @@ func TestTenantBackupNemesis(t *testing.T) {
}
})

hostURL, cleanup, err := sqlutils.PGUrlE(
tc.SystemLayer(0).AdvSQLAddr(), "backup-nemesis", url.User(username.RootUser))
require.NoError(t, err)
defer cleanup()

// Effectively reserve tenant 11 for restore. We care about
// the ID here because the IDs are built into the test
// certificates.
hostSQLDB.Exec(t, "SELECT crdb_internal.create_tenant(12)")
// Disabled until #113852 is fixed or the feature is removed.
hostSQLDB.Exec(t, "SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
_, err = tenant10Conn.Exec("SET CLUSTER SETTING kv.bulkio.write_metadata_sst.enabled=false")
require.NoError(t, err)

rng, _ := randutil.NewPseudoRand()
nemesisRunner := newBackupNemesis(t, rng, tenant10Conn)
nemesisRunner := newBackupNemesis(t, rng, tenant10Conn, hostSQLDB.DB.(*gosql.DB), hostURL.String())
nemesisRunner.Start(ctx)

var tablesToCheck []string
Expand All @@ -214,11 +228,10 @@ func TestTenantBackupNemesis(t *testing.T) {
defer close(backupDone)
defer nemesisRunner.Stop()

t.Logf("backup-nemesis: full backup started")
hostSQLDB.Exec(t, fmt.Sprintf("BACKUP TENANT 10 INTO '%s'", backupLoc))
t.Logf("backup-nemesis: full backup finished")

numIncrementals := 30
if err := nemesisRunner.FullBackup(); err != nil {
return err
}
numIncrementals := 20
if util.RaceEnabled {
numIncrementals = 10
}
Expand All @@ -227,38 +240,38 @@ func TestTenantBackupNemesis(t *testing.T) {
// don't try to coordinate between the backups
// and the nemesis runner.
tablesToCheck = nemesisRunner.TablesToCheck()
hostSQLDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&aost)
backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", backupLoc, aost)
t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
hostSQLDB.Exec(t, backupQuery)
t.Logf("backup-nemesis: incremental backup finished")
var err error
aost, err = nemesisRunner.IncrementalBackup()
if err != nil {
return err
}
}
return nil
})

require.NoError(t, g.Wait())
restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster_name = 'cluster-11'", backupLoc, aost)
t.Logf("backup-nemesis: restoring tenant 10 into 11: %s", restoreQuery)
restoreQuery := fmt.Sprintf("RESTORE TENANT 10 FROM LATEST IN '%s' AS OF SYSTEM TIME %s WITH virtual_cluster = '11', virtual_cluster_name = 'restored-tenant-10'", nemesisRunner.BackupLocation(), aost)
t.Logf("backup-nemesis: restoring tenant 10 into restored-tenant-10: %s", restoreQuery)
hostSQLDB.Exec(t, restoreQuery)

// Validation
//
// We check bank.bank which has had the workload running against it
// and any table from a completed nemesis.
tenant11, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantName: "cluster-11",
restoredTenant, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantName: "restored-tenant-10",
DisableCreateTenant: true,
})
require.NoError(t, err)

tenant11Conn := tenant11.SQLConn(t, serverutils.DBName("bank"))
restoreTenantConn := restoredTenant.SQLConn(t, serverutils.DBName("bank"))

tenant10SQLDB := sqlutils.MakeSQLRunner(tenant10Conn)
tenant11SQLDB := sqlutils.MakeSQLRunner(tenant11Conn)
restoredTenantSQLDB := sqlutils.MakeSQLRunner(restoreTenantConn)

validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, "bank.bank", aost)
validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, "bank.bank", aost)
for _, name := range tablesToCheck {
validateTableAcrossTenantRestore(t, tenant10SQLDB, tenant11SQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
validateTableAcrossTenantRestore(t, tenant10SQLDB, restoredTenantSQLDB, fmt.Sprintf(`bank."%s"`, name), aost)
}
}

Expand Down Expand Up @@ -297,35 +310,110 @@ func assertEqualQueries(
}

type randomBackupNemesis struct {
t *testing.T
db *gosql.DB
t *testing.T
tenantDB *gosql.DB
hostDB *gosql.DB
hostURL string

grp ctxgroup.Group
cancel context.CancelFunc
rng *rand.Rand

nemeses []nemesis
useClusterBackup bool
// We keep some state for the replication nemesis to avoid
// having hundreds of running replication streams slowing down
// the test.
replCount atomic.Int32
nemeses []nemesis

mu struct {
syncutil.Mutex
tablesToCheck []string

otherTenants []string
}
}

type nemesis struct {
name string
impl func(context.Context, *randomBackupNemesis, *gosql.DB) error
impl func(context.Context, *randomBackupNemesis, *gosql.DB, *gosql.DB) error
}

func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupNemesis {
func newBackupNemesis(
t *testing.T, rng *rand.Rand, tenantDB *gosql.DB, hostDB *gosql.DB, hostURL string,
) *randomBackupNemesis {
return &randomBackupNemesis{
t: t,
rng: rng,
db: db,
t: t,
rng: rng,
tenantDB: tenantDB,
hostDB: hostDB,
hostURL: hostURL,
// Prefer cluster backups most of the time since those
// are what we use more often at the moment.
useClusterBackup: rng.Float64() > 0.2,

nemeses: []nemesis{
{name: "CREATE TENANT",
impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
tenantName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
if _, err := hostDB.Exec("CREATE TENANT $1", tenantName); err != nil {
return err
}
n.addTenant(tenantName)
return nil
},
},
{name: "DELETE TENANT",
impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
tenant := n.popTenant()
if tenant == "" {
n.t.Log("no tenants to delete")
return nil
}
if _, err := hostDB.Exec("DROP TENANT $1", tenant); err != nil {
return err
}
return nil
},
},
{name: "RENAME TENANT",
impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
tenant := n.popTenant()
if tenant == "" {
n.t.Log("no tenants to rename")
return nil
}
newName := fmt.Sprintf("test-tenant-%d", n.rng.Int63())
if _, err := hostDB.Exec("ALTER TENANT $1 RENAME TO $2", tenant, newName); err != nil {
return err
}
n.addTenant(newName)
return nil
},
},
{name: "REPLICATE TENANT",
impl: func(ctx context.Context, n *randomBackupNemesis, _, hostDB *gosql.DB) error {
if n.replCount.Load() > 3 {
n.t.Log("already hit replication limit")
return nil
}
if _, err := hostDB.Exec("SET CLUSTER SETTING physical_replication.enabled = true"); err != nil {
return err
}
if _, err := hostDB.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
return err
}
newName := fmt.Sprintf("repl-tenant-%d", n.rng.Int63())
if _, err := hostDB.Exec("CREATE TENANT $1 FROM REPLICATION OF $2 ON $3", newName, "tenant-10", n.hostURL); err != nil {
return err
}
n.addTenant(newName)
n.replCount.Add(1)
return nil
},
},
{name: "CANCELED IMPORT INTO",
impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
if _, err := db.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest'"); err != nil {
return err
}
Expand Down Expand Up @@ -359,7 +447,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
},
},
{name: "IMPORT INTO",
impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
tableName, err := n.makeRandomBankTable("import_into")
if err != nil {
return err
Expand All @@ -369,7 +457,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
},
},
{name: "CREATE INDEX",
impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
tableName, err := n.makeRandomBankTable("create_index")
if err != nil {
return err
Expand All @@ -382,7 +470,7 @@ func newBackupNemesis(t *testing.T, rng *rand.Rand, db *gosql.DB) *randomBackupN
},
},
{name: "CREATE UNIQUE INDEX (will fail)",
impl: func(ctx context.Context, n *randomBackupNemesis, db *gosql.DB) error {
impl: func(ctx context.Context, n *randomBackupNemesis, db, _ *gosql.DB) error {
tableName, err := n.makeRandomBankTable("create_unique_index")
if err != nil {
return err
Expand Down Expand Up @@ -429,10 +517,68 @@ func (r *randomBackupNemesis) TablesToCheck() []string {
return ret
}

func (r *randomBackupNemesis) BackupLocation() string {
if r.useClusterBackup {
return "nodelocal://1/cluster-backup"
}
return "nodelocal://1/tenant-backup"
}

func (r *randomBackupNemesis) FullBackup() error {
backupQuery := "BACKUP TENANT 10 INTO $1"
if r.useClusterBackup {
backupQuery = "BACKUP INTO $1 WITH include_all_secondary_tenants"
}
r.t.Logf("backup-nemesis: full backup started: %s", backupQuery)
if _, err := r.hostDB.Exec(backupQuery, r.BackupLocation()); err != nil {
return err
}
r.t.Log("backup-nemesis: full backup finished")
return nil
}

func (r *randomBackupNemesis) IncrementalBackup() (string, error) {
var aost string
if err := r.hostDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&aost); err != nil {
return aost, err
}
backupQuery := fmt.Sprintf("BACKUP TENANT 10 INTO LATEST IN '%s' AS OF SYSTEM TIME %s", r.BackupLocation(), aost)
if r.useClusterBackup {
backupQuery = fmt.Sprintf("BACKUP INTO LATEST IN '%s' AS OF SYSTEM TIME %s WITH include_all_secondary_tenants", r.BackupLocation(), aost)
}

r.t.Logf("backup-nemesis: incremental backup started: %s", backupQuery)
if _, err := r.hostDB.Exec(backupQuery); err != nil {
return aost, err
}
r.t.Logf("backup-nemesis: incremental backup finished")
return aost, nil

}

func (r *randomBackupNemesis) addTable(name string) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.tablesToCheck = append(r.mu.tablesToCheck, name)
r.mu.Unlock()

}

func (r *randomBackupNemesis) addTenant(name string) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.otherTenants = append(r.mu.otherTenants, name)

}

func (r *randomBackupNemesis) popTenant() string {
r.mu.Lock()
defer r.mu.Unlock()
var tenant string
if len(r.mu.otherTenants) < 1 {
return tenant
}
tenant, r.mu.otherTenants = r.mu.otherTenants[0], r.mu.otherTenants[1:]
return tenant
}

func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {
Expand All @@ -444,7 +590,7 @@ func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {
}
n := r.nemeses[r.rng.Intn(len(r.nemeses))]
r.t.Logf("backup-nemesis: %s started", n.name)
if err := n.impl(ctx, r, r.db); err != nil {
if err := n.impl(ctx, r, r.tenantDB, r.hostDB); err != nil {
r.t.Logf("backup-nemesis: %s failed: %s", n.name, err)
return err
}
Expand All @@ -454,11 +600,11 @@ func (r *randomBackupNemesis) runNemesis(ctx context.Context) error {

func (r *randomBackupNemesis) makeRandomBankTable(prefix string) (string, error) {
tableName := fmt.Sprintf("%s_%s", prefix, uuid.FastMakeV4().String())
if _, err := r.db.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
if _, err := r.tenantDB.Exec(fmt.Sprintf(`CREATE TABLE "%s" (id INT PRIMARY KEY, n INT, s STRING)`, tableName)); err != nil {
return "", err
}
importStmt := fmt.Sprintf(`IMPORT INTO "%s" CSV DATA ('workload:///csv/bank/bank?payload-bytes=100&row-end=1&row-start=0&rows=1000&seed=1&version=1.0.0')`, tableName)
if _, err := r.db.Exec(importStmt); err != nil {
if _, err := r.tenantDB.Exec(importStmt); err != nil {
return tableName, err
}
return tableName, nil
Expand Down