backupccl: moved datadriven test framework to its own file
This change only moves the datadriven test and its utility methods
into their own file.

Informs: cockroachdb#77129

Release note: None
adityamaru committed May 26, 2022
1 parent e4ca082 commit 777426d
Showing 4 changed files with 321 additions and 283 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -161,6 +161,7 @@ go_test(
        "bench_covering_test.go",
        "bench_test.go",
        "create_scheduled_backup_test.go",
        "datadriven_test.go",
        "full_cluster_backup_restore_test.go",
        "incrementals_test.go",
        "insert_missing_public_schema_namespace_entry_restore_test.go",
279 changes: 0 additions & 279 deletions pkg/ccl/backupccl/backup_test.go
@@ -56,7 +56,6 @@ import (
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/security"
@@ -94,14 +93,12 @@ import (
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/datadriven"
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/logtags"
	"github.com/gogo/protobuf/proto"
	pgx "github.com/jackc/pgx/v4"
	"github.com/kr/pretty"
	"github.com/lib/pq"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
@@ -116,282 +113,6 @@ func makeTableSpan(tableID uint32) roachpb.Span {
	return roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
}

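// sqlDBKey identifies a cached SQL connection by the server it is opened
// against and the user it authenticates as.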
type sqlDBKey struct {
	server string
	user   string
}

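// datadrivenTestState tracks the servers, external IO directories, and SQL
// connections created during a datadriven test, along with any notices
// captured from the server and the cleanup functions to run at the end.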
type datadrivenTestState struct {
	servers      map[string]serverutils.TestServerInterface
	dataDirs     map[string]string
	sqlDBs       map[sqlDBKey]*gosql.DB
	noticeBuffer []string
	cleanupFns   []func()
}

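// localityCfgs maps the locality names accepted by the new-server `localities`
// argument to their corresponding roachpb.Locality configurations.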
var localityCfgs = map[string]roachpb.Locality{
	"us-east-1": {
		Tiers: []roachpb.Tier{
			{Key: "region", Value: "us-east-1"},
			{Key: "availability-zone", Value: "us-east1"},
		},
	},
	"us-west-1": {
		Tiers: []roachpb.Tier{
			{Key: "region", Value: "us-west-1"},
			{Key: "availability-zone", Value: "us-west1"},
		},
	},
	"eu-central-1": {
		Tiers: []roachpb.Tier{
			{Key: "region", Value: "eu-central-1"},
			{Key: "availability-zone", Value: "eu-central-1"},
		},
	},
	"eu-north-1": {
		Tiers: []roachpb.Tier{
			{Key: "region", Value: "eu-north-1"},
			{Key: "availability-zone", Value: "eu-north-1"},
		},
	},
}

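// cleanup closes all open SQL connections, stops every server, and runs the
// registered cleanup functions.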
func (d *datadrivenTestState) cleanup(ctx context.Context) {
	for _, db := range d.sqlDBs {
		db.Close()
	}
	for _, s := range d.servers {
		s.Stopper().Stop(ctx)
	}
	for _, f := range d.cleanupFns {
		f()
	}
	d.noticeBuffer = nil
}

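// addServer creates a new test server (or a multi-node cluster when localities
// are specified) with the given external IO configuration and registers it
// under the given name.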
func (d *datadrivenTestState) addServer(
	t *testing.T,
	name, iodir, tempCleanupFrequency string,
	ioConf base.ExternalIODirConfig,
	localities string,
) error {
	var tc serverutils.TestClusterInterface
	var cleanup func()
	params := base.TestClusterArgs{}
	params.ServerArgs.ExternalIODirConfig = ioConf
	params.ServerArgs.Knobs = base.TestingKnobs{
		JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
	}

	settings := cluster.MakeTestingClusterSettings()
	closedts.TargetDuration.Override(context.Background(), &settings.SV, 10*time.Millisecond)
	closedts.SideTransportCloseInterval.Override(context.Background(), &settings.SV, 10*time.Millisecond)

	if tempCleanupFrequency != "" {
		duration, err := time.ParseDuration(tempCleanupFrequency)
		if err != nil {
			return errors.New("unable to parse tempCleanupFrequency during server creation")
		}
		sql.TempObjectCleanupInterval.Override(context.Background(), &settings.SV, duration)
		sql.TempObjectWaitInterval.Override(context.Background(), &settings.SV, time.Millisecond)
	}
	params.ServerArgs.Settings = settings

	clusterSize := singleNode

	if localities != "" {
		cfgs := strings.Split(localities, ",")
		clusterSize = len(cfgs)
		serverArgsPerNode := make(map[int]base.TestServerArgs)
		for i, cfg := range cfgs {
			param := params.ServerArgs
			param.Locality = localityCfgs[cfg]
			serverArgsPerNode[i] = param
		}
		params.ServerArgsPerNode = serverArgsPerNode
	}
	if iodir == "" {
		tc, _, iodir, cleanup = backupRestoreTestSetupWithParams(t, clusterSize, 0, InitManualReplication, params)
	} else {
		tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, iodir, InitManualReplication, params)
	}
	d.servers[name] = tc.Server(0)
	d.dataDirs[name] = iodir
	d.cleanupFns = append(d.cleanupFns, cleanup)

	return nil
}

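// getIODir returns the external IO directory of the named server, failing the
// test if no such server exists.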
func (d *datadrivenTestState) getIODir(t *testing.T, server string) string {
	dir, ok := d.dataDirs[server]
	if !ok {
		t.Fatalf("server %s does not exist", server)
	}
	return dir
}

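// getSQLDB returns a SQL connection to the named server for the given user,
// creating and caching one on first use. The connection records any notices it
// receives in noticeBuffer.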
func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string) *gosql.DB {
	key := sqlDBKey{server, user}
	if db, ok := d.sqlDBs[key]; ok {
		return db
	}
	addr := d.servers[server].ServingSQLAddr()
	pgURL, cleanup := sqlutils.PGUrl(t, addr, "TestBackupRestoreDataDriven", url.User(user))
	d.cleanupFns = append(d.cleanupFns, cleanup)

	base, err := pq.NewConnector(pgURL.String())
	if err != nil {
		t.Fatal(err)
	}
	connector := pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) {
		d.noticeBuffer = append(d.noticeBuffer, notice.Severity+": "+notice.Message)
		if notice.Detail != "" {
			d.noticeBuffer = append(d.noticeBuffer, "DETAIL: "+notice.Detail)
		}
		if notice.Hint != "" {
			d.noticeBuffer = append(d.noticeBuffer, "HINT: "+notice.Hint)
		}
	})
	d.sqlDBs[key] = gosql.OpenDB(connector)
	return d.sqlDBs[key]
}

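// newDatadrivenTestState returns an empty datadrivenTestState with its maps
// initialized.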
func newDatadrivenTestState() datadrivenTestState {
	return datadrivenTestState{
		servers:  make(map[string]serverutils.TestServerInterface),
		dataDirs: make(map[string]string),
		sqlDBs:   make(map[sqlDBKey]*gosql.DB),
	}
}

// TestBackupRestoreDataDriven is a datadriven test to test standard
// backup/restore interactions involving setting up clusters and running
// different SQL commands. The test files are in testdata/backup-restore.
// It has the following commands:
//
// - "new-server name=<name> [share-io-dir=<name>]": create a new server with
// the input name. It takes in an optional share-io-dir argument to share an
// IO directory with an existing server. This is useful when restoring from a
// backup taken in another server.
// - "exec-sql server=<name>": executes the input SQL query on the target server.
// By default, server is the last created server.
// - "query-sql server=<name>": executes the input SQL query on the target server
// and expects that the results are as desired. By default, server is the last
// created server.
// - "reset": clear all state associated with the test.
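//
// For illustration, a (hypothetical) test file exercising these commands could
// look like the following sketch; the file name, SQL statements, and expected
// output are made up for this example:
//
//	new-server name=s1
//	----
//
//	exec-sql
//	CREATE DATABASE d;
//	BACKUP DATABASE d INTO 'nodelocal://1/backup';
//	----
//
//	query-sql
//	SELECT count(*) FROM [SHOW BACKUPS IN 'nodelocal://1/backup'];
//	----
//	1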
func TestBackupRestoreDataDriven(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	skip.UnderRace(t, "takes >1 min under race")
	skip.UnderDeadlock(t, "assertion failure under deadlock")

	// This test uses this mock HTTP server to pass the backup files between tenants.
	httpAddr, httpServerCleanup := makeInsecureHTTPServer(t)
	defer httpServerCleanup()

	ctx := context.Background()
	datadriven.Walk(t, testutils.TestDataPath(t, "backup-restore", ""), func(t *testing.T, path string) {
		var lastCreatedServer string
		ds := newDatadrivenTestState()
		defer ds.cleanup(ctx)
		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
			switch d.Cmd {
			case "sleep":
				var sleepDuration string
				d.ScanArgs(t, "time", &sleepDuration)
				duration, err := time.ParseDuration(sleepDuration)
				if err != nil {
					return err.Error()
				}
				time.Sleep(duration)
				return ""
			case "reset":
				ds.cleanup(ctx)
				ds = newDatadrivenTestState()
				return ""
			case "new-server":
				var name, shareDirWith, iodir, tempCleanupFrequency, localities string
				var io base.ExternalIODirConfig
				d.ScanArgs(t, "name", &name)
				if d.HasArg("share-io-dir") {
					d.ScanArgs(t, "share-io-dir", &shareDirWith)
				}
				if shareDirWith != "" {
					iodir = ds.getIODir(t, shareDirWith)
				}
				if d.HasArg("allow-implicit-access") {
					io.EnableNonAdminImplicitAndArbitraryOutbound = true
				}
				if d.HasArg("disable-http") {
					io.DisableHTTP = true
				}
				if d.HasArg("temp-cleanup-freq") {
					d.ScanArgs(t, "temp-cleanup-freq", &tempCleanupFrequency)
				}
				if d.HasArg("localities") {
					d.ScanArgs(t, "localities", &localities)
				}
				lastCreatedServer = name
				err := ds.addServer(t, name, iodir, tempCleanupFrequency, io, localities)
				if err != nil {
					return err.Error()
				}
				return ""
			case "exec-sql":
				server := lastCreatedServer
				user := "root"
				if d.HasArg("server") {
					d.ScanArgs(t, "server", &server)
				}
				if d.HasArg("user") {
					d.ScanArgs(t, "user", &user)
				}
				ds.noticeBuffer = nil
				d.Input = strings.ReplaceAll(d.Input, "http://COCKROACH_TEST_HTTP_SERVER/", httpAddr)
				_, err := ds.getSQLDB(t, server, user).Exec(d.Input)
				ret := ds.noticeBuffer
				if err != nil {
					// pausepoint errors have the job ID in them, and datadriven tests
					// don't seem to support regex matching. Clean the error up to not
					// include the job ID.
					if i := strings.Index(err.Error(), "paused before it completed with reason"); i != -1 {
						ret = append(ds.noticeBuffer, err.Error()[i:])
					} else if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
						ret = append(ds.noticeBuffer, err.Error())
						if pqErr.Detail != "" {
							ret = append(ret, "DETAIL: "+pqErr.Detail)
						}
						if pqErr.Hint != "" {
							ret = append(ret, "HINT: "+pqErr.Hint)
						}
					} else {
						t.Errorf("failed to execute stmt %s due to %s", d.Input, err.Error())
					}
				}
				return strings.Join(ret, "\n")
			case "query-sql":
				server := lastCreatedServer
				user := "root"
				if d.HasArg("server") {
					d.ScanArgs(t, "server", &server)
				}
				if d.HasArg("user") {
					d.ScanArgs(t, "user", &user)
				}
				rows, err := ds.getSQLDB(t, server, user).Query(d.Input)
				if err != nil {
					return err.Error()
				}
				output, err := sqlutils.RowsToDataDrivenOutput(rows)
				require.NoError(t, err)
				return output
			default:
				return fmt.Sprintf("unknown command: %s", d.Cmd)
			}
		})
	})
}

func TestBackupRestoreStatementResult(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
