Skip to content

Commit

Permalink
c2c: refactor roachtest driver to run and stream arbitrary workloads
Browse files Browse the repository at this point in the history
This patch refactors the roachtest driver such that:
1) the streamingWorkload interface can run a custom workload with arbitrary sql
queries.
2) to reduce helper function signature bloat, many helper functions are now
replicationTestSpec methods.
3) the test writer can specity an `additionalDuration` of 0, which allows the
workload to terminate on its own.
4) a health monitor will fail the test if it cannot connect to a node

This patch also adds two new roachtests:
- c2c/BulkOps: runs the backup/mvcc-range-tombstones roachtest on
  the source cluster (without the backup-restore roundtrips for now), and
  streams it to the destination.
- c2c/UnitTest: is quick roachtest that can be used to debug the c2c roachtest
  infrastructure.

Informs cockroachdb#89176

Release note: None
  • Loading branch information
msbutler committed Mar 13, 2023
1 parent 9ed78bf commit 9cde691
Show file tree
Hide file tree
Showing 2 changed files with 408 additions and 285 deletions.
109 changes: 65 additions & 44 deletions pkg/cmd/roachtest/tests/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,11 +1109,19 @@ func registerBackup(r registry.Registry) {
Cluster: r.MakeClusterSpec(3, spec.CPU(8)),
EncryptionSupport: registry.EncryptionMetamorphic,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runBackupMVCCRangeTombstones(ctx, t, c)
runBackupMVCCRangeTombstones(ctx, t, c, mvccRangeTombstoneConfig{})
},
})
}

type mvccRangeTombstoneConfig struct {
tenantName string
skipClusterSetup bool

// TODO(msbutler): delete once tenants can back up to nodelocal.
skipBackupRestore bool
}

// runBackupMVCCRangeTombstones tests that backup and restore works in the
// presence of MVCC range tombstones. It uses data from TPCH's order table, 16
// GB across 8 CSV files.
Expand All @@ -1132,21 +1140,27 @@ func registerBackup(r registry.Registry) {
// We then do point-in-time restores of the database at times 'initial',
// 'canceled', 'completed', and the latest time, and compare the fingerprints to
// the original data.
func runBackupMVCCRangeTombstones(ctx context.Context, t test.Test, c cluster.Cluster) {
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Put(ctx, t.DeprecatedWorkload(), "./workload") // required for tpch
c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings())
func runBackupMVCCRangeTombstones(
ctx context.Context, t test.Test, c cluster.Cluster, config mvccRangeTombstoneConfig,
) {
if !config.skipClusterSetup {
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Put(ctx, t.DeprecatedWorkload(), "./workload") // required for tpch
c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings())
}
t.Status("starting csv servers")
c.Run(ctx, c.All(), `./cockroach workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`)

conn := c.Conn(ctx, t.L(), 1)
conn := c.Conn(ctx, t.L(), 1, option.TenantName(config.tenantName))

// Configure cluster.
t.Status("configuring cluster")
_, err := conn.Exec(`SET CLUSTER SETTING kv.bulk_ingest.max_index_buffer_size = '2gb'`)
require.NoError(t, err)
_, err = conn.Exec(`SET CLUSTER SETTING storage.mvcc.range_tombstones.enabled = 't'`)
require.NoError(t, err)
if config.tenantName == "" {
_, err = conn.Exec(`SET CLUSTER SETTING storage.mvcc.range_tombstones.enabled = 't'`)
require.NoError(t, err)
}
_, err = conn.Exec(`SET CLUSTER SETTING server.debug.default_vmodule = 'txn=2,sst_batcher=4,
revert=2'`)
require.NoError(t, err)
Expand Down Expand Up @@ -1201,6 +1215,9 @@ revert=2'`)
}

fingerprint := func(name, database, table string) (string, string, string) {
if config.skipBackupRestore {
return "", "", ""
}
var ts string
require.NoError(t, conn.QueryRowContext(ctx, `SELECT now()`).Scan(&ts))

Expand Down Expand Up @@ -1243,10 +1260,12 @@ revert=2'`)

// Take a full backup, using a database backup in order to perform a final
// incremental backup after the table has been dropped.
t.Status("taking full backup")
dest := "nodelocal://1/" + destinationName(c)
_, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO $1 WITH revision_history`, dest)
require.NoError(t, err)
if !config.skipBackupRestore {
t.Status("taking full backup")
_, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO $1 WITH revision_history`, dest)
require.NoError(t, err)
}

// Import and cancel even-numbered files twice.
files = []string{
Expand Down Expand Up @@ -1337,42 +1356,44 @@ revert=2'`)
require.ErrorIs(t, err, gosql.ErrNoRows)

// Take a final incremental backup.
t.Status("taking incremental backup")
_, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO LATEST IN $1 WITH revision_history`,
dest)
require.NoError(t, err)
if !config.skipBackupRestore {
t.Status("taking incremental backup")
_, err = conn.ExecContext(ctx, `BACKUP DATABASE tpch INTO LATEST IN $1 WITH revision_history`,
dest)
require.NoError(t, err)

// Schedule a final restore of the latest backup (above).
restores = append(restores, restore{
name: "dropped",
expectNoTables: true,
})
// Schedule a final restore of the latest backup (above).
restores = append(restores, restore{
name: "dropped",
expectNoTables: true,
})

// Restore backups at specific times and verify them.
for _, r := range restores {
t.Status(fmt.Sprintf("restoring backup at time '%s'", r.name))
db := "restore_" + r.name
if r.time != "" {
_, err = conn.ExecContext(ctx, fmt.Sprintf(
`RESTORE DATABASE tpch FROM LATEST IN '%s' AS OF SYSTEM TIME '%s' WITH new_db_name = '%s'`,
dest, r.time, db))
require.NoError(t, err)
} else {
_, err = conn.ExecContext(ctx, fmt.Sprintf(
`RESTORE DATABASE tpch FROM LATEST IN '%s' WITH new_db_name = '%s'`, dest, db))
require.NoError(t, err)
}
// Restore backups at specific times and verify them.
for _, r := range restores {
t.Status(fmt.Sprintf("restoring backup at time '%s'", r.name))
db := "restore_" + r.name
if r.time != "" {
_, err = conn.ExecContext(ctx, fmt.Sprintf(
`RESTORE DATABASE tpch FROM LATEST IN '%s' AS OF SYSTEM TIME '%s' WITH new_db_name = '%s'`,
dest, r.time, db))
require.NoError(t, err)
} else {
_, err = conn.ExecContext(ctx, fmt.Sprintf(
`RESTORE DATABASE tpch FROM LATEST IN '%s' WITH new_db_name = '%s'`, dest, db))
require.NoError(t, err)
}

if expect := r.expectFingerprint; expect != "" {
_, _, fp = fingerprint(r.name, db, "orders")
require.Equal(t, expect, fp, "fingerprint mismatch for restore at time '%s'", r.name)
}
if r.expectNoTables {
var tableCount int
require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
`SELECT count(*) FROM [SHOW TABLES FROM %s]`, db)).Scan(&tableCount))
require.Zero(t, tableCount, "found tables in restore at time '%s'", r.name)
t.Status("confirmed no tables in database " + db)
if expect := r.expectFingerprint; expect != "" {
_, _, fp = fingerprint(r.name, db, "orders")
require.Equal(t, expect, fp, "fingerprint mismatch for restore at time '%s'", r.name)
}
if r.expectNoTables {
var tableCount int
require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
`SELECT count(*) FROM [SHOW TABLES FROM %s]`, db)).Scan(&tableCount))
require.Zero(t, tableCount, "found tables in restore at time '%s'", r.name)
t.Status("confirmed no tables in database " + db)
}
}
}
}
Expand Down
Loading

0 comments on commit 9cde691

Please sign in to comment.