upgrademanager: fix upgrade manager tests that relied on wrong invariants

The two tests in question were low-level tests that upgrade clusters
from mock version 41 to 42 and override the upgrade flow to test the
internals of the upgrade manager. These mock cluster versions were not
tied to our real cluster versions, and so were not offset by the
`DevOffset` when a branch is marked as a development branch. As a
result, when a branch was a developmentBranch the mock versions would
sort below all the "real" cluster versions, while when a branch was
marked as a release branch they would sort above all of them. This
nondeterministic behaviour did not play well with certain job
migrations that were added this release.
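
To make the sorting hazard concrete, here is a minimal, self-contained sketch of the problem. It does not use the real clusterversion package; Version, devOffset, and the sample values below are simplified stand-ins for illustration only.

package main

import "fmt"

// Version is a stand-in for roachpb.Version with only the fields that
// matter for ordering.
type Version struct{ Major, Minor int32 }

// Less orders versions by Major, then Minor.
func (v Version) Less(o Version) bool {
	if v.Major != o.Major {
		return v.Major < o.Major
	}
	return v.Minor < o.Minor
}

// devOffset mimics the large offset applied to real cluster versions on
// development branches (a stand-in for the real DevOffset).
const devOffset = 1000000

func main() {
	mock := Version{Major: 41}                      // mock test version, never offset
	release := Version{Major: 23, Minor: 1}         // a real version on a release branch
	dev := Version{Major: 23 + devOffset, Minor: 1} // the same version on a development branch

	fmt.Println(mock.Less(release)) // false: mock sorts above all real versions
	fmt.Println(mock.Less(dev))     // true: mock sorts below all real versions
}

The mock versions therefore land on opposite sides of the real versions depending on how the branch is marked, which is exactly the nondeterminism described above.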

This change rewrites the tests to use real cluster versions so that
their values stay in sync with whether the branch is a developmentBranch
or not. It also removes some overrides to make the tests more intuitive.
Concretely, the test servers now bootstrap at the minimum supported
binary version and run all the upgrades up to `startCV` before executing
the body of the test.
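
Abridged from the diff below, the new setup boils down to the following; the knob names come straight from the hunks, while the surrounding scaffolding is illustrative:

// Pick a real version key and derive endCV from it so the pair stays
// valid on both development and release branches.
startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
endCV := startCV + 1

serverKnobs := &server.TestingKnobs{
	// Bootstrap the cluster at the oldest version this binary supports...
	BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
	// ...while advertising startCV as the binary version, so every upgrade
	// up to and including startCV runs before the test body takes over.
	BinaryVersionOverride: clusterversion.ByKey(startCV),
	// Leave the final startCV -> endCV step to the test itself.
	DisableAutomaticVersionUpgrade: make(chan struct{}),
}

With that in place, the only step left for the tests to drive is `SET CLUSTER SETTING version = endCV`, which both tests issue explicitly.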

Fixes: #100685
Informs: #100552

Release note: None
adityamaru committed Apr 6, 2023
1 parent 14cf13d commit bede330
Showing 1 changed file with 51 additions and 31 deletions.
82 changes: 51 additions & 31 deletions pkg/upgrade/upgrademanager/manager_external_test.go
@@ -56,32 +56,34 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

- // We're going to be migrating from startCV to endCV.
- startCV := roachpb.Version{Major: 41}
- endCV := roachpb.Version{Major: 42}
+ // clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
+ // specifically so that all the migrations that introduce and backfill the new
+ // `system.job_info` have run by this point. In the future this startCV should
+ // be changed to V23_2Start and updated to the next Start key every time the
+ // compatibility window moves forward.
+ startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
+ endCV := startCV + 1

ch := make(chan chan error)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
- Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
- BinaryVersionOverride: startCV,
+ BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
+ BinaryVersionOverride: clusterversion.ByKey(startCV),
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
DistSQL: &execinfra.TestingKnobs{
// See the TODO below for why we need this.
ProcessorNoTracingSpan: true,
},
UpgradeManager: &upgradebase.TestingKnobs{
- ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
- return []roachpb.Version{to}
- },
RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) {
- if v != endCV {
+ if v != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", v, upgrade.NoPrecondition, func(
@@ -107,13 +109,26 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)

+ // At this point the test cluster has run all the migrations until startCV.
+
upgrade1Err := make(chan error, 1)
go func() {
_, err := tc.ServerConn(0).ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
upgrade1Err <- err
}()
unblock := <-ch

+ var firstID jobspb.JobID
+ var firstPayload, firstProgress []byte
+ require.NoError(t, tc.ServerConn(0).QueryRow(`
+ SELECT id, payload, progress FROM crdb_internal.system_jobs WHERE (
+ crdb_internal.pb_to_json(
+ 'cockroach.sql.jobs.jobspb.Payload',
+ payload
+ )->'migration'
+ ) IS NOT NULL AND status = 'running'
+ `).Scan(&firstID, &firstPayload, &firstProgress))
+
// Inject a second job for the same upgrade and ensure that that causes
// an error. This is pretty gnarly.
var secondID jobspb.JobID
@@ -125,21 +140,25 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
unique_rowid(),
status,
created,
- payload,
- progress,
+ NULL,
+ NULL,
created_by_type,
created_by_id,
claim_session_id,
claim_instance_id
- FROM system.jobs
- WHERE (
- crdb_internal.pb_to_json(
- 'cockroach.sql.jobs.jobspb.Payload',
- payload
- )->'migration'
- ) IS NOT NULL
+ FROM crdb_internal.system_jobs
+ WHERE id = $1
)
- RETURNING id;`).Scan(&secondID))
+ RETURNING id;`, firstID).Scan(&secondID))
+ // Insert the job payload and progress into the `system.job_info` table.
+ err := tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+ infoStorage := jobs.InfoStorageForJob(txn, secondID)
+ if err := infoStorage.WriteLegacyPayload(ctx, firstPayload); err != nil {
+ return err
+ }
+ return infoStorage.WriteLegacyProgress(ctx, firstProgress)
+ })
+ require.NoError(t, err)

// Make sure that the second job gets run in a timely manner.
runErr := make(chan error)
@@ -150,7 +169,7 @@ RETURNING id;`).Scan(&secondID))
fakeJobBlockChan := <-ch

// Ensure that we see the assertion error.
- _, err := tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
+ _, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
require.Regexp(t, "found multiple non-terminal jobs for version", err)

// Let the fake, erroneous job finish with an error.
@@ -392,9 +411,13 @@ func TestPauseMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

- // We're going to be migrating from startCV to endCV.
- startCV := roachpb.Version{Major: 41}
- endCV := roachpb.Version{Major: 42}
+ // clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
+ // specifically so that all the migrations that introduce and backfill the new
+ // `system.job_info` have run by this point. In the future this startCV should
+ // be changed to V23_2Start and updated to the next Start key every time the
+ // compatibility window moves forward.
+ startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
+ endCV := startCV + 1

type migrationEvent struct {
unblock chan<- error
@@ -405,19 +428,16 @@ func TestPauseMigration(t *testing.T) {
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
- Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
- BinaryVersionOverride: startCV,
+ BinaryVersionOverride: clusterversion.ByKey(startCV),
+ BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
UpgradeManager: &upgradebase.TestingKnobs{
- ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
- return []roachpb.Version{to}
- },
RegistryOverride: func(cv roachpb.Version) (upgradebase.Upgrade, bool) {
- if cv != endCV {
+ if cv != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", cv, upgrade.NoPrecondition, func(
@@ -454,13 +474,13 @@ func TestPauseMigration(t *testing.T) {
var id int64
tdb.QueryRow(t, `
SELECT id
- FROM system.jobs
+ FROM crdb_internal.system_jobs
WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
- ) IS NOT NULL;`).
+ ) IS NOT NULL AND status = 'running';`).
Scan(&id)
tdb.Exec(t, "PAUSE JOB $1", id)

