From 0ab10e64cd972cd5707bb81b0d5e23efa7a379c7 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 9 Feb 2022 10:24:52 +0000 Subject: [PATCH] sql: add test to randomly pause schema change job This test randomly pauses a schema change before a call to (*schemaChanger).txn. Release note: None --- pkg/jobs/errors.go | 6 ++ pkg/sql/schema_changer.go | 14 ++++ pkg/sql/schema_changer_test.go | 125 +++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+) diff --git a/pkg/jobs/errors.go b/pkg/jobs/errors.go index 6ec709683cc0..8ad50f3da7ea 100644 --- a/pkg/jobs/errors.go +++ b/pkg/jobs/errors.go @@ -48,6 +48,12 @@ func IsPermanentJobError(err error) bool { return errors.Is(err, errJobPermanentSentinel) } +// IsPauseSelfError checks whether the given error is a +// PauseRequestError. +func IsPauseSelfError(err error) bool { + return errors.Is(err, errPauseSelfSentinel) +} + // errPauseSelfSentinel exists so the errors returned from PauseRequestErr can // be marked with it. var errPauseSelfSentinel = errors.New("job requested it be paused") diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 78c002151907..46a808f1ea3f 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -752,6 +752,10 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { } // Go through the recording motions. See comment above. sqltelemetry.RecordError(ctx, err, &sc.settings.SV) + if jobs.IsPauseSelfError(err) { + // For testing only + return err + } } // Run through mutation state machine and backfill. @@ -2130,6 +2134,10 @@ type SchemaChangerTestingKnobs struct { // RunBeforeResume runs at the start of the Resume hook. RunBeforeResume func(jobID jobspb.JobID) error + // RunBeforeDescTxn runs at the start of every call to + // (*schemaChanger).txn. + RunBeforeDescTxn func() error + // OldNamesDrainedNotification is called during a schema change, // after all leases on the version of the descriptor with the old // names are gone, and just before the mapping of the old names to the @@ -2173,6 +2181,12 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {} func (sc *SchemaChanger) txn( ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error, ) error { + if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { + if err := fn(); err != nil { + return err + } + } + return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f) } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 07c97a4f25e4..31bb984082db 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -7496,3 +7497,127 @@ func TestAddIndexResumeAfterSettingFlippedFails(t *testing.T) { require.Error(t, <-errC, "schema change requires MVCC-compliant backfiller, but MVCC-compliant backfiller is not supported") } + +func TestPauseBeforeRandomDescTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + type testCase struct { + name string + setupSQL string + changeSQL string + verify func(t *testing.T, sqlRunner *sqlutils.SQLRunner) + } + + // We run the schema change twice. First, to find out how many + // sc.txn calls there are, and then a second time that pauses + // a random one. By finding the count of txns, we make sure + // that we have an equal probability of pausing after each + // transaction. + getTxnCount := func(t *testing.T, tc testCase) int { + var ( + count int32 // accessed atomically + shouldCount int32 // accessed atomically + ) + params, _ := tests.CreateTestServerParams() + params.Knobs = base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeDescTxn: func() error { + if atomic.LoadInt32(&shouldCount) == 1 { + atomic.AddInt32(&count, 1) + } + return nil + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + sqlRunner := sqlutils.MakeSQLRunner(sqlDB) + defer s.Stopper().Stop(ctx) + + sqlRunner.Exec(t, tc.setupSQL) + atomic.StoreInt32(&shouldCount, 1) + sqlRunner.Exec(t, tc.changeSQL) + return int(atomic.LoadInt32(&count)) + } + + runWithPauseAt := func(t *testing.T, tc testCase, pauseAt int) { + var ( + count int32 // accessed atomically + shouldPause int32 // accessed atomically + jobID jobspb.JobID + ) + + params, _ := tests.CreateTestServerParams() + params.Knobs = base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeResume: func(id jobspb.JobID) error { + jobID = id + return nil + }, + RunBeforeDescTxn: func() error { + if atomic.LoadInt32(&shouldPause) == 0 { + return nil + } + current := int(atomic.AddInt32(&count, 1)) + if current == pauseAt { + atomic.StoreInt32(&shouldPause, 0) + return jobs.MarkPauseRequestError(errors.Newf("paused sc.txn call %d", current)) + } + return nil + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + sqlRunner := sqlutils.MakeSQLRunner(sqlDB) + defer s.Stopper().Stop(ctx) + + sqlRunner.Exec(t, tc.setupSQL) + atomic.StoreInt32(&shouldPause, 1) + sqlRunner.ExpectErr(t, ".*paused sc.txn call.*", tc.changeSQL) + sqlRunner.Exec(t, "RESUME JOB $1", jobID) + + row := sqlRunner.QueryRow(t, "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", jobID) + var status string + row.Scan(&status) + require.Equal(t, "succeeded", status) + tc.verify(t, sqlRunner) + } + + rnd, _ := randutil.NewTestRand() + for _, tc := range []testCase{ + { + name: "create index", + setupSQL: ` +CREATE TABLE t (pk INT PRIMARY KEY, b INT); +INSERT INTO t VALUES (1, 1), (2, 2), (3, 3); +`, + changeSQL: "CREATE INDEX on t (b)", + verify: func(t *testing.T, sqlRunner *sqlutils.SQLRunner) { + rows := sqlutils.MatrixToStr(sqlRunner.QueryStr(t, "SELECT * FROM t@t_b_idx")) + require.Equal(t, "1, 1\n2, 2\n3, 3\n", rows) + }, + }, + } { + txnCount := getTxnCount(t, tc) + + const testAll = false + if testAll { + for i := 1; i <= txnCount; i++ { + t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, i), func(t *testing.T) { + runWithPauseAt(t, tc, i) + }) + } + } else { + pauseAt := rnd.Intn(txnCount) + 1 + t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, pauseAt), func(t *testing.T) { + runWithPauseAt(t, tc, pauseAt) + + }) + } + } + +}