Skip to content

Commit

Permalink
Merge pull request #100867 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-100830
  • Loading branch information
knz authored Apr 6, 2023
2 parents 14cf13d + bede330 commit 2bb5d17
Showing 1 changed file with 51 additions and 31 deletions.
82 changes: 51 additions & 31 deletions pkg/upgrade/upgrademanager/manager_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,34 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We're going to be migrating from startCV to endCV.
startCV := roachpb.Version{Major: 41}
endCV := roachpb.Version{Major: 42}
// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
// specifically so that all the migrations that introduce and backfill the new
// `system.job_info` have run by this point. In the future this startCV should
// be changed to V23_2Start and updated to the next Start key every time the
// compatibility window moves forward.
startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
endCV := startCV + 1

ch := make(chan chan error)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
BinaryVersionOverride: startCV,
BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
BinaryVersionOverride: clusterversion.ByKey(startCV),
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
DistSQL: &execinfra.TestingKnobs{
// See the TODO below for why we need this.
ProcessorNoTracingSpan: true,
},
UpgradeManager: &upgradebase.TestingKnobs{
ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
return []roachpb.Version{to}
},
RegistryOverride: func(v roachpb.Version) (upgradebase.Upgrade, bool) {
if v != endCV {
if v != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", v, upgrade.NoPrecondition, func(
Expand All @@ -107,13 +109,26 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)

// At this point the test cluster has run all the migrations until startCV.

upgrade1Err := make(chan error, 1)
go func() {
_, err := tc.ServerConn(0).ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
upgrade1Err <- err
}()
unblock := <-ch

var firstID jobspb.JobID
var firstPayload, firstProgress []byte
require.NoError(t, tc.ServerConn(0).QueryRow(`
SELECT id, payload, progress FROM crdb_internal.system_jobs WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL AND status = 'running'
`).Scan(&firstID, &firstPayload, &firstProgress))

// Inject a second job for the same upgrade and ensure that that causes
// an error. This is pretty gnarly.
var secondID jobspb.JobID
Expand All @@ -125,21 +140,25 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) {
unique_rowid(),
status,
created,
payload,
progress,
NULL,
NULL,
created_by_type,
created_by_id,
claim_session_id,
claim_instance_id
FROM system.jobs
WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL
FROM crdb_internal.system_jobs
WHERE id = $1
)
RETURNING id;`).Scan(&secondID))
RETURNING id;`, firstID).Scan(&secondID))
// Insert the job payload and progress into the `system.job_info` table.
err := tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, secondID)
if err := infoStorage.WriteLegacyPayload(ctx, firstPayload); err != nil {
return err
}
return infoStorage.WriteLegacyProgress(ctx, firstProgress)
})
require.NoError(t, err)

// Make sure that the second job gets run in a timely manner.
runErr := make(chan error)
Expand All @@ -150,7 +169,7 @@ RETURNING id;`).Scan(&secondID))
fakeJobBlockChan := <-ch

// Ensure that we see the assertion error.
_, err := tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
_, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String())
require.Regexp(t, "found multiple non-terminal jobs for version", err)

// Let the fake, erroneous job finish with an error.
Expand Down Expand Up @@ -392,9 +411,13 @@ func TestPauseMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// We're going to be migrating from startCV to endCV.
startCV := roachpb.Version{Major: 41}
endCV := roachpb.Version{Major: 42}
// clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs was chosen
// specifically so that all the migrations that introduce and backfill the new
// `system.job_info` have run by this point. In the future this startCV should
// be changed to V23_2Start and updated to the next Start key every time the
// compatibility window moves forward.
startCV := clusterversion.V23_1StopWritingPayloadAndProgressToSystemJobs
endCV := startCV + 1

type migrationEvent struct {
unblock chan<- error
Expand All @@ -405,19 +428,16 @@ func TestPauseMigration(t *testing.T) {
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV, startCV, false),
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Server: &server.TestingKnobs{
BinaryVersionOverride: startCV,
BinaryVersionOverride: clusterversion.ByKey(startCV),
BootstrapVersionKeyOverride: clusterversion.BinaryMinSupportedVersionKey,
DisableAutomaticVersionUpgrade: make(chan struct{}),
},
UpgradeManager: &upgradebase.TestingKnobs{
ListBetweenOverride: func(from, to roachpb.Version) []roachpb.Version {
return []roachpb.Version{to}
},
RegistryOverride: func(cv roachpb.Version) (upgradebase.Upgrade, bool) {
if cv != endCV {
if cv != clusterversion.ByKey(endCV) {
return nil, false
}
return upgrade.NewTenantUpgrade("test", cv, upgrade.NoPrecondition, func(
Expand Down Expand Up @@ -454,13 +474,13 @@ func TestPauseMigration(t *testing.T) {
var id int64
tdb.QueryRow(t, `
SELECT id
FROM system.jobs
FROM crdb_internal.system_jobs
WHERE (
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload
)->'migration'
) IS NOT NULL;`).
) IS NOT NULL AND status = 'running';`).
Scan(&id)
tdb.Exec(t, "PAUSE JOB $1", id)

Expand Down

0 comments on commit 2bb5d17

Please sign in to comment.