Skip to content

Commit

Permalink
sql,backupccl: stop using StartableJobs when not needed
Browse files Browse the repository at this point in the history
We were using `StartableJob`s in the schema change job and the restore
job to start more jobs. This was unnecessary because we didn't need the
ability to get results from the job resumer that `StartableJob`
provides. It also incurred problems because creating a `StartableJob`
creates state that can't be cleaned up if the transaction was restarted.

This commit switches those jobs to normal jobs that are claimed and
started by the registry. To recover the behavior where jobs are started
"immediately," we notify the job registry adoption loop.

Some schema change tests had to be skipped because they relied on the
ability of the schema change job to manually start jobs.

Release note: None
  • Loading branch information
thoszhang committed Feb 19, 2021
1 parent d08c234 commit 0c5bed9
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 161 deletions.
83 changes: 30 additions & 53 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,22 +1324,17 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
// public.
// TODO (lucy): Ideally we'd just create the database in the public state in
// the first place, as a special case.
var newDescriptorChangeJobs []*jobs.StartableJob
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
return err
return r.publishDescriptors(ctx, txn, descsCol, details)
}
if err := descs.Txn(
ctx, r.execCfg.Settings, r.execCfg.LeaseManager, r.execCfg.InternalExecutor,
r.execCfg.DB, publishDescriptors,
); err != nil {
return err
}
// Start the schema change jobs we created.
for _, newJob := range newDescriptorChangeJobs {
if err := newJob.Start(ctx); err != nil {
return err
}
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
if err := fn(); err != nil {
Expand Down Expand Up @@ -1383,9 +1378,8 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
if err := insertStats(ctx, r.job, p.ExecCfg(), latestStats); err != nil {
return errors.Wrap(err, "inserting table statistics")
}
var newDescriptorChangeJobs []*jobs.StartableJob
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
err = r.publishDescriptors(ctx, txn, descsCol, details)
return err
}
if err := descs.Txn(
Expand All @@ -1396,12 +1390,8 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
}
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)

// Start the schema change jobs we created.
for _, newJob := range newDescriptorChangeJobs {
if err := newJob.Start(ctx); err != nil {
return err
}
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
if err := fn(); err != nil {
Expand Down Expand Up @@ -1507,24 +1497,13 @@ func insertStats(
// with a new value even if this transaction does not commit.
func (r *restoreResumer) publishDescriptors(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, details jobspb.RestoreDetails,
) (newDescriptorChangeJobs []*jobs.StartableJob, err error) {
defer func() {
if err == nil {
return
}
for _, j := range newDescriptorChangeJobs {
if cleanupErr := j.CleanupOnRollback(ctx); cleanupErr != nil {
log.Warningf(ctx, "failed to clean up job %d: %v", j.ID(), cleanupErr)
}
}
newDescriptorChangeJobs = nil
}()
) (err error) {
if details.DescriptorsPublished {
return nil, nil
return nil
}
if fn := r.testingKnobs.beforePublishingDescriptors; fn != nil {
if err := fn(); err != nil {
return nil, err
return err
}
}
log.VEventf(ctx, 1, "making tables live")
Expand All @@ -1551,10 +1530,10 @@ func (r *restoreResumer) publishDescriptors(
for _, tbl := range details.TableDescs {
mutTable, err := descsCol.GetMutableTableVersionByID(ctx, tbl.GetID(), txn)
if err != nil {
return newDescriptorChangeJobs, err
return err
}
if err := checkVersion(mutTable, tbl.Version); err != nil {
return newDescriptorChangeJobs, err
return err
}
allMutDescs = append(allMutDescs, mutTable)
newTables = append(newTables, mutTable.TableDesc())
Expand All @@ -1564,41 +1543,40 @@ func (r *restoreResumer) publishDescriptors(
if details.DescriptorCoverage != tree.AllDescriptors {
// Convert any mutations that were in progress on the table descriptor
// when the backup was taken, and convert them to schema change jobs.
newJobs, err := createSchemaChangeJobsFromMutations(ctx,
r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable)
if err != nil {
return newDescriptorChangeJobs, err
if err := createSchemaChangeJobsFromMutations(ctx,
r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
); err != nil {
return err
}
newDescriptorChangeJobs = append(newDescriptorChangeJobs, newJobs...)
}
}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
for _, typDesc := range details.TypeDescs {
typ, err := descsCol.GetMutableTypeVersionByID(ctx, txn, typDesc.GetID())
if err != nil {
return newDescriptorChangeJobs, err
return err
}
if err := checkVersion(typ, typDesc.Version); err != nil {
return newDescriptorChangeJobs, err
return err
}
allMutDescs = append(allMutDescs, typ)
newTypes = append(newTypes, typ.TypeDesc())
if typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors {
typJob, err := createTypeChangeJobFromDesc(ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ)
if err != nil {
return newDescriptorChangeJobs, err
if err := createTypeChangeJobFromDesc(
ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ,
); err != nil {
return err
}
newDescriptorChangeJobs = append(newDescriptorChangeJobs, typJob)
}
}
for _, sc := range details.SchemaDescs {
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, sc.ID, txn)
if err != nil {
return newDescriptorChangeJobs, err
return err
}
if err := checkVersion(mutDesc, sc.Version); err != nil {
return newDescriptorChangeJobs, err
return err
}
mutSchema := mutDesc.(*schemadesc.Mutable)
allMutDescs = append(allMutDescs, mutSchema)
Expand All @@ -1611,10 +1589,10 @@ func (r *restoreResumer) publishDescriptors(
// field in the details?
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, dbDesc.ID, txn)
if err != nil {
return newDescriptorChangeJobs, err
return err
}
if err := checkVersion(mutDesc, dbDesc.Version); err != nil {
return newDescriptorChangeJobs, err
return err
}
mutDB := mutDesc.(*dbdesc.Mutable)
// TODO(lucy,ajwerner): Remove this in 21.1.
Expand All @@ -1631,17 +1609,17 @@ func (r *restoreResumer) publishDescriptors(
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, desc, b,
); err != nil {
return newDescriptorChangeJobs, err
return err
}
}

if err := txn.Run(ctx, b); err != nil {
return newDescriptorChangeJobs, errors.Wrap(err, "publishing tables")
return errors.Wrap(err, "publishing tables")
}

for _, tenant := range details.Tenants {
if err := sql.ActivateTenant(ctx, r.execCfg, txn, tenant.ID); err != nil {
return newDescriptorChangeJobs, err
return err
}
}

Expand All @@ -1652,11 +1630,10 @@ func (r *restoreResumer) publishDescriptors(
details.SchemaDescs = newSchemas
details.DatabaseDescs = newDBs
if err := r.job.SetDetails(ctx, txn, details); err != nil {
return newDescriptorChangeJobs, errors.Wrap(err,
return errors.Wrap(err,
"updating job details after publishing tables")
}

return newDescriptorChangeJobs, nil
return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface. Removes KV data that
Expand Down
31 changes: 15 additions & 16 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func createTypeChangeJobFromDesc(
txn *kv.Txn,
username security.SQLUsername,
typ catalog.TypeDescriptor,
) (*jobs.StartableJob, error) {
) error {
// Any non-public members in the type descriptor are accumulated as
// "transitioning" and their promotion or removal will be handled in a
// single job.
Expand All @@ -146,27 +146,27 @@ func createTypeChangeJobFromDesc(
// Type change jobs are not cancellable.
NonCancelable: true,
}
job, err := jr.CreateStartableJobWithTxn(ctx, record, txn)
job, err := jr.CreateJobWithTxn(ctx, record, txn)
if err != nil {
return nil, err
return err
}
return job, nil
log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID())
return nil
}

// createSchemaChangeJobsFromMutations creates and runs jobs for any mutations
// on the table descriptor. It also updates tableDesc's MutationJobs to
// reference the new jobs. This is only used to reconstruct a job based off a
// mutation, namely during RESTORE.
// createSchemaChangeJobsFromMutations creates jobs for any mutations on the
// table descriptor. It also updates tableDesc's MutationJobs to reference the
// new jobs. This is only used to reconstruct a job based off a mutation, namely
// during RESTORE.
func createSchemaChangeJobsFromMutations(
ctx context.Context,
jr *jobs.Registry,
codec keys.SQLCodec,
txn *kv.Txn,
username security.SQLUsername,
tableDesc *tabledesc.Mutable,
) ([]*jobs.StartableJob, error) {
) error {
mutationJobs := make([]descpb.TableDescriptor_MutationJob, 0, len(tableDesc.Mutations))
newJobs := make([]*jobs.StartableJob, 0, len(tableDesc.Mutations))
seenMutations := make(map[descpb.MutationID]bool)
for _, mutation := range tableDesc.Mutations {
if seenMutations[mutation.MutationID] {
Expand All @@ -178,7 +178,7 @@ func createSchemaChangeJobsFromMutations(
seenMutations[mutationID] = true
jobDesc, mutationCount, err := jobDescriptionFromMutationID(tableDesc.TableDesc(), mutationID)
if err != nil {
return nil, err
return err
}
spanList := make([]jobspb.ResumeSpanList, mutationCount)
for i := range spanList {
Expand All @@ -201,20 +201,19 @@ func createSchemaChangeJobsFromMutations(
},
Progress: jobspb.SchemaChangeProgress{},
}
newJob, err := jr.CreateStartableJobWithTxn(ctx, jobRecord, txn)
newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn)
if err != nil {
return nil, err
return err
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *newJob.ID(),
}
mutationJobs = append(mutationJobs, newMutationJob)
newJobs = append(newJobs, newJob)

log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d",
newJob.ID(), tableDesc.ID, mutationID)
*newJob.ID(), tableDesc.ID, mutationID)
}
tableDesc.MutationJobs = mutationJobs
return newJobs, nil
return nil
}
31 changes: 15 additions & 16 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,30 @@ func (r *Registry) CreateAndStartJob(
return rj, nil
}

// NotifyToAdoptJobs signals the job adoption loop to start claimed jobs by
// sending resumeClaimedJobs on the registry's adoption channel.
//
// The call blocks until one of the following happens:
//   - the signal is delivered, in which case nil is returned;
//   - the registry's stopper starts quiescing, in which case
//     stop.ErrUnavailable is returned;
//   - ctx is done, in which case ctx.Err() is returned.
func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
	select {
	case r.adoptionCh <- resumeClaimedJobs:
		// Signal handed off to the adoption loop.
		return nil
	case <-r.stopper.ShouldQuiesce():
		return stop.ErrUnavailable
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Run starts previously unstarted jobs from a list of scheduled
// jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs.
func (r *Registry) Run(ctx context.Context, ex sqlutil.InternalExecutor, jobs []int64) error {
if len(jobs) == 0 {
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
buf := bytes.Buffer{}
usingSQLLiveness := r.startUsingSQLLivenessAdoption(ctx)
for i, id := range jobs {
// In the pre-20.2 and mixed-version state, the adoption loop needs to be
// notified once per job (in the worst case) in order to ensure that all
// newly created jobs get adopted in a timely manner. In the sqlliveness
// world of 20.2 and later, we only need to notify the loop once as the
// newly created jobs are already claimed. The adoption loop will merely
// start all previously claimed jobs.
if !usingSQLLiveness || i == 0 {
select {
case r.adoptionCh <- resumeClaimedJobs:
case <-r.stopper.ShouldQuiesce():
return stop.ErrUnavailable
case <-ctx.Done():
return ctx.Err()
}
}
if i > 0 {
buf.WriteString(",")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/alter_column_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -35,6 +36,7 @@ import (
func TestInsertBeforeOldColumnIsDropped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 60801, "needs to be rewritten to not use RunBeforeChildJobs")

params, _ := tests.CreateTestServerParams()
childJobStartNotification := make(chan struct{})
Expand Down Expand Up @@ -97,6 +99,7 @@ ALTER TABLE test ALTER COLUMN x TYPE STRING;`)
func TestInsertBeforeOldColumnIsDroppedUsingExpr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 60801, "needs to be rewritten to not use RunBeforeChildJobs")

params, _ := tests.CreateTestServerParams()
childJobStartNotification := make(chan struct{})
Expand Down
Loading

0 comments on commit 0c5bed9

Please sign in to comment.