Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
81499: backupccl: rewrite temp-table datadriven test to be more deterministic r=stevendanna a=adityamaru

Previously, the test would override the temp cleanup frequency and
then sleep for long enough such that the temp cleanup would run. This was
always going to be flaky especially when run under stress, race or deadlock.

This changes the test to instead rely on channels to nudge the reconciliation
loop making it more robust and faster!

Fixes: cockroachdb#79026

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed May 20, 2022
2 parents fc6222e + 6b42f89 commit 92947c2
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 55 deletions.
142 changes: 90 additions & 52 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
Expand Down Expand Up @@ -70,22 +71,26 @@ type sqlDBKey struct {
}

type datadrivenTestState struct {
servers map[string]serverutils.TestServerInterface
dataDirs map[string]string
sqlDBs map[sqlDBKey]*gosql.DB
jobTags map[string]jobspb.JobID
clusterTimestamps map[string]string
noticeBuffer []string
cleanupFns []func()
servers map[string]serverutils.TestServerInterface
// tempObjectCleanupAndWait is a mapping from server name to a method that can
// be used to nudge and wait for temporary object cleanup.
tempObjectCleanupAndWait map[string]func()
dataDirs map[string]string
sqlDBs map[sqlDBKey]*gosql.DB
jobTags map[string]jobspb.JobID
clusterTimestamps map[string]string
noticeBuffer []string
cleanupFns []func()
}

func newDatadrivenTestState() datadrivenTestState {
return datadrivenTestState{
servers: make(map[string]serverutils.TestServerInterface),
dataDirs: make(map[string]string),
sqlDBs: make(map[sqlDBKey]*gosql.DB),
jobTags: make(map[string]jobspb.JobID),
clusterTimestamps: make(map[string]string),
servers: make(map[string]serverutils.TestServerInterface),
tempObjectCleanupAndWait: make(map[string]func()),
dataDirs: make(map[string]string),
sqlDBs: make(map[sqlDBKey]*gosql.DB),
jobTags: make(map[string]jobspb.JobID),
clusterTimestamps: make(map[string]string),
}
}

Expand All @@ -103,13 +108,18 @@ func (d *datadrivenTestState) cleanup(ctx context.Context) {
}

type serverCfg struct {
name string
iodir string
tempCleanupFrequency string
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
name string
iodir string
// nudgeTempObjectsCleanup is a channel used to nudge the temporary object
// reconciliation job to run.
nudgeTempObjectsCleanup chan time.Time
// tempObjectCleanupDone is the channel used by the temporary object
// reconciliation job to signal it is done cleaning up.
tempObjectCleanupDone chan struct{}
nodes int
splits int
ioConf base.ExternalIODirConfig
localities string
}

func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
Expand All @@ -121,17 +131,21 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}

// If the server needs to control temporary object cleanup, let us set that up
// now.
if cfg.nudgeTempObjectsCleanup != nil && cfg.tempObjectCleanupDone != nil {
params.ServerArgs.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
OnTempObjectsCleanupDone: func() {
cfg.tempObjectCleanupDone <- struct{}{}
},
TempObjectsCleanupCh: cfg.nudgeTempObjectsCleanup,
}
}

settings := cluster.MakeTestingClusterSettings()
closedts.TargetDuration.Override(context.Background(), &settings.SV, 10*time.Millisecond)
closedts.SideTransportCloseInterval.Override(context.Background(), &settings.SV, 10*time.Millisecond)
if cfg.tempCleanupFrequency != "" {
duration, err := time.ParseDuration(cfg.tempCleanupFrequency)
if err != nil {
return errors.New("unable to parse tempCleanupFrequency during server creation")
}
sql.TempObjectCleanupInterval.Override(context.Background(), &settings.SV, duration)
sql.TempObjectWaitInterval.Override(context.Background(), &settings.SV, time.Millisecond)
}
sql.TempObjectWaitInterval.Override(context.Background(), &settings.SV, time.Millisecond)
params.ServerArgs.Settings = settings

clusterSize := cfg.nodes
Expand All @@ -154,9 +168,25 @@ func (d *datadrivenTestState) addServer(t *testing.T, cfg serverCfg) error {
tc, _, cleanup = backupRestoreTestSetupEmptyWithParams(t, clusterSize, cfg.iodir,
InitManualReplication, params)
}
cleanupFn := func() {
if cfg.nudgeTempObjectsCleanup != nil {
close(cfg.nudgeTempObjectsCleanup)
}
if cfg.tempObjectCleanupDone != nil {
close(cfg.tempObjectCleanupDone)
}
cleanup()
}
d.servers[cfg.name] = tc.Server(0)
d.dataDirs[cfg.name] = cfg.iodir
d.cleanupFns = append(d.cleanupFns, cleanup)
d.cleanupFns = append(d.cleanupFns, cleanupFn)

if cfg.nudgeTempObjectsCleanup != nil && cfg.tempObjectCleanupDone != nil {
d.tempObjectCleanupAndWait[cfg.name] = func() {
cfg.nudgeTempObjectsCleanup <- timeutil.Now()
<-cfg.tempObjectCleanupDone
}
}

return nil
}
Expand Down Expand Up @@ -214,9 +244,6 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + disable-http: disables use of external HTTP endpoints.
//
// + temp-cleanup-freq: specifies the frequency with which the temporary table
// cleanup reconciliation job runs
//
// + localities: specifies the localities that will be used when starting up
// the test cluster. The cluster will have len(localities) nodes, with each
// node assigned a locality config corresponding to the locality. Please
Expand All @@ -226,6 +253,10 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + splits: specifies the number of ranges the bank table is split into.
//
// + control-temp-object-cleanup: sets up the server in a way that the test
// can control when to run the temporary object reconciliation loop using
// nudge-and-wait-for-temp-cleanup
//
// - "exec-sql [server=<name>] [user=<name>] [args]"
// Executes the input SQL query on the target server. By default, server is
// the last created server.
Expand Down Expand Up @@ -289,6 +320,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
//
// + expect-pausepoint: expects the schema change job to end up in a paused state because
// of a pausepoint error.
//
// - "nudge-and-wait-for-temp-cleanup"
// Nudges the temporary object reconciliation loop to run, and waits for completion.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -307,24 +341,16 @@ func TestDataDriven(t *testing.T) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {

switch d.Cmd {
case "sleep":
var sleepDuration string
d.ScanArgs(t, "time", &sleepDuration)
duration, err := time.ParseDuration(sleepDuration)
if err != nil {
return err.Error()
}
time.Sleep(duration)
return ""

case "reset":
ds.cleanup(ctx)
ds = newDatadrivenTestState()
return ""

case "new-server":
var name, shareDirWith, iodir, tempCleanupFrequency, localities string
var name, shareDirWith, iodir, localities string
var splits int
var nudgeTempObjectCleanup chan time.Time
var tempObjectCleanupDone chan struct{}
nodes := singleNode
var io base.ExternalIODirConfig
d.ScanArgs(t, "name", &name)
Expand All @@ -340,9 +366,6 @@ func TestDataDriven(t *testing.T) {
if d.HasArg("disable-http") {
io.DisableHTTP = true
}
if d.HasArg("temp-cleanup-freq") {
d.ScanArgs(t, "temp-cleanup-freq", &tempCleanupFrequency)
}
if d.HasArg("localities") {
d.ScanArgs(t, "localities", &localities)
}
Expand All @@ -352,15 +375,21 @@ func TestDataDriven(t *testing.T) {
if d.HasArg("splits") {
d.ScanArgs(t, "splits", &splits)
}
if d.HasArg("control-temp-object-cleanup") {
nudgeTempObjectCleanup = make(chan time.Time)
tempObjectCleanupDone = make(chan struct{})
}

lastCreatedServer = name
cfg := serverCfg{
name: name,
iodir: iodir,
tempCleanupFrequency: tempCleanupFrequency,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
name: name,
iodir: iodir,
nudgeTempObjectsCleanup: nudgeTempObjectCleanup,
tempObjectCleanupDone: tempObjectCleanupDone,
nodes: nodes,
splits: splits,
ioConf: io,
localities: localities,
}
err := ds.addServer(t, cfg)
if err != nil {
Expand Down Expand Up @@ -605,6 +634,15 @@ func TestDataDriven(t *testing.T) {
ds.clusterTimestamps[timestampTag] = ts
return ""

case "nudge-and-wait-for-temp-cleanup":
server := lastCreatedServer
if nudgeAndWait, ok := ds.tempObjectCleanupAndWait[server]; !ok {
t.Fatalf("server %s was not configured with `control-temp-object-cleanup`", server)
} else {
nudgeAndWait()
}
return ""

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/testdata/backup-restore/temp-tables
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ SELECT table_name FROM [SHOW TABLES] ORDER BY table_name
----
perm_table

new-server name=s2 share-io-dir=s1 temp-cleanup-freq=5s
new-server name=s2 share-io-dir=s1 control-temp-object-cleanup
----

exec-sql
Expand Down Expand Up @@ -155,8 +155,8 @@ perm_table
temp_seq
temp_table

# Wait for the temp cleanup job to run.
sleep time=5s
# Nudge and wait for the temp cleanup job to run.
nudge-and-wait-for-temp-cleanup
----

# The synthetic temp schema should have been erased.
Expand Down

0 comments on commit 92947c2

Please sign in to comment.