Skip to content

Commit

Permalink
sql: add test to randomly pause schema change job
Browse files Browse the repository at this point in the history
This test randomly pauses a schema change before a call
to (*schemaChanger).txn.

Release note: None
  • Loading branch information
stevendanna committed Feb 10, 2022
1 parent 1b7124c commit 0ab10e6
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/jobs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func IsPermanentJobError(err error) bool {
return errors.Is(err, errJobPermanentSentinel)
}

// IsPauseSelfError reports whether err matches errPauseSelfSentinel
// under errors.Is, i.e. whether it is a pause-request error.
func IsPauseSelfError(err error) bool {
	isPauseRequest := errors.Is(err, errPauseSelfSentinel)
	return isPauseRequest
}

// errPauseSelfSentinel is the sentinel that IsPauseSelfError matches
// against with errors.Is. It exists so the errors returned from
// PauseRequestErr can be marked with it.
var errPauseSelfSentinel = errors.New("job requested it be paused")
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
}
// Go through the recording motions. See comment above.
sqltelemetry.RecordError(ctx, err, &sc.settings.SV)
if jobs.IsPauseSelfError(err) {
// For testing only
return err
}
}

// Run through mutation state machine and backfill.
Expand Down Expand Up @@ -2130,6 +2134,10 @@ type SchemaChangerTestingKnobs struct {
// RunBeforeResume runs at the start of the Resume hook.
RunBeforeResume func(jobID jobspb.JobID) error

// RunBeforeDescTxn runs at the start of every call to
// (*SchemaChanger).txn.
RunBeforeDescTxn func() error

// OldNamesDrainedNotification is called during a schema change,
// after all leases on the version of the descriptor with the old
// names are gone, and just before the mapping of the old names to the
Expand Down Expand Up @@ -2173,6 +2181,12 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {}
// txn hands f to the collection factory's Txn together with the schema
// changer's internal executor and db handle. If the RunBeforeDescTxn
// testing knob is set, it is invoked first; a non-nil error from the
// knob is returned as-is and f is never run.
func (sc *SchemaChanger) txn(
	ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error,
) error {
	hook := sc.testingKnobs.RunBeforeDescTxn
	if hook != nil {
		hookErr := hook()
		if hookErr != nil {
			return hookErr
		}
	}

	factory := sc.execCfg.CollectionFactory
	return factory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f)
}

Expand Down
125 changes: 125 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -7496,3 +7497,127 @@ func TestAddIndexResumeAfterSettingFlippedFails(t *testing.T) {

require.Error(t, <-errC, "schema change requires MVCC-compliant backfiller, but MVCC-compliant backfiller is not supported")
}

// TestPauseBeforeRandomDescTxn runs a schema change, makes it request a
// pause just before one of its sc.txn calls, resumes the job, and checks
// that it then succeeds and leaves the expected data behind.
func TestPauseBeforeRandomDescTxn(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	type testCase struct {
		name      string
		setupSQL  string
		changeSQL string
		verify    func(t *testing.T, sqlRunner *sqlutils.SQLRunner)
	}

	// We run the schema change twice. First, to find out how many
	// sc.txn calls there are, and then a second time that pauses
	// a random one. By finding the count of txns, we make sure
	// that we have an equal probability of pausing after each
	// transaction.
	getTxnCount := func(t *testing.T, tc testCase) int {
		var (
			count       int32 // accessed atomically
			shouldCount int32 // accessed atomically
		)
		params, _ := tests.CreateTestServerParams()
		params.Knobs = base.TestingKnobs{
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
			SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
				RunBeforeDescTxn: func() error {
					// Only count sc.txn calls made once the setup SQL is done.
					if atomic.LoadInt32(&shouldCount) == 1 {
						atomic.AddInt32(&count, 1)
					}
					return nil
				},
			},
		}
		s, sqlDB, _ := serverutils.StartServer(t, params)
		sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
		defer s.Stopper().Stop(ctx)

		sqlRunner.Exec(t, tc.setupSQL)
		atomic.StoreInt32(&shouldCount, 1)
		sqlRunner.Exec(t, tc.changeSQL)
		return int(atomic.LoadInt32(&count))
	}

	// runWithPauseAt runs the test case on a fresh server, makes the
	// pauseAt-th sc.txn call (1-based) return a pause request error,
	// then resumes the job and verifies that it succeeds.
	runWithPauseAt := func(t *testing.T, tc testCase, pauseAt int) {
		var (
			count       int32 // accessed atomically
			shouldPause int32 // accessed atomically
			// jobID holds a jobspb.JobID. It is written by the
			// RunBeforeResume knob and read by the test goroutine, so
			// it is accessed atomically like the counters above.
			jobID int64
		)

		params, _ := tests.CreateTestServerParams()
		params.Knobs = base.TestingKnobs{
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
			SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
				RunBeforeResume: func(id jobspb.JobID) error {
					atomic.StoreInt64(&jobID, int64(id))
					return nil
				},
				RunBeforeDescTxn: func() error {
					if atomic.LoadInt32(&shouldPause) == 0 {
						return nil
					}
					current := int(atomic.AddInt32(&count, 1))
					if current == pauseAt {
						// Pause only once so that the resumed job can
						// run to completion.
						atomic.StoreInt32(&shouldPause, 0)
						return jobs.MarkPauseRequestError(errors.Newf("paused sc.txn call %d", current))
					}
					return nil
				},
			},
		}
		s, sqlDB, _ := serverutils.StartServer(t, params)
		sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
		defer s.Stopper().Stop(ctx)

		sqlRunner.Exec(t, tc.setupSQL)
		atomic.StoreInt32(&shouldPause, 1)
		sqlRunner.ExpectErr(t, ".*paused sc.txn call.*", tc.changeSQL)

		// Read the job ID once; the knob may store it again when the
		// job is resumed below.
		pausedJobID := jobspb.JobID(atomic.LoadInt64(&jobID))
		sqlRunner.Exec(t, "RESUME JOB $1", pausedJobID)

		row := sqlRunner.QueryRow(t, "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", pausedJobID)
		var status string
		row.Scan(&status)
		require.Equal(t, "succeeded", status)
		tc.verify(t, sqlRunner)
	}

	rnd, _ := randutil.NewTestRand()
	for _, tc := range []testCase{
		{
			name: "create index",
			setupSQL: `
CREATE TABLE t (pk INT PRIMARY KEY, b INT);
INSERT INTO t VALUES (1, 1), (2, 2), (3, 3);
`,
			changeSQL: "CREATE INDEX on t (b)",
			verify: func(t *testing.T, sqlRunner *sqlutils.SQLRunner) {
				rows := sqlutils.MatrixToStr(sqlRunner.QueryStr(t, "SELECT * FROM t@t_b_idx"))
				require.Equal(t, "1, 1\n2, 2\n3, 3\n", rows)
			},
		},
	} {
		txnCount := getTxnCount(t, tc)
		// rnd.Intn panics for a non-positive argument, so fail with a
		// clear message if the counting run saw no sc.txn calls.
		if txnCount <= 0 {
			t.Fatalf("%s: expected at least one sc.txn call, counted %d", tc.name, txnCount)
		}

		// Flip testAll to true to pause at every sc.txn call in turn
		// instead of at a single random one.
		const testAll = false
		if testAll {
			for i := 1; i <= txnCount; i++ {
				t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, i), func(t *testing.T) {
					runWithPauseAt(t, tc, i)
				})
			}
		} else {
			pauseAt := rnd.Intn(txnCount) + 1
			t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, pauseAt), func(t *testing.T) {
				runWithPauseAt(t, tc, pauseAt)
			})
		}
	}
}

0 comments on commit 0ab10e6

Please sign in to comment.