diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index c71285fa82c0..d402a77ca565 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -254,10 +254,6 @@ const ( TODODelete_V22_2SetRoleOptionsUserIDColumnNotNull // TODODelete_V22_2RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node. TODODelete_V22_2RangefeedUseOneStreamPerNode - // TODODelete_V22_2NoNonMVCCAddSSTable adds a migration which waits for all - // schema changes to complete. After this point, no non-MVCC - // AddSSTable calls will be used outside of tenant streaming. - TODODelete_V22_2NoNonMVCCAddSSTable // TODODelete_V22_2TTLDistSQL uses DistSQL to distribute TTL SELECT/DELETE statements to // leaseholder nodes. TODODelete_V22_2TTLDistSQL @@ -678,10 +674,6 @@ var rawVersionsSingleton = keyedVersions{ Key: TODODelete_V22_2RangefeedUseOneStreamPerNode, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60}, }, - { - Key: TODODelete_V22_2NoNonMVCCAddSSTable, - Version: roachpb.Version{Major: 22, Minor: 1, Internal: 62}, - }, { Key: TODODelete_V22_2TTLDistSQL, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 68}, diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 61d215119020..cc8ea8a2f0d6 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -36,7 +36,6 @@ go_library( "tenant_table_migration.go", "upgrades.go", "wait_for_del_range_in_gc_job.go", - "wait_for_schema_changes.go", "web_sessions_table_user_id_migration.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades", @@ -123,7 +122,6 @@ go_test( "system_statistics_activity_test.go", "tenant_table_migration_test.go", "wait_for_del_range_in_gc_job_test.go", - "wait_for_schema_changes_test.go", "web_sessions_table_user_id_migration_test.go", ], args = ["-test.timeout=895s"], @@ -185,7 +183,6 @@ go_test( "//pkg/util/intsets", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/protoutil", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/upgrade/upgrades/schemachanger_elements_test.go b/pkg/upgrade/upgrades/schemachanger_elements_test.go index a4ef9502a815..641ce483213f 100644 --- a/pkg/upgrade/upgrades/schemachanger_elements_test.go +++ b/pkg/upgrade/upgrades/schemachanger_elements_test.go @@ -15,6 +15,7 @@ import ( gosql "database/sql" "sync/atomic" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -203,3 +205,11 @@ func TestUpgradeSchemaChangerElements(t *testing.T) { }) } } + +func shortInterval() *time.Duration { + shortInterval := 10 * time.Millisecond + if util.RaceEnabled { + shortInterval *= 5 + } + return &shortInterval +} diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 8f8c0d1250a9..88e6794998b7 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -117,12 +117,6 @@ var upgrades = []upgradebase.Upgrade{ upgrade.NoPrecondition, ensureSQLSchemaTelemetrySchedule, ), - upgrade.NewTenantUpgrade( - "wait for all in-flight schema changes", - toCV(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable), - upgrade.NoPrecondition, - waitForAllSchemaChanges, - ), upgrade.NewTenantUpgrade("fix corrupt user-file related table descriptors", toCV(clusterversion.TODODelete_V22_2FixUserfileRelatedDescriptorCorruption), upgrade.NoPrecondition, diff --git a/pkg/upgrade/upgrades/wait_for_schema_changes.go b/pkg/upgrade/upgrades/wait_for_schema_changes.go deleted file mode 100644 index 04611c67142f..000000000000 --- a/pkg/upgrade/upgrades/wait_for_schema_changes.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package upgrades - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/upgrade" - "github.com/cockroachdb/errors" -) - -// waitForAllSchemaChanges waits for all schema changes to enter a -// terminal or paused state. -// -// Because this is intended for the mvcc-bulk-ops transition, it does -// not care about schema changes created while this migration is -// running because any such schema changes must already be using the -// new mvcc bulk operations -// -// Note that we do not use SHOW JOBS WHEN COMPLETE here to avoid -// blocking forever on PAUSED jobs. Jobs using old index backfills -// will fail on Resume. -func waitForAllSchemaChanges( - ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, -) error { - - initialJobListQuery := fmt.Sprintf(` - -SELECT - job_id -FROM - [SHOW JOBS] -WHERE - job_type = 'SCHEMA CHANGE' - AND status NOT IN ('%s', '%s', '%s', '%s', '%s') -`, - string(jobs.StatusSucceeded), - string(jobs.StatusFailed), - string(jobs.StatusCanceled), - string(jobs.StatusRevertFailed), - string(jobs.StatusPaused)) - rows, err := d.InternalExecutor.QueryBufferedEx(ctx, - "query-non-terminal-schema-changers", - nil, /* txn */ - sessiondata.NodeUserSessionDataOverride, - initialJobListQuery) - if err != nil { - return err - } - - jobList := make([]jobspb.JobID, len(rows)) - for i, datums := range rows { - if len(datums) != 1 { - return errors.AssertionFailedf("unexpected number of columns: %d (expected 1)", len(datums)) - } - d := datums[0] - id, ok := d.(*tree.DInt) - if !ok { - return errors.AssertionFailedf("unexpected type for id column: %T (expected DInt)", d) - } - jobList[i] = jobspb.JobID(*id) - } - return d.JobRegistry.WaitForJobsIgnoringJobErrors(ctx, jobList) -} diff --git a/pkg/upgrade/upgrades/wait_for_schema_changes_test.go b/pkg/upgrade/upgrades/wait_for_schema_changes_test.go deleted file mode 100644 index ec784a33b3f7..000000000000 --- a/pkg/upgrade/upgrades/wait_for_schema_changes_test.go +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package upgrades_test - -import ( - "context" - gosql "database/sql" - "fmt" - "sync/atomic" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/tests" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -func TestWaitForSchemaChangeMigration(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup") - - ctx := context.Background() - testCases := []struct { - name string - setup func(t *testing.T, sqlDB *gosql.DB) error - run func(t *testing.T, sqlDB *gosql.DB) error - }{ - { - "running schema change that succeeds waits for success", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - _, err := sqlDB.Exec("CREATE INDEX ON t (b)") - return err - }, - }, - { - "running schema change that fails waits for failure", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("INSERT INTO t VALUES (1, 1), (2, 1)"); err != nil { - return err - } - - if _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)"); err == nil { - return errors.New("expected failure to create unique index but error was nil") - } - return nil - }, - }, - { - "running schema change that pauses should not block upgrade", - nil, - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'indexbackfill.before_flow'"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE INDEX ON t (b)"); err == nil { - return errors.New("expected failure because of pausepoint but error was nil") - } - return nil - }, - }, - { - "previously successful schema change does block migration", - func(t *testing.T, sqlDB *gosql.DB) error { - _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)") - return err - }, - nil, - }, - { - "previously failed schema change does block migration", - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("INSERT INTO t VALUES (1, 1), (2, 1)"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE UNIQUE INDEX ON t (b)"); err == nil { - return errors.New("expected failure to create unique index but error was nil") - } - return nil - }, - nil, - }, - { - "previously paused schema should not block upgrade", - func(t *testing.T, sqlDB *gosql.DB) error { - if _, err := sqlDB.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'indexbackfill.before_flow'"); err != nil { - return err - } - if _, err := sqlDB.Exec("CREATE INDEX ON t (b)"); err == nil { - return errors.New("expected failure because of pausepoint but error was nil") - } - return nil - }, - nil, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - params, _ := tests.CreateTestServerParams() - params.Knobs.Server = &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.ByKey(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable - 1), - } - - var ( - scStartedChan chan struct{} - scAllowResumeChan chan struct{} - secondWaitChan chan struct{} - ) - - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - RunBeforeResume: func(_ jobspb.JobID) error { - if scStartedChan != nil { - close(scStartedChan) - } - if scAllowResumeChan != nil { - <-scAllowResumeChan - } - return nil - }, - } - jobKnobs := jobs.NewTestingKnobsWithShortIntervals() - jobKnobs.IntervalOverrides.WaitForJobsInitialDelay = shortInterval() - jobKnobs.IntervalOverrides.WaitForJobsMaxDelay = shortInterval() - - var waitCount int32 - jobKnobs.BeforeWaitForJobsQuery = func(_ []jobspb.JobID) { - if secondWaitChan != nil { - if atomic.AddInt32(&waitCount, 1) == 2 { - close(secondWaitChan) - } - } - } - params.Knobs.JobsTestingKnobs = jobKnobs - - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - _, err := sqlDB.Exec("CREATE TABLE t (pk INT PRIMARY KEY, b INT)") - require.NoError(t, err) - if tc.setup != nil { - require.NoError(t, tc.setup(t, sqlDB)) - } - - ctx := context.Background() - g := ctxgroup.WithContext(ctx) - - if tc.run != nil { - scStartedChan = make(chan struct{}) - scAllowResumeChan = make(chan struct{}) - secondWaitChan = make(chan struct{}) - g.GoCtx(func(ctx context.Context) error { - return tc.run(t, sqlDB) - }) - } - g.GoCtx(func(ctx context.Context) error { - if scStartedChan != nil { - <-scStartedChan - } - _, err = sqlDB.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - return err - }) - if tc.run != nil { - <-scStartedChan - <-secondWaitChan - close(scAllowResumeChan) - } - require.NoError(t, g.Wait()) - }) - } -} - -func TestWaitForSchemaChangeMigrationSynthetic(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup") - - ctx := context.Background() - - upsertJob := func(sqlDB *gosql.DB, typ string, status string) error { - var details jobspb.Details - switch typ { - case "SCHEMA CHANGE": - details = jobspb.SchemaChangeDetails{} - case "AUTO CREATE STATS": - details = jobspb.CreateStatsDetails{Name: "__auto__"} - default: - return errors.Newf("job type not support in this test: %s", typ) - } - - payload, err := protoutil.Marshal(&jobspb.Payload{ - UsernameProto: username.RootUserName().EncodeProto(), - Details: jobspb.WrapPayloadDetails(details), - }) - if err != nil { - return err - } - - _, err = sqlDB.Exec(`UPSERT INTO system.jobs (id, status, payload) VALUES ($1, $2, $3)`, - 1, status, payload, - ) - return err - } - - terminalStates := []jobs.Status{ - jobs.StatusSucceeded, - jobs.StatusFailed, - jobs.StatusCanceled, - jobs.StatusRevertFailed, - jobs.StatusPaused, - } - nonTerminalStates := []jobs.Status{ - jobs.StatusPending, - jobs.StatusRunning, - jobs.StatusReverting, - jobs.StatusCancelRequested, - jobs.StatusPauseRequested, - } - - testMigrate := func(jobType string, startingState string, nextState string) { - name := fmt.Sprintf("%s_%s", jobType, startingState) - if nextState != "" { - name = fmt.Sprintf("%s_%s", name, nextState) - } - - t.Run(name, func(t *testing.T) { - params, _ := tests.CreateTestServerParams() - params.Knobs.Server = &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.ByKey(clusterversion.TODODelete_V22_2NoNonMVCCAddSSTable - 1), - } - - var waitCount int32 - var secondWaitChan chan struct{} - params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{ - BeforeWaitForJobsQuery: func(_ []jobspb.JobID) { - if secondWaitChan != nil { - if atomic.AddInt32(&waitCount, 1) == 2 { - close(secondWaitChan) - } - } - }, - IntervalOverrides: jobs.TestingIntervalOverrides{ - WaitForJobsInitialDelay: shortInterval(), - WaitForJobsMaxDelay: shortInterval(), - }, - } - s, sqlDB, _ := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) - - require.NoError(t, upsertJob(sqlDB, jobType, startingState)) - - // This test expects all of the cases will eventually - // pass the migration. If not, we timeout. - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - g := ctxgroup.WithContext(ctx) - if nextState != "" { - secondWaitChan = make(chan struct{}) - } - - g.GoCtx(func(ctx context.Context) error { - _, err := sqlDB.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - return err - }) - - if nextState != "" { - <-secondWaitChan - require.NoError(t, upsertJob(sqlDB, jobType, nextState)) - } - require.NoError(t, g.Wait()) - }) - } - - for _, state := range nonTerminalStates { - testMigrate("AUTO CREATE STATS", string(state), "") - - } - for _, state := range terminalStates { - testMigrate("AUTO CREATE STATS", string(state), "") - testMigrate("SCHEMA CHANGE", string(state), "") - } - for _, startingState := range nonTerminalStates { - for _, endingState := range terminalStates { - testMigrate("SCHEMA CHANGE", string(startingState), string(endingState)) - } - } -} - -func shortInterval() *time.Duration { - shortInterval := 10 * time.Millisecond - if util.RaceEnabled { - shortInterval *= 5 - } - return &shortInterval -}