Skip to content

Commit

Permalink
schemachanger,backupccl: support for backup and restore mid-change
Browse files Browse the repository at this point in the history
This commit touches the schema changer to encode some more metadata about the
state of the schema change into the descriptors in order to better facilitate
synthesizing a job at restore time.

The major contribution, then, is to support the declarative schema changer in
restore. Along the way, it picks up some catalog niceties like batched
descriptor retrieval and the nstree.Catalog.

Finally, the code is tested with a suite of datadriven tests which run schema
changes, take the state and BACKUP/RESTORE it at each of the possible stages
(including all of the rollback stages), and make sure that the right outcome
ultimately happens.

Note that this is currently just tested for database-level BACKUP/RESTORE
and that more testing is planned as follow-up.

Release note: None
  • Loading branch information
ajwerner committed Feb 17, 2022
1 parent 4c2dc32 commit bec814d
Show file tree
Hide file tree
Showing 20 changed files with 848 additions and 243 deletions.
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/multiregion",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/schemadesc",
"//pkg/sql/catalog/schemaexpr",
Expand All @@ -96,6 +97,10 @@ go_library(
"//pkg/sql/roleoption",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/schemachanger/rel",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/screl",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down
214 changes: 150 additions & 64 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/screl"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand Down Expand Up @@ -1870,30 +1874,44 @@ func (r *restoreResumer) publishDescriptors(

// Write the new descriptors and flip state over to public so they can be
// accessed.
allMutDescs := make([]catalog.MutableDescriptor, 0,
len(details.TableDescs)+len(details.TypeDescs)+len(details.SchemaDescs)+len(details.DatabaseDescs))

// Pre-fetch all the descriptors into the collection to avoid doing
// round-trips per descriptor.
all, err := prefetchDescriptors(ctx, txn, descsCol, details)
if err != nil {
return err
}

// Create slices of raw descriptors for the restore job details.
newTables := make([]*descpb.TableDescriptor, 0, len(details.TableDescs))
newTypes := make([]*descpb.TypeDescriptor, 0, len(details.TypeDescs))
newSchemas := make([]*descpb.SchemaDescriptor, 0, len(details.SchemaDescs))
newDBs := make([]*descpb.DatabaseDescriptor, 0, len(details.DatabaseDescs))
checkVersion := func(read catalog.Descriptor, exp descpb.DescriptorVersion) error {
if read.GetVersion() == exp {
return nil

// Go through the descriptors and find any declarative schema change jobs
// affecting them.
//
// If we're restoring all the descriptors, it means we're also restoring the
// jobs.
if details.DescriptorCoverage != tree.AllDescriptors {
if err := createDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, all,
); err != nil {
return err
}
return errors.Errorf("version mismatch for descriptor %d, expected version %d, got %v",
read.GetID(), read.GetVersion(), exp)
}

// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
for _, tbl := range details.TableDescs {
mutTable, err := descsCol.GetMutableTableVersionByID(ctx, tbl.GetID(), txn)
if err != nil {
return err
}
if err := checkVersion(mutTable, tbl.Version); err != nil {
return err
for i := range details.TableDescs {
mutTable := all.LookupDescriptorEntry(details.TableDescs[i].GetID()).(*tabledesc.Mutable)
// Note that we don't need to worry about the re-validated indexes for descriptors
// with a declarative schema change job.
if mutTable.GetDeclarativeSchemaChangerState() != nil {
newTables = append(newTables, mutTable.TableDesc())
continue
}

badIndexes := devalidateIndexes[mutTable.ID]
for _, badIdx := range badIndexes {
found, err := mutTable.FindIndexWithID(badIdx)
Expand All @@ -1910,7 +1928,6 @@ func (r *restoreResumer) publishDescriptors(
if err := mutTable.AllocateIDs(ctx, version); err != nil {
return err
}
allMutDescs = append(allMutDescs, mutTable)
newTables = append(newTables, mutTable.TableDesc())
// For cluster restores, all the jobs are restored directly from the jobs
// table, so there is no need to re-create ongoing schema change jobs,
Expand All @@ -1927,67 +1944,36 @@ func (r *restoreResumer) publishDescriptors(
}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
for _, typDesc := range details.TypeDescs {
typ, err := descsCol.GetMutableTypeVersionByID(ctx, txn, typDesc.GetID())
if err != nil {
return err
}
if err := checkVersion(typ, typDesc.Version); err != nil {
return err
}
allMutDescs = append(allMutDescs, typ)
for i := range details.TypeDescs {
typ := all.LookupDescriptorEntry(details.TypeDescs[i].GetID()).(catalog.TypeDescriptor)
newTypes = append(newTypes, typ.TypeDesc())
if typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors {
if typ.GetDeclarativeSchemaChangerState() == nil &&
typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors {
if err := createTypeChangeJobFromDesc(
ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ,
); err != nil {
return err
}
}
}
for _, sc := range details.SchemaDescs {
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, txn, sc.ID)
if err != nil {
return err
}
if err := checkVersion(mutDesc, sc.Version); err != nil {
return err
}
mutSchema := mutDesc.(*schemadesc.Mutable)
allMutDescs = append(allMutDescs, mutSchema)
newSchemas = append(newSchemas, mutSchema.SchemaDesc())
for i := range details.SchemaDescs {
sc := all.LookupDescriptorEntry(details.SchemaDescs[i].GetID()).(catalog.SchemaDescriptor)
newSchemas = append(newSchemas, sc.SchemaDesc())
}
for _, dbDesc := range details.DatabaseDescs {
// Jobs started before 20.2 upgrade finalization don't put databases in
// an offline state.
// TODO(lucy): Should we make this more explicit with a format version
// field in the details?
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, txn, dbDesc.ID)
if err != nil {
return err
}
if err := checkVersion(mutDesc, dbDesc.Version); err != nil {
return err
}
mutDB := mutDesc.(*dbdesc.Mutable)
// TODO(lucy,ajwerner): Remove this in 21.1.
if !mutDB.Offline() {
newDBs = append(newDBs, dbDesc)
} else {
allMutDescs = append(allMutDescs, mutDB)
newDBs = append(newDBs, mutDB.DatabaseDesc())
}
for i := range details.DatabaseDescs {
db := all.LookupDescriptorEntry(details.DatabaseDescs[i].GetID()).(catalog.DatabaseDescriptor)
newDBs = append(newDBs, db.DatabaseDesc())
}
b := txn.NewBatch()
for _, desc := range allMutDescs {
desc.SetPublic()
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, desc, b,
); err != nil {
return err
}
if err := all.ForEachDescriptorEntry(func(desc catalog.Descriptor) error {
d := desc.(catalog.MutableDescriptor)
d.SetPublic()
return descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, d, b,
)
}); err != nil {
return err
}

if err := txn.Run(ctx, b); err != nil {
return errors.Wrap(err, "publishing tables")
}
Expand All @@ -2011,6 +1997,106 @@ func (r *restoreResumer) publishDescriptors(
return nil
}

// prefetchDescriptors calculates the set of descriptors needed by looking
// at the table, type, schema and database descriptors recorded in the job
// details. It then fetches all of those descriptors in a single call using
// descsCol and packages them up into an nstree.Catalog for easy lookup by ID.
//
// For every fetched descriptor, the version read must equal the version
// recorded in the job details; otherwise an error naming the descriptor ID,
// the expected version and the version actually read is returned. On any
// error the returned catalog is the zero value.
func prefetchDescriptors(
	ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, details jobspb.RestoreDetails,
) (_ nstree.Catalog, _ error) {
	var all nstree.MutableCatalog
	var allDescIDs catalog.DescriptorIDSet
	expVersion := make(
		map[descpb.ID]descpb.DescriptorVersion,
		len(details.TableDescs)+len(details.TypeDescs)+
			len(details.SchemaDescs)+len(details.DatabaseDescs),
	)
	// expect records the version the job details hold for a descriptor and
	// marks that descriptor as one which must be fetched.
	expect := func(id descpb.ID, version descpb.DescriptorVersion) {
		expVersion[id] = version
		allDescIDs.Add(id)
	}
	for i := range details.TableDescs {
		expect(details.TableDescs[i].GetID(), details.TableDescs[i].GetVersion())
	}
	for i := range details.TypeDescs {
		expect(details.TypeDescs[i].GetID(), details.TypeDescs[i].GetVersion())
	}
	for i := range details.SchemaDescs {
		expect(details.SchemaDescs[i].GetID(), details.SchemaDescs[i].GetVersion())
	}
	for i := range details.DatabaseDescs {
		expect(details.DatabaseDescs[i].GetID(), details.DatabaseDescs[i].GetVersion())
	}
	// Note that no maximum size is put on the batch here because,
	// in general, we assume that we can fit all of the descriptors
	// in RAM (we have them in RAM as part of the details object,
	// and we're going to write them to KV very soon as part of a
	// single batch).
	ids := allDescIDs.Ordered()
	got, err := descsCol.GetMutableDescriptorsByID(ctx, txn, ids...)
	if err != nil {
		return nstree.Catalog{}, errors.Wrap(err, "prefetch descriptors")
	}
	// got is indexed in lockstep with ids (got[i] is treated as the
	// descriptor for ids[i]).
	for i, id := range ids {
		if got[i].GetVersion() != expVersion[id] {
			// The arguments must follow the order of the format string:
			// the expected version first, then the version that was read.
			return nstree.Catalog{}, errors.Errorf(
				"version mismatch for descriptor %d, expected version %d, got %v",
				got[i].GetID(), expVersion[id], got[i].GetVersion(),
			)
		}
		all.UpsertDescriptorEntry(got[i])
	}
	return all.Catalog, nil
}

// createDeclarativeSchemaChangeJobs synthesizes declarative schema change
// jobs for the descriptors being restored. It is invoked in the final phase
// of a restore, and allMut is expected to hold every restored descriptor.
//
// Descriptors carrying declarative schema changer state are grouped by the
// job ID recorded in that state. Each group gets a freshly allocated job ID
// and a job record built from the group's combined state, and all records
// are then created through the registry using the supplied transaction.
//
// It should only be called for backups which do not restore the jobs table
// directly.
func createDeclarativeSchemaChangeJobs(
	ctx context.Context, registry *jobs.Registry, txn *kv.Txn, allMut nstree.Catalog,
) error {
	// Bucket the descriptors that are mid-change by the job ID they
	// reference. The callback never fails, so the iteration error is
	// deliberately discarded.
	grouped := make(map[catpb.JobID][]catalog.MutableDescriptor)
	_ = allMut.ForEachDescriptorEntry(func(desc catalog.Descriptor) error {
		state := desc.GetDeclarativeSchemaChangerState()
		if state == nil {
			return nil
		}
		grouped[state.JobID] = append(grouped[state.JobID], desc.(catalog.MutableDescriptor))
		return nil
	})

	var records []*jobs.Record
	for _, group := range grouped {
		// TODO(ajwerner): Consider the need to trim elements or update
		// descriptors in the face of restoring only some constituent
		// descriptors of a larger change. One example where this needs
		// to happen urgently is sequences. Others shouldn't be possible
		// at this point.
		jobID := registry.MakeJobID()
		states := make([]*scpb.DescriptorState, 0, len(group))
		for _, mut := range group {
			// Stamp the newly allocated job ID onto the state that is
			// used to build the job.
			state := mut.GetDeclarativeSchemaChangerState()
			state.JobID = jobID
			states = append(states, state)
		}
		// TODO(ajwerner): Deal with rollback and revertibility.
		cur, err := scpb.MakeCurrentStateFromDescriptors(states)
		if err != nil {
			return err
		}
		record := scexec.MakeDeclarativeSchemaChangeJobRecord(
			jobID,
			cur.Statements,
			cur.Authorization,
			screl.GetDescIDs(cur.TargetState).Ordered(),
		)
		records = append(records, record)
	}

	_, err := registry.CreateJobsWithTxn(ctx, txn, records)
	return err
}

func emitRestoreJobEvent(
ctx context.Context, p sql.JobExecContext, status jobs.Status, job *jobs.Job,
) {
Expand Down
Loading

0 comments on commit bec814d

Please sign in to comment.