From c56dcaa9049eb705adb12b2c5ed2b4445458e0aa Mon Sep 17 00:00:00 2001 From: Xiang Gu Date: Wed, 9 Aug 2023 11:39:54 -0400 Subject: [PATCH 1/2] schemachanger: Clean up redundant tests and refactor them This commit recognizes that there were previously three redundant tests about concurrent schema changer behavior, so it deletes them and rewrites them as a single, simpler test. Release note: None --- pkg/sql/schemachanger/BUILD.bazel | 11 +- pkg/sql/schemachanger/schemachanger_test.go | 885 +++----------------- 2 files changed, 121 insertions(+), 775 deletions(-) diff --git a/pkg/sql/schemachanger/BUILD.bazel b/pkg/sql/schemachanger/BUILD.bazel index 8f8658a425a2..6ca5cc59dc94 100644 --- a/pkg/sql/schemachanger/BUILD.bazel +++ b/pkg/sql/schemachanger/BUILD.bazel @@ -39,39 +39,34 @@ go_test( "//pkg/ccl", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/kv", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/execinfra", - "//pkg/sql/pgwire/pgcode", "//pkg/sql/rowenc", "//pkg/sql/schemachanger/scexec", "//pkg/sql/schemachanger/scop", "//pkg/sql/schemachanger/scplan", "//pkg/sql/schemachanger/sctest", # keep - "//pkg/sql/sqltestutils", + "//pkg/sql/sessiondatapb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", - "//pkg/util", "//pkg/util/ctxgroup", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", - "//pkg/util/syncutil", - "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//errorspb", - "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index be5838e1acf9..5c4f73ef5003 100644 --- 
a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -20,798 +20,34 @@ import ( "sync/atomic" "testing" - "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan" - "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/errorspb" - "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) -// TestConcurrentDeclarativeSchemaChanges tests that concurrent declarative -// schema changes operating on the same descriptors are performed serially. 
-func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx, cancel := context.WithCancel(context.Background()) - - var maxValue = 4000 - if util.RaceEnabled { - // Race builds are a lot slower, so use a smaller number of rows. - maxValue = 200 - } - - // Protects backfillNotification. - var mu syncutil.Mutex - var backfillNotification, continueNotification chan struct{} - // We have to have initBackfillNotification return the new - // channel rather than having later users read the original - // backfillNotification to make the race detector happy. - initBackfillNotification := func() (chan struct{}, chan struct{}) { - mu.Lock() - defer mu.Unlock() - backfillNotification = make(chan struct{}) - continueNotification = make(chan struct{}) - return backfillNotification, continueNotification - } - notifyBackfill := func() { - mu.Lock() - defer mu.Unlock() - if backfillNotification != nil { - close(backfillNotification) - backfillNotification = nil - } - if continueNotification != nil { - <-continueNotification - close(continueNotification) - continueNotification = nil - } - } - var alterPrimaryKeyBlockedCounter atomic.Uint32 - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { - for _, stmt := range stmts { - if strings.Contains(stmt, "ALTER PRIMARY KEY") { - alterPrimaryKeyBlockedCounter.Add(1) - return - } - } - }, - }, - DistSQL: &execinfra.TestingKnobs{ - RunBeforeBackfillChunk: func(_ roachpb.Span) error { - notifyBackfill() - return nil - }, - }, - // Decrease the adopt loop interval so that retries happen quickly. - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - // Prevent the GC job from running so we ensure that all the keys - // which were written remain. 
- GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error { - <-ctx.Done() - return ctx.Err() - }}, - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - defer cancel() - codec := s.ApplicationLayer().Codec() - - if _, err := sqlDB.Exec(`CREATE DATABASE t`); err != nil { - t.Fatal(err) - } - - if _, err := sqlDB.Exec(`CREATE TABLE t.test (k INT NOT NULL, v INT)`); err != nil { - t.Fatal(err) - } - if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil { - t.Fatal(err) - } - - backfillNotif, continueNotif := initBackfillNotification() - var wg sync.WaitGroup - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`CREATE INDEX i ON t.test (v)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // Wait until the create index schema change job has started - // before kicking off the alter primary key. - <-backfillNotif - require.Zero(t, alterPrimaryKeyBlockedCounter.Load()) - wg.Add(1) - go func() { - if _, err := sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)`); err != nil { - t.Error(err) - } - wg.Done() - }() - - // The ALTER PRIMARY KEY schema change must block. - testutils.SucceedsSoon(t, func() error { - if alterPrimaryKeyBlockedCounter.Load() == 0 { - return errors.New("waiting for concurrent schema change to block") - } - return nil - }) - - // Unblock the create index job. - continueNotif <- struct{}{} - wg.Wait() - - // The ALTER PRIMARY KEY schema change must have been blocked. 
- require.NotZero(t, alterPrimaryKeyBlockedCounter.Load()) - - // There should be 5 k/v pairs per row: - // * the original primary index keyed on rowid, - // * the first version of the secondary index on v with rowid as key suffix, - // * the intermediate version of the new primary index keyed on k but - // including rowid as a stored column, - // * the final version of the new primary index keyed on k and not including rowid, - // * the final version of the secondary index for v with k as the key suffix. - testutils.SucceedsSoon(t, func() error { - return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 5, maxValue) - }) -} - -func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - t.Run("wait for legacy schema changes", func(t *testing.T) { - // This test starts an legacy schema change job (job 1), and then starts - // another legacy schema change job (job 2) and a declarative schema change - // job (job 3) while job 1 is backfilling. Job 1 is resumed after job 2 - // has started running. - ctx := context.Background() - - var job1Backfill sync.Once - var job2Resume sync.Once - var job3Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts. - job2ResumeNotification := make(chan struct{}) - // Closed when job 3 starts waiting for concurrent schema changes to finish. - job3WaitNotification := make(chan struct{}) - var job1ID jobspb.JobID - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(jobID jobspb.JobID) error { - // Only block in job 2. 
- if job1ID == 0 || jobID == job1ID { - job1ID = jobID - return nil - } - job2Resume.Do(func() { - close(job2ResumeNotification) - }) - return nil - }, - RunBeforeBackfill: func() error { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - return nil - }, - }, - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Assert that when job 3 is running, there are no mutations other - // than the ones associated with this schema change. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - // There are 2 schema changes that should precede job 3. - // The declarative schema changer uses the same mutation ID for all - // its mutations. - for _, m := range table.AllMutations() { - assert.Equal(t, int(m.MutationID()), 3) - } - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - job3Wait.Do(func() { - close(job3WaitNotification) - }) - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - // Start job 1: An index schema change, which does not use the new schema - // changer. - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - // Start job 3: A column schema change which uses the new schema changer. 
- // The transaction will not actually commit until job 1 has finished. - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`) - assert.NoError(t, err) - return nil - }) - - <-job3WaitNotification - - // Start job 2: Another index schema change which does not use the new - // schema changer. - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.ExecContext(ctx, `SET use_declarative_schema_changer='off'`) - assert.NoError(t, err) - _, err = sqlDB.ExecContext(ctx, `CREATE INDEX idx2 ON db.t(a)`) - assert.NoError(t, err) - return nil - }) - - // Wait for job 2 to start. - <-job2ResumeNotification - - // Finally, let job 1 finish, which will unblock the - // others. - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - // Check that job 3 was created last. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status, description FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx ON db.public.t (a)`}, - {jobspb.TypeSchemaChange.String(), string(jobs.StatusSucceeded), `CREATE INDEX idx2 ON db.public.t (a)`}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded), `ALTER TABLE db.public.t ADD COLUMN b INT8 DEFAULT 1`}, - }, - ) - }) - - t.Run("wait for declarative schema changes for tables", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) while job 1 is backfilling. 
- ctx := context.Background() - - var job1Backfill sync.Once - var job2Wait sync.Once - // Closed when we enter the RunBeforeBackfill knob of job 1. - job1BackfillNotification := make(chan struct{}) - // Closed when we're ready to continue with job 1. - job1ContinueNotification := make(chan struct{}) - // Closed when job 2 starts waiting for concurrent schema changes to finish. - job2WaitNotification := make(chan struct{}) - - stmt1 := `ALTER TABLE db.t ADD COLUMN b INT8 DEFAULT 1` - stmt2 := `ALTER TABLE db.t ADD COLUMN c INT8 DEFAULT 2` - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never queue mutations for job 2 before finishing job - // 1. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - mutations := table.AllMutations() - if len(mutations) == 0 { - t.Errorf("unexpected empty mutations") - return errors.Errorf("test failure") - } - var idsSeen []descpb.MutationID - for _, m := range mutations { - if len(idsSeen) == 0 || m.MutationID() > idsSeen[len(idsSeen)-1] { - idsSeen = append(idsSeen, m.MutationID()) - } - } - highestID := idsSeen[len(idsSeen)-1] - assert.Truef(t, highestID <= 1, "unexpected mutation IDs %v", idsSeen) - // Block job 1 during the backfill. 
- s := p.Stages[idx] - stmt := p.TargetState.Statements[0].Statement - if stmt != stmt1 || s.Type() != scop.BackfillType { - return nil - } - for _, op := range s.EdgeOps { - if backfillOp, ok := op.(*scop.BackfillIndex); ok && backfillOp.IndexID == descpb.IndexID(2) { - job1Backfill.Do(func() { - close(job1BackfillNotification) - <-job1ContinueNotification - }) - } - } - - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] != stmt2 { - return - } - job2Wait.Do(func() { - close(job2WaitNotification) - }) - }, - }, - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job1BackfillNotification - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - <-job2WaitNotification - close(job1ContinueNotification) - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), 
string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - }) - - t.Run("wait for declarative schema changes for schema", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping schemas. - // Both of these jobs will need to concurrently touch the database descriptor. - ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. - job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP SCHEMA db.s1` - stmt2 := `DROP SCHEMA db.s2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, "SET CLUSTER SETTING sql.schema.force_declarative_statements='+CREATE SCHEMA'") - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE SCHEMA db.s1`) - tdb.Exec(t, `CREATE SCHEMA db.s2`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, 
stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - // Expect two NEW SCHEMA CHANGE jobs for stmt1 and stmt2. - // The CREATE DATABASE and two CREATE SCHEMAs don't create jobs. - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) - - t.Run("wait for declarative schema changes for type", func(t *testing.T) { - // This test starts a declarative schema change job (job 1), and then starts - // another declarative schema change job (job 2) involving dropping tables. - // Both of these jobs will need to concurrently touch type descriptors. - ctx := context.Background() - - var jobWaitForPostCommit sync.Once - var jobWaitBeforeWait sync.Once - // Closed when we're ready to continue with job 1. 
- job2StartExecution := make(chan struct{}) - job2ContinueNotification := make(chan struct{}) - completionCount := int32(0) - - stmt1 := `DROP TABLE db.t1` - stmt2 := `DROP TABLE db.t2` - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeStage: func(p scplan.Plan, stageIdx int) error { - if p.Params.ExecutionPhase != scop.PostCommitPhase { - return nil - } - jobWaitForPostCommit.Do(func() { - job2StartExecution <- struct{}{} - job2ContinueNotification <- struct{}{} - }) - return nil - }, - BeforeWaitingForConcurrentSchemaChanges: func(stmts []string) { - if stmts[0] == stmt2 { - atomic.AddInt32(&completionCount, 1) - jobWaitBeforeWait.Do(func() { - <-job2ContinueNotification - }) - } - }, - }, - } - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TYPE db.status AS ENUM ('open', 'closed', 'inactive');`) - tdb.Exec(t, `CREATE TABLE db.t1(t db.status)`) - tdb.Exec(t, `CREATE TABLE db.t2(t db.status)`) - - g := ctxgroup.WithContext(ctx) - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt1) - assert.NoError(t, err) - return nil - }) - - <-job2StartExecution - - g.GoCtx(func(ctx context.Context) error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - _, err = conn.ExecContext(ctx, stmt2) - assert.NoError(t, err) - return nil - }) - - require.NoError(t, g.Wait()) - - tdb.CheckQueryResults(t, - fmt.Sprintf(`SELECT job_type, status FROM crdb_internal.jobs WHERE job_type = '%s' OR job_type = '%s' ORDER BY created`, - 
jobspb.TypeSchemaChange.String(), jobspb.TypeNewSchemaChange.String(), - ), - [][]string{ - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - {jobspb.TypeNewSchemaChange.String(), string(jobs.StatusSucceeded)}, - }, - ) - // We should observe the schema change was tried at least twice. - require.GreaterOrEqual(t, atomic.LoadInt32(&completionCount), int32(1)) - }) -} - -// TestConcurrentSchemaChangesWait ensures that when a schema change -// is run concurrently with a declarative schema change, that it waits for -// the declarative schema change to complete before proceeding. -// -// Each concurrent schema change is run both as a single statement, where the -// test expects an automatic retry, and as part of an explicit transaction -// which has returned rows, in order to ensure that an error with the proper -// error code is returned. -func TestConcurrentSchemaChangesWait(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1` - type concurrentWaitTest struct { - // initial statement run under the declarative schema changer, paused on - // the first post commit phase. - initial string - // concurrent statement run under the legacy schema changer - concurrent string - } - ctx := context.Background() - runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) { - defer log.Scope(t).Close(t) - var doOnce sync.Once - // Closed when we enter the BeforeStage knob with a post commit or later - // phase. - beforePostCommitNotification := make(chan struct{}) - // Closed when we're ready to continue with the schema change. - continueNotification := make(chan struct{}) - // Sent on when we're waiting for the initial schema change. 
- waitingForConcurrent := make(chan struct{}) - - var getTableDescriptor func() catalog.TableDescriptor - var params base.TestServerArgs - params.Knobs = base.TestingKnobs{ - SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - BeforeWaitingForConcurrentSchemaChanges: func(_ []string) { - waitingForConcurrent <- struct{}{} - }, - BeforeStage: func(p scplan.Plan, idx int) error { - // Verify that we never get a mutation ID not associated with the schema - // change that is running. - if p.Params.ExecutionPhase < scop.PostCommitPhase { - return nil - } - table := getTableDescriptor() - for _, m := range table.AllMutations() { - assert.LessOrEqual(t, int(m.MutationID()), 2) - } - s := p.Stages[idx] - if s.Phase < scop.PostCommitPhase { - return nil - } - doOnce.Do(func() { - close(beforePostCommitNotification) - <-continueNotification - }) - return nil - }, - }, - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - } - - s, sqlDB, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - getTableDescriptor = func() catalog.TableDescriptor { - return desctestutils.TestingGetPublicTableDescriptor(kvDB, s.ApplicationLayer().Codec(), "db", "t") - } - - initialSchemaChange := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) - assert.NoError(t, err) - for _, s := range strings.Split(stmts.initial, ";") { - _, err = conn.ExecContext(ctx, s) - assert.NoError(t, err) - } - return nil - } - concurrentSchemaChangeImplicit := func() error { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - for _, s := range append([]string{ - `SET use_declarative_schema_changer = 'off'`, - }, strings.Split(stmts.concurrent, ";")...) 
{ - if _, err = conn.ExecContext(ctx, s); err != nil { - return err - } - } - return nil - } - concurrentSchemaChangeExplicit := func() error { - var sawRestart bool - defer func() { assert.True(t, sawRestart) }() - return crdb.Execute(func() (err error) { - conn, err := sqlDB.Conn(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer func() { - if err != nil { - var pqErr *pq.Error - sawRestart = sawRestart || - errors.As(err, &pqErr) && - string(pqErr.Code) == pgcode.SerializationFailure.String() - _ = tx.Rollback() - } - }() - // Execute something first to ensure that a restart is sent. - if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil { - return err - } - for _, s := range strings.Split(stmts.concurrent, ";") { - if _, err := tx.ExecContext(ctx, s); err != nil { - return err - } - } - return tx.Commit() - }) - } - - tdb := sqlutils.MakeSQLRunner(sqlDB) - tdb.Exec(t, `CREATE DATABASE db`) - tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`) - tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) - tdb.Exec(t, `CREATE USER testuser`) - tdb.Exec(t, `CREATE SCHEMA db.sc`) - tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`) - tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`) - tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`) - var initialSchemaChangeGroup errgroup.Group - var concurrentSchemaChangeGroup errgroup.Group - initialSchemaChangeGroup.Go(initialSchemaChange) - <-beforePostCommitNotification - if implicit { - concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit) - } else { - concurrentSchemaChangeGroup.Go(concurrentSchemaChangeExplicit) - } - <-waitingForConcurrent - close(continueNotification) - require.NoError(t, initialSchemaChangeGroup.Wait()) - require.NoError(t, concurrentSchemaChangeGroup.Wait()) - } - - stmts := []concurrentWaitTest{ - {defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`}, - 
{defaultInitialStmt, `CREATE INDEX ON db.t(a)`}, - {defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`}, - {defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`}, - {defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`}, - {defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`}, - {defaultInitialStmt, `TRUNCATE TABLE db.t`}, - {defaultInitialStmt, `DROP TABLE db.t`}, - {"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`}, - } - for i := range stmts { - stmt := stmts[i] // copy for closure - t.Run(stmt.concurrent, func(t *testing.T) { - testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) { - runConcurrentSchemaChangeCase(t, stmt, implicit) - }) - }) - } -} - func TestSchemaChangerJobRunningStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1166,3 +402,118 @@ WHERE }) } } + +// TestSchemaChangeWaitsForConcurrentSchemaChanges tests that if a schema +// change on a table is issued when there is already an ongoing schema change +// on that table, it will wait until that ongoing schema change finishes before +// proceeding. +func TestSchemaChangeWaitsForConcurrentSchemaChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tf := func(t *testing.T, modeFor1stStmt, modeFor2ndStmt sessiondatapb.NewSchemaChangerMode) { + ctx, cancel := context.WithCancel(context.Background()) + createIndexChan := make(chan struct{}) + addColChan := make(chan struct{}) + var closeMainChanOnce, closeAlterPKChanOnce sync.Once + + var params base.TestServerArgs + params.Knobs = base.TestingKnobs{ + SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ + // If the blocked schema changer is from legacy schema changer, we let + // it hijack this knob (which is originally designed for declarative + // schema changer) if `stmts` is nil. 
+ WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { + if (len(stmts) == 1 && strings.Contains(stmts[0], "ADD COLUMN")) || + stmts == nil { + closeAlterPKChanOnce.Do(func() { + close(addColChan) + }) + } + }, + }, + DistSQL: &execinfra.TestingKnobs{ + RunBeforeBackfillChunk: func(_ roachpb.Span) error { + closeMainChanOnce.Do(func() { + close(createIndexChan) + }) + <-addColChan // wait for AddCol to unblock me + return nil + }, + }, + // Decrease the adopt loop interval so that retries happen quickly. + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + // Prevent the GC job from running so we ensure that all the keys which + // were written remain. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(jobID jobspb.JobID) error { + <-ctx.Done() + return ctx.Err() + }}, + } + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + defer cancel() + tdb := sqlutils.MakeSQLRunner(sqlDB) + + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL);") + tdb.Exec(t, "INSERT INTO t SELECT k, k+1 FROM generate_series(1,1000) AS tmp(k);") + + // Execute 1st DDL asynchronously and block until it's executing. + tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor1stStmt.String()) + go func() { + tdb.Exec(t, `CREATE INDEX idx ON t (j);`) + }() + <-createIndexChan + + // Execute 2nd DDL synchronously. While waiting, it will unblock 1st DDL so + // it will eventually be able to proceed after waiting for a while. + tdb.Exec(t, `SET use_declarative_schema_changer = $1`, modeFor2ndStmt.String()) + tdb.Exec(t, `ALTER TABLE t ADD COLUMN k INT DEFAULT 30;`) + + // There should be 2 k/v pairs per row: + // 1. the old primary index (i : j) + // 2. the new secondary index keyed on j with key suffix on i (j; i : ), from CREATE INDEX + // Additionally, if ADD COLUMN uses declarative schema changer, there will + // be 1 more k/v pair for each row: + // 3. 
the new primary index (i : j, k), from ADD COLUMN + expectedKeyCount := 2000 + if modeFor2ndStmt == sessiondatapb.UseNewSchemaChangerUnsafeAlways { + expectedKeyCount = 3000 + } + requireTableKeyCount(ctx, t, s.ApplicationLayer().Codec(), kvDB, + "defaultdb", "t", expectedKeyCount) + } + + t.Run("declarative-then-declarative", func(t *testing.T) { + tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerUnsafeAlways) + }) + + t.Run("declarative-then-legacy", func(t *testing.T) { + tf(t, sessiondatapb.UseNewSchemaChangerUnsafeAlways, sessiondatapb.UseNewSchemaChangerOff) + }) + + t.Run("legacy-then-declarative", func(t *testing.T) { + tf(t, sessiondatapb.UseNewSchemaChangerOff, sessiondatapb.UseNewSchemaChangerUnsafeAlways) + }) + + // legacy + legacy case is tested in TestLegacySchemaChangerWaitsForOtherSchemaChanges + // because the waiting occurs under a different code path. +} + +// requireTableKeyCount ensures that `db`.`tbl` has `keyCount` kv-pairs in it. +func requireTableKeyCount( + ctx context.Context, + t *testing.T, + codec keys.SQLCodec, + kvDB *kv.DB, + db string, + tbl string, + keyCount int, +) { + tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, db, tbl) + tablePrefix := codec.TablePrefix(uint32(tableDesc.GetID())) + tableEnd := tablePrefix.PrefixEnd() + kvs, err := kvDB.Scan(ctx, tablePrefix, tableEnd, 0) + require.NoError(t, err) + require.Equal(t, keyCount, len(kvs)) +} From c02d1f38289546f078b43cd322797fe543f39cdf Mon Sep 17 00:00:00 2001 From: Xiang Gu Date: Wed, 9 Aug 2023 11:44:55 -0400 Subject: [PATCH 2/2] schemachanger: Add an integration-style test for concurrent schema changer behavior This commit adds an integration-style test for concurrent schema changer behaviors where we run multiple DDLs for an extended period of time on a few descriptors and assert that they all eventually finish and the descriptors end up in the expected state. 
Release note: None --- pkg/sql/schemachanger/BUILD.bazel | 3 + pkg/sql/schemachanger/schemachanger_test.go | 271 ++++++++++++++++++++ 2 files changed, 274 insertions(+) diff --git a/pkg/sql/schemachanger/BUILD.bazel b/pkg/sql/schemachanger/BUILD.bazel index 6ca5cc59dc94..51fee6bc6acb 100644 --- a/pkg/sql/schemachanger/BUILD.bazel +++ b/pkg/sql/schemachanger/BUILD.bazel @@ -49,6 +49,7 @@ go_test( "//pkg/sql/catalog", "//pkg/sql/catalog/desctestutils", "//pkg/sql/execinfra", + "//pkg/sql/pgwire/pgcode", "//pkg/sql/rowenc", "//pkg/sql/schemachanger/scexec", "//pkg/sql/schemachanger/scop", @@ -57,6 +58,7 @@ go_test( "//pkg/sql/sessiondatapb", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/util/ctxgroup", "//pkg/util/leaktest", @@ -65,6 +67,7 @@ go_test( "//pkg/util/randutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//errorspb", + "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 5c4f73ef5003..1076357befc5 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -14,11 +14,13 @@ import ( "context" "encoding/hex" "fmt" + "math/rand" "regexp" "strings" "sync" "sync/atomic" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -30,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop" @@ -37,6 +40,7 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/errorspb" + "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -517,3 +522,269 @@ func requireTableKeyCount( require.NoError(t, err) require.Equal(t, keyCount, len(kvs)) } + +// TestConcurrentSchemaChanges is an integration style tests where we issue many +// schema changes concurrently (renames, add/drop columns, and create/drop +// indexes) for a period of time and assert that they all finish eventually and +// we end up with expected names, columns, and indexes. 
+func TestConcurrentSchemaChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderShort(t, "this test is long running (>3 mins).") + skip.UnderStress(t, "test is already integration style and long running") + skip.UnderStressRace(t, "test is already integration style and long running") + skip.UnderRace(t, "the test knowingly has data race and has logic to account for that") + + const testDuration = 3 * time.Minute + const renameDBInterval = 5 * time.Second + const renameSCInterval = 4 * time.Second + const renameTblInterval = 3 * time.Second + const addColInterval = 1 * time.Second + const dropColInterval = 1 * time.Second + const createIdxInterval = 1 * time.Second + const dropIdxInterval = 1 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + var params base.TestServerArgs + params.Knobs = base.TestingKnobs{ + // Decrease the adopt loop interval so that retries happen quickly. + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + } + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + dbName, scName, tblName := "testdb", "testsc", "t" + allColToIndexes := make(map[string]map[string]struct{}) // colName -> indexes that uses that column + allColToIndexes["col"] = map[string]struct{}{"t_pkey": {}} + allNonPublicIdxToKeyCols := make(map[string]map[string]struct{}) // indexName -> its key column(s) + tdb.Exec(t, fmt.Sprintf("CREATE DATABASE %v;", dbName)) + tdb.Exec(t, fmt.Sprintf("CREATE SCHEMA %v.%v;", dbName, scName)) + tdb.Exec(t, fmt.Sprintf("CREATE TABLE %v.%v.%v (col INT PRIMARY KEY);", dbName, scName, tblName)) + tdb.Exec(t, fmt.Sprintf("INSERT INTO %v.%v.%v SELECT generate_series(1,100);", dbName, scName, tblName)) + + // repeatFnWithInterval repeats `fn` indefinitely every `interval` until + // `ctx` is cancelled. 
+ workerErrChan := make(chan error) + var wg sync.WaitGroup + repeatWorkWithInterval := func(workerName string, workInterval time.Duration, work func() error) { + wg.Add(1) + defer wg.Done() + for { + jitteredInterval := workInterval * time.Duration(0.8+0.4*rand.Float32()) + select { + case <-ctx.Done(): + t.Logf("%v is signaled to finish work", workerName) + return + case <-time.After(jitteredInterval): + if err := work(); err != nil { + t.Logf("%v encounters error %v; signal to main routine and finish working", workerName, err.Error()) + workerErrChan <- err + return + } + } + } + } + + // validate performs a few quick validations after all schema changes are finished: + // 1. Database, schema, and table indeed end up with the tracked name. + // 2. Table indeed has the tracked columns. + // 3. Table indeed has the tracked indexes. + codec := s.ApplicationLayer().Codec() + validate := func() { + dbDesc := desctestutils.TestingGetDatabaseDescriptor(kvDB, codec, dbName) + desctestutils.TestingGetSchemaDescriptor(kvDB, codec, dbDesc.GetID(), scName) + tblDesc := desctestutils.TestingGetTableDescriptor(kvDB, codec, dbName, scName, tblName) + require.Equal(t, len(allColToIndexes), len(tblDesc.PublicColumns())) // allColToIndexes does not include `col` + for _, col := range tblDesc.PublicColumns() { + _, ok := allColToIndexes[col.GetName()] + require.True(t, ok, "column %v does not exist in allColToIndexes=%v", col.GetName(), allColToIndexes) + } + require.Equal(t, len(allNonPublicIdxToKeyCols), len(tblDesc.PublicNonPrimaryIndexes())) + for _, idx := range tblDesc.PublicNonPrimaryIndexes() { + _, ok := allNonPublicIdxToKeyCols[idx.GetName()] + require.True(t, ok, "index %v does not exist in allNonPublicIdxToKeyCols=%v", idx.GetName(), allNonPublicIdxToKeyCols) + } + } + + // A goroutine that repeatedly renames database `testdb` randomly. 
+ go repeatWorkWithInterval("rename-db-worker", renameDBInterval, func() error { + newDBName := fmt.Sprintf("testdb_%v", rand.Intn(1000)) + if newDBName == dbName { + return nil + } + if _, err := sqlDB.Exec(fmt.Sprintf("ALTER DATABASE %v RENAME TO %v", dbName, newDBName)); err != nil { + return err + } + dbName = newDBName + t.Logf("RENAME DATABASE TO %v", newDBName) + return nil + }) + + // A goroutine that renames schema `testdb.testsc` randomly. + go repeatWorkWithInterval("rename-schema-worker", renameSCInterval, func() error { + newSCName := fmt.Sprintf("testsc_%v", rand.Intn(1000)) + if scName == newSCName { + return nil + } + _, err := sqlDB.Exec(fmt.Sprintf("ALTER SCHEMA %v.%v RENAME TO %v", dbName, scName, newSCName)) + if err == nil { + scName = newSCName + t.Logf("RENAME SCHEMA TO %v", newSCName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase) { + err = nil // mute those errors as they're expected + t.Logf("Parent database is renamed; skipping this schema renaming.") + } + return err + }) + + // A goroutine that renames table `testdb.testsc.t` randomly. + go repeatWorkWithInterval("rename-tbl-worker", renameTblInterval, func() error { + newTblName := fmt.Sprintf("t_%v", rand.Intn(1000)) + _, err := sqlDB.Exec(fmt.Sprintf(`ALTER TABLE %v.%v.%v RENAME TO %v`, dbName, scName, tblName, newTblName)) + if err == nil { + tblName = newTblName + t.Logf("RENAME TABLE TO %v", newTblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName) { + err = nil + t.Logf("Parent database or schema is renamed; skipping this table renaming.") + } + return err + }) + + // A goroutine that adds columns to `testdb.testsc.t` randomly. 
+ go repeatWorkWithInterval("add-column-worker", addColInterval, func() error { + newColName := fmt.Sprintf("col_%v", rand.Intn(1000)) + if _, ok := allColToIndexes[newColName]; ok { + return nil + } + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v ADD COLUMN %v INT DEFAULT %v", dbName, scName, tblName, newColName, rand.Intn(100))) + if err == nil { + allColToIndexes[newColName] = make(map[string]struct{}) + t.Logf("ADD COLUMN %v TO TABLE %v", newColName, tblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) { + err = nil + t.Logf("Parent database or schema or table is renamed; skipping this column addition.") + } + return err + }) + + // A goroutine that drops columns from `testdb.testsc.t` randomly. + go repeatWorkWithInterval("drop-column-worker", dropColInterval, func() error { + // Randomly pick a non-PK column to drop. + if len(allColToIndexes) == 1 { + return nil + } + var colName string + for col := range allColToIndexes { + if col != "col" { + colName = col + break + } + } + + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("ALTER TABLE %v.%v.%v DROP COLUMN %v;", dbName, scName, tblName, colName)) + if err == nil { + for indexName := range allColToIndexes[colName] { + delete(allNonPublicIdxToKeyCols, indexName) + } + delete(allColToIndexes, colName) + t.Logf("DROP COLUMN %v FROM TABLE %v", colName, tblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable) { + err = nil + t.Logf("Parent database or schema or table is renamed; skipping this column removal.") + } + return err + }) + + // A goroutine that creates secondary index on a randomly selected column. 
+ go repeatWorkWithInterval("create-index-worker", createIdxInterval, func() error { + newIndexName := fmt.Sprintf("idx_%v", rand.Intn(1000)) + if _, ok := allNonPublicIdxToKeyCols[newIndexName]; ok { + return nil + } + + // Randomly pick a non-PK column to create an index on. + if len(allColToIndexes) == 1 { + return nil + } + var colName string + for col := range allColToIndexes { + if col != "col" { + colName = col + break + } + } + + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("CREATE INDEX %v ON %v.%v.%v (%v);", newIndexName, dbName, scName, tblName, colName)) + if err == nil { + allNonPublicIdxToKeyCols[newIndexName] = map[string]struct{}{colName: {}} + allColToIndexes[colName][newIndexName] = struct{}{} + t.Logf("CREATE INDEX %v ON TABLE %v(%v)", newIndexName, tblName, colName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedColumn) { + err = nil + t.Logf("Parent database or schema or table is renamed or column is dropped; skipping this index creation.") + } + return err + }) + + // A goroutine that drops a secondary index randomly. + go repeatWorkWithInterval("drop-index-worker", dropIdxInterval, func() error { + // Randomly pick a non-public index to drop. 
+ if len(allNonPublicIdxToKeyCols) == 0 { + return nil + } + var indexName string + var indexKeyCols map[string]struct{} + for idx, idxCols := range allNonPublicIdxToKeyCols { + indexName = idx + indexKeyCols = idxCols + break + } + + tblName := tblName + _, err := sqlDB.Exec(fmt.Sprintf("DROP INDEX %v.%v.%v@%v;", dbName, scName, tblName, indexName)) + if err == nil { + for indexKeyCol := range indexKeyCols { + delete(allColToIndexes[indexKeyCol], indexName) + } + delete(allNonPublicIdxToKeyCols, indexName) + t.Logf("DROP INDEX %v FROM TABLE %v", indexName, tblName) + } else if isPQErrWithCode(err, pgcode.UndefinedDatabase, pgcode.UndefinedSchema, pgcode.InvalidSchemaName, pgcode.UndefinedTable, pgcode.UndefinedObject) { + err = nil + t.Logf("Parent database or schema or table is renamed; skipping this index removal.") + } + return err + }) + + select { + case workerErr := <-workerErrChan: + t.Logf("main: a worker error %q is signaled; Inform all workers to stop.", workerErr.Error()) + cancel() + wg.Wait() + t.Logf("main: all workers have stopped their work; Test Failure!") + t.Fatalf(workerErr.Error()) + case <-time.After(testDuration): + t.Logf("main: time's up! Inform all workers to stop.") + cancel() + wg.Wait() + t.Logf("main: all workers have stopped. Validating descriptors states...") + validate() + t.Logf("main: validation succeeded! Test success!") + } +} + +func isPQErrWithCode(err error, codes ...pgcode.Code) bool { + if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { + for _, code := range codes { + if pgcode.MakeCode(string(pqErr.Code)) == code { + return true + } + } + } + return false +}