From bede330a0d5fa7743b109ecb5d6bf3a59381f620 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Thu, 6 Apr 2023 16:32:47 +0000
Subject: [PATCH] upgrademanager: fix upgrade manager tests that relied on wrong invariants

The two tests in question were low-level tests that upgrade clusters from
mock version 41 to 42 and override the upgrade flow to test the internals
of the upgrade manager. These mock cluster versions were not tied to our
real cluster versions and so were not offset by the `DevOffset` when a
branch is marked as a development branch. As a result, when a branch was a
developmentBranch the mock versions would sort below all the "real"
cluster versions, whereas on a release branch they would sort above them.
This non-deterministic behaviour did not play well with certain job
migrations that were added this release.

This change rewrites the tests to use real cluster versions so that their
values are in sync with whether the branch is a developmentBranch or not.
It also removes some overrides to make the tests more intuitive.
Concretely, the test servers now bootstrap at the minimum supported binary
version and run all the upgrades up to `startCV` before executing the body
of the test.

Fixes: #100685
Informs: #100552

Release note: None
---
 .../upgrademanager/manager_external_test.go   | 82 ++++++++++++-------
 1 file changed, 51 insertions(+), 31 deletions(-)

diff --git a/pkg/upgrade/upgrademanager/manager_external_test.go b/pkg/upgrade/upgrademanager/manager_external_test.go
index f268b226cc16..4b45276ad108 100644
--- a/pkg/upgrade/upgrademanager/manager_external_test.go
+++ b/pkg/upgrade/upgrademanager/manager_external_test.go
@@ -56,9 +56,13 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	// We're going to be migrating from startCV to endCV.
-	startCV := roachpb.Version{Major: 41}
-	endCV := roachpb.Version{Major: 42}
+	// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
+	// specifically so that all the migrations that introduce and backfill the new
+	// `system.job_info` table have run by this point. In the future this startCV
+	// should be changed to V23_2Start and updated to the next Start key every
+	// time the compatibility window moves forward.
+ startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs + endCV := startCV + 1 ch := make(chan chan error) @@ -66,10 +70,11 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false), Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), Server: &server.TestingKnobs{ - BinaryVersionOverride: startCV, + BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey, + BinaryVersionOverride: clusterversion.ByKey(startCV), DisableAutomaticVersionUpgrade: make(chan struct{}), }, DistSQL: &execinfra.TestingKnobs{ @@ -77,11 +82,8 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { ProcessorNoTracingSpan: true, }, UpgradeManager: &upgradebase.TestingKnobs{ - ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version { - return []roachpb.Version{to} - }, RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) { - if v != endCV { + if v != clusterversion.ByKey(endCV) { return nil, false } return upgrade.NewTenantUpgrade("test", v, upgrade.NoPrecondition, func( @@ -107,6 +109,8 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { }) defer tc.Stopper().Stop(ctx) + // At this point the test cluster has run all the migrations until startCV. + upgrade1Err := make(chan error, 1) go func() { _, err := tc.ServerConn(0).ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String()) @@ -114,6 +118,17 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { }() unblock := <-ch + var firstID jobspb.JobID + var firstPayload, firstProgress []byte + require.NoError(t, tc.ServerConn(0).QueryRow(` + SELECT id, payload, progress FROM crdb_internal.system_jobs WHERE ( + crdb_internal.pb_to_json( + 'cockroach.sql.jobs.jobspb.Payload', + payload + )->'migration' + ) IS NOT NULL AND status = 'running' +`).Scan(&firstID, &firstPayload, &firstProgress)) + // Inject a second job for the same upgrade and ensure that that causes // an error. This is pretty gnarly. var secondID jobspb.JobID @@ -125,21 +140,25 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { unique_rowid(), status, created, - payload, - progress, + NULL, + NULL, created_by_type, created_by_id, claim_session_id, claim_instance_id - FROM system.jobs - WHERE ( - crdb_internal.pb_to_json( - 'cockroach.sql.jobs.jobspb.Payload', - payload - )->'migration' - ) IS NOT NULL + FROM crdb_internal.system_jobs + WHERE id = $1 ) -RETURNING id;`).Scan(&secondID)) +RETURNING id;`, firstID).Scan(&secondID)) + // Insert the job payload and progress into the `system.job_info` table. + err := tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := jobs.InfoStorageForJob(txn, secondID) + if err := infoStorage.WriteLegacyPayload(ctx, firstPayload); err != nil { + return err + } + return infoStorage.WriteLegacyProgress(ctx, firstProgress) + }) + require.NoError(t, err) // Make sure that the second job gets run in a timely manner. runErr := make(chan error) @@ -150,7 +169,7 @@ RETURNING id;`).Scan(&secondID)) fakeJobBlockChan := <-ch // Ensure that we see the assertion error. 
-	_, err := tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
+	_, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
 	require.Regexp(t, "found multiple non-terminal jobs for version", err)
 
 	// Let the fake, erroneous job finish with an error.
@@ -392,9 +411,13 @@ func TestPauseMigration(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	// We're going to be migrating from startCV to endCV.
-	startCV := roachpb.Version{Major: 41}
-	endCV := roachpb.Version{Major: 42}
+	// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
+	// specifically so that all the migrations that introduce and backfill the new
+	// `system.job_info` table have run by this point. In the future this startCV
+	// should be changed to V23_2Start and updated to the next Start key every
+	// time the compatibility window moves forward.
+	startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
+	endCV := startCV + 1
 
 	type migrationEvent struct {
 		unblock chan<- error
@@ -405,19 +428,16 @@ func TestPauseMigration(t *testing.T) {
 	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
 		ReplicationMode: base.ReplicationManual,
 		ServerArgs: base.TestServerArgs{
-			Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
 			Knobs: base.TestingKnobs{
 				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
 				Server: &server.TestingKnobs{
-					BinaryVersionOverride:          startCV,
+					BinaryVersionOverride:          clusterversion.ByKey(startCV),
+					BootstrapVersionKeyOverride:    clusterversion.BinaryMinSupportedVersionKey,
 					DisableAutomaticVersionUpgrade: make(chan struct{}),
 				},
 				UpgradeManager: &upgradebase.TestingKnobs{
-					ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
-						return []roachpb.Version{to}
-					},
 					RegistryOverride: func(cv roachpb.Version) (upgradebase.Upgrade, bool) {
-						if cv != endCV {
+						if cv != clusterversion.ByKey(endCV) {
 							return nil, false
 						}
 						return upgrade.NewTenantUpgrade("test", cv, upgrade.NoPrecondition, func(
@@ -454,13 +474,13 @@ func TestPauseMigration(t *testing.T) {
 	var id int64
 	tdb.QueryRow(t, `
 SELECT id
-  FROM system.jobs
+  FROM crdb_internal.system_jobs
  WHERE (
        crdb_internal.pb_to_json(
          'cockroach.sql.jobs.jobspb.Payload',
          payload
        )->'migration'
-     ) IS NOT NULL;`).
+     ) IS NOT NULL AND status = 'running';`).
 	Scan(&id)
 	tdb.Exec(t, "PAUSE JOB $1", id)
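
For readers who want the pattern outside the context of this diff, below is a minimal, illustrative sketch of the setup the commit message describes: bootstrap at the minimum supported version key, pin the binary version to a real cluster version key, and let the server run every upgrade up to that version before the test body starts. The knob and helper names are taken from the diff above; the test name, node count, and the `startCV + 1` target are placeholder choices, not part of the patch.

// Illustrative sketch (not part of the patch): the version-pinning setup
// described in the commit message, using the knobs shown in the diff above.
package upgrademanager_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/stretchr/testify/require"
)

// TestVersionPinningSketch is a hypothetical test name used only for this
// example.
func TestVersionPinningSketch(t *testing.T) {
	ctx := context.Background()

	// Use a real cluster version key so the value stays consistent with the
	// DevOffset applied on development branches.
	startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					// Bootstrap the cluster at the minimum supported binary
					// version...
					BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
					// ...and report startCV as the binary version, so all the
					// upgrades up to startCV run before the test body.
					BinaryVersionOverride: clusterversion.ByKey(startCV),
					// Prevent the cluster from upgrading further on its own.
					DisableAutomaticVersionUpgrade: make(chan struct{}),
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	// The test body can then drive the upgrade to the next version explicitly.
	_, err := tc.ServerConn(0).ExecContext(ctx,
		`SET CLUSTER SETTING version = $1`, clusterversion.ByKey(startCV+1).String())
	require.NoError(t, err)
}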