Skip to content

Commit

Permalink
schemachanger,backupccl: support for backup and restore mid-change
Browse files Browse the repository at this point in the history
This commit touches the schema changer to encode some more metadata about the
state of the schema change into the descriptors in order to better facilitate
synthesizing a job at restore time.

The major contribution, then, is to support the declarative schema changer in
restore. Along the way, it picks up some catalog niceties like batched
descriptor retrieval and the nstree.Catalog.

Finally, the code is tested with a suite of datadriven tests which run schema
changes and then take the state and BACKUP/RESTORE it at each of the possible
stages (including all of the rollback stages) and make sure that the right
outcome ultimately happens.

Note that this is currently just tested for database-level BACKUP/RESTORE
and that more testing is planned as follow-up.

Release note: None
  • Loading branch information
ajwerner committed Feb 18, 2022
1 parent 0ca4592 commit c10b5ef
Show file tree
Hide file tree
Showing 20 changed files with 907 additions and 238 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/multiregion",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/schemadesc",
"//pkg/sql/catalog/schemaexpr",
Expand All @@ -96,6 +97,9 @@ go_library(
"//pkg/sql/roleoption",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/schemachanger/scbackup",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/screl",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down
163 changes: 99 additions & 64 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scbackup"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand Down Expand Up @@ -1870,30 +1872,44 @@ func (r *restoreResumer) publishDescriptors(

// Write the new descriptors and flip state over to public so they can be
// accessed.
allMutDescs := make([]catalog.MutableDescriptor, 0,
len(details.TableDescs)+len(details.TypeDescs)+len(details.SchemaDescs)+len(details.DatabaseDescs))

// Pre-fetch all the descriptors into the collection to avoid doing
// round-trips per descriptor.
all, err := prefetchDescriptors(ctx, txn, descsCol, details)
if err != nil {
return err
}

// Create slices of raw descriptors for the restore job details.
newTables := make([]*descpb.TableDescriptor, 0, len(details.TableDescs))
newTypes := make([]*descpb.TypeDescriptor, 0, len(details.TypeDescs))
newSchemas := make([]*descpb.SchemaDescriptor, 0, len(details.SchemaDescs))
newDBs := make([]*descpb.DatabaseDescriptor, 0, len(details.DatabaseDescs))
checkVersion := func(read catalog.Descriptor, exp descpb.DescriptorVersion) error {
if read.GetVersion() == exp {
return nil

// Go through the descriptors and find any declarative schema change jobs
// affecting them.
//
// If we're restoring all the descriptors, it means we're also restoring the
// jobs.
if details.DescriptorCoverage != tree.AllDescriptors {
if err := scbackup.CreateDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, all,
); err != nil {
return err
}
return errors.Errorf("version mismatch for descriptor %d, expected version %d, got %v",
read.GetID(), read.GetVersion(), exp)
}

// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
for _, tbl := range details.TableDescs {
mutTable, err := descsCol.GetMutableTableVersionByID(ctx, tbl.GetID(), txn)
if err != nil {
return err
}
if err := checkVersion(mutTable, tbl.Version); err != nil {
return err
for i := range details.TableDescs {
mutTable := all.LookupDescriptorEntry(details.TableDescs[i].GetID()).(*tabledesc.Mutable)
// Note that we don't need to worry about the re-validated indexes for descriptors
// with a declarative schema change job.
if mutTable.GetDeclarativeSchemaChangerState() != nil {
newTables = append(newTables, mutTable.TableDesc())
continue
}

badIndexes := devalidateIndexes[mutTable.ID]
for _, badIdx := range badIndexes {
found, err := mutTable.FindIndexWithID(badIdx)
Expand All @@ -1910,7 +1926,6 @@ func (r *restoreResumer) publishDescriptors(
if err := mutTable.AllocateIDs(ctx, version); err != nil {
return err
}
allMutDescs = append(allMutDescs, mutTable)
newTables = append(newTables, mutTable.TableDesc())
// For cluster restores, all the jobs are restored directly from the jobs
// table, so there is no need to re-create ongoing schema change jobs,
Expand All @@ -1927,67 +1942,36 @@ func (r *restoreResumer) publishDescriptors(
}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
for _, typDesc := range details.TypeDescs {
typ, err := descsCol.GetMutableTypeVersionByID(ctx, txn, typDesc.GetID())
if err != nil {
return err
}
if err := checkVersion(typ, typDesc.Version); err != nil {
return err
}
allMutDescs = append(allMutDescs, typ)
for i := range details.TypeDescs {
typ := all.LookupDescriptorEntry(details.TypeDescs[i].GetID()).(catalog.TypeDescriptor)
newTypes = append(newTypes, typ.TypeDesc())
if typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors {
if typ.GetDeclarativeSchemaChangerState() == nil &&
typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors {
if err := createTypeChangeJobFromDesc(
ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ,
); err != nil {
return err
}
}
}
for _, sc := range details.SchemaDescs {
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, txn, sc.ID)
if err != nil {
return err
}
if err := checkVersion(mutDesc, sc.Version); err != nil {
return err
}
mutSchema := mutDesc.(*schemadesc.Mutable)
allMutDescs = append(allMutDescs, mutSchema)
newSchemas = append(newSchemas, mutSchema.SchemaDesc())
for i := range details.SchemaDescs {
sc := all.LookupDescriptorEntry(details.SchemaDescs[i].GetID()).(catalog.SchemaDescriptor)
newSchemas = append(newSchemas, sc.SchemaDesc())
}
for _, dbDesc := range details.DatabaseDescs {
// Jobs started before 20.2 upgrade finalization don't put databases in
// an offline state.
// TODO(lucy): Should we make this more explicit with a format version
// field in the details?
mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, txn, dbDesc.ID)
if err != nil {
return err
}
if err := checkVersion(mutDesc, dbDesc.Version); err != nil {
return err
}
mutDB := mutDesc.(*dbdesc.Mutable)
// TODO(lucy,ajwerner): Remove this in 21.1.
if !mutDB.Offline() {
newDBs = append(newDBs, dbDesc)
} else {
allMutDescs = append(allMutDescs, mutDB)
newDBs = append(newDBs, mutDB.DatabaseDesc())
}
for i := range details.DatabaseDescs {
db := all.LookupDescriptorEntry(details.DatabaseDescs[i].GetID()).(catalog.DatabaseDescriptor)
newDBs = append(newDBs, db.DatabaseDesc())
}
b := txn.NewBatch()
for _, desc := range allMutDescs {
desc.SetPublic()
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, desc, b,
); err != nil {
return err
}
if err := all.ForEachDescriptorEntry(func(desc catalog.Descriptor) error {
d := desc.(catalog.MutableDescriptor)
d.SetPublic()
return descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, d, b,
)
}); err != nil {
return err
}

if err := txn.Run(ctx, b); err != nil {
return errors.Wrap(err, "publishing tables")
}
Expand All @@ -2011,6 +1995,57 @@ func (r *restoreResumer) publishDescriptors(
return nil
}

// prefetchDescriptors calculates the set of descriptors needed by looking
// at the relevant fields of the job details (tables, types, schemas and
// databases). It then fetches all of those descriptors in a batch using the
// descsCol. It packages up that set of descriptors into an nstree.Catalog
// for easy use.
//
// This function also takes care of asserting that the retrieved version
// matches the version recorded in the job details; a mismatch is returned
// as an error and no catalog is produced.
func prefetchDescriptors(
	ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, details jobspb.RestoreDetails,
) (_ nstree.Catalog, _ error) {
	var all nstree.MutableCatalog
	var allDescIDs catalog.DescriptorIDSet
	expVersion := make(
		map[descpb.ID]descpb.DescriptorVersion,
		len(details.TableDescs)+len(details.TypeDescs)+
			len(details.SchemaDescs)+len(details.DatabaseDescs),
	)
	// expect records the version we expect to read for the descriptor and
	// marks the descriptor as one that needs to be fetched.
	expect := func(id descpb.ID, version descpb.DescriptorVersion) {
		expVersion[id] = version
		allDescIDs.Add(id)
	}
	for i := range details.TableDescs {
		expect(details.TableDescs[i].GetID(), details.TableDescs[i].GetVersion())
	}
	for i := range details.TypeDescs {
		expect(details.TypeDescs[i].GetID(), details.TypeDescs[i].GetVersion())
	}
	for i := range details.SchemaDescs {
		expect(details.SchemaDescs[i].GetID(), details.SchemaDescs[i].GetVersion())
	}
	for i := range details.DatabaseDescs {
		expect(details.DatabaseDescs[i].GetID(), details.DatabaseDescs[i].GetVersion())
	}
	// Note that no maximum size is put on the batch here because,
	// in general, we assume that we can fit all of the descriptors
	// in RAM (we have them in RAM as part of the details object,
	// and we're going to write them to KV very soon as part of a
	// single batch).
	ids := allDescIDs.Ordered()
	got, err := descsCol.GetMutableDescriptorsByID(ctx, txn, ids...)
	if err != nil {
		return nstree.Catalog{}, errors.Wrap(err, "prefetch descriptors")
	}
	for i, id := range ids {
		// The arguments follow the order of the format string: the expected
		// version comes from the job details, the "got" version is what was
		// just read.
		if exp, read := expVersion[id], got[i].GetVersion(); read != exp {
			return nstree.Catalog{}, errors.Errorf(
				"version mismatch for descriptor %d, expected version %d, got %v",
				got[i].GetID(), exp, read,
			)
		}
		all.UpsertDescriptorEntry(got[i])
	}
	return all.Catalog, nil
}

func emitRestoreJobEvent(
ctx context.Context, p sql.JobExecContext, status jobs.Status, job *jobs.Job,
) {
Expand Down
93 changes: 93 additions & 0 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/screl"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -1065,6 +1067,10 @@ func rewriteDatabaseDescs(databases []*dbdesc.Mutable, descriptorRewrites DescRe
db.Version = 1
db.ModificationTime = hlc.Timestamp{}

if err := rewriteSchemaChangerState(db, descriptorRewrites); err != nil {
return err
}

// Rewrite the name-to-ID mapping for the database's child schemas.
newSchemas := make(map[string]descpb.DatabaseDescriptor_SchemaInfo)
err := db.ForEachNonDroppedSchema(func(id descpb.ID, name string) error {
Expand Down Expand Up @@ -1130,6 +1136,10 @@ func rewriteTypeDescs(types []*typedesc.Mutable, descriptorRewrites DescRewriteM
typ.Version = 1
typ.ModificationTime = hlc.Timestamp{}

if err := rewriteSchemaChangerState(typ, descriptorRewrites); err != nil {
return err
}

typ.ID = rewrite.ID
typ.ParentSchemaID = rewrite.ParentSchemaID
typ.ParentID = rewrite.ParentID
Expand Down Expand Up @@ -1170,6 +1180,85 @@ func rewriteSchemaDescs(schemas []*schemadesc.Mutable, descriptorRewrites DescRe

sc.ID = rewrite.ID
sc.ParentID = rewrite.ParentID

if err := rewriteSchemaChangerState(sc, descriptorRewrites); err != nil {
return err
}
}
return nil
}

// rewriteSchemaChangerState handles rewriting any references to IDs stored in
// the descriptor's declarative schema changer state.
//
// If the descriptor carries no such state this is a no-op. Otherwise, for
// each target in the state, it rewrites (in this order) the descriptor IDs,
// the type and sequence references inside non-empty expressions, and the
// user-defined type references of the target's element, all according to
// descriptorRewrites. If no targets remain once the loop finishes, the state
// is cleared from the descriptor.
//
// Any returned error is wrapped with "rewriting declarative schema changer
// state".
func rewriteSchemaChangerState(
	d catalog.MutableDescriptor, descriptorRewrites DescRewriteMap,
) (err error) {
	state := d.GetDeclarativeSchemaChangerState()
	if state == nil {
		return nil
	}
	defer func() {
		if err != nil {
			err = errors.Wrap(err, "rewriting declarative schema changer state")
		}
	}()
	// The loop index is managed by hand because targets may be removed from
	// the slice while iterating.
	for i := 0; i < len(state.Targets); i++ {
		t := &state.Targets[i]
		if err := screl.WalkDescIDs(t.Element(), func(id *descpb.ID) error {
			rewrite, ok := descriptorRewrites[*id]
			if !ok {
				// Report the type of the element rather than the type of the
				// target pointer, which is the same for every target.
				return errors.Errorf("missing rewrite for id %d in %T", *id, t.Element())
			}
			*id = rewrite.ID
			return nil
		}); err != nil {
			// We'll permit this in the special case of a schema descriptor
			// database entry.
			//
			// TODO(ajwerner,postamar): it's not obvious that this should be its own
			// element as opposed to just an extension of the namespace table that only
			// ops know about.
			switch el := t.Element().(type) {
			case *scpb.DatabaseSchemaEntry:
				_, scExists := descriptorRewrites[el.SchemaID]
				if !scExists && state.CurrentStatuses[i] == scpb.Status_ABSENT {
					// Drop the target from all three parallel slices and
					// revisit index i, which now holds the next target.
					state.Targets = append(state.Targets[:i], state.Targets[i+1:]...)
					state.CurrentStatuses = append(state.CurrentStatuses[:i], state.CurrentStatuses[i+1:]...)
					state.TargetRanks = append(state.TargetRanks[:i], state.TargetRanks[i+1:]...)
					i--
					continue
				}
			}
			return errors.Wrap(err, "rewriting descriptor ids")
		}

		if err := screl.WalkExpressions(t.Element(), func(expr *catpb.Expression) error {
			if *expr == "" {
				return nil
			}
			newExpr, err := rewriteTypesInExpr(string(*expr), descriptorRewrites)
			if err != nil {
				return errors.Wrapf(err, "rewriting expression type references: %q", *expr)
			}
			newExpr, err = rewriteSequencesInExpr(newExpr, descriptorRewrites)
			if err != nil {
				return errors.Wrapf(err, "rewriting expression sequence references: %q", newExpr)
			}
			*expr = catpb.Expression(newExpr)
			return nil
		}); err != nil {
			return err
		}
		if err := screl.WalkTypes(t.Element(), func(typ *types.T) error {
			return rewriteIDsInTypesT(typ, descriptorRewrites)
		}); err != nil {
			return errors.Wrap(err, "rewriting user-defined type references")
		}
		// TODO(ajwerner): Remember to rewrite views when the time comes. Currently
		// views are not handled by the declarative schema changer.
	}
	if len(state.Targets) == 0 {
		d.SetDeclarativeSchemaChangerState(nil)
	}
	return nil
}
Expand Down Expand Up @@ -1200,6 +1289,9 @@ func RewriteTableDescs(
return err
}
}
if err := rewriteSchemaChangerState(table, descriptorRewrites); err != nil {
return err
}

table.ID = tableRewrite.ID
table.UnexposedParentSchemaID = tableRewrite.ParentSchemaID
Expand Down Expand Up @@ -1365,6 +1457,7 @@ func RewriteTableDescs(
// lease is obviously bogus (plus the nodeID is relative to backup cluster).
table.Lease = nil
}

return nil
}

Expand Down
Loading

0 comments on commit c10b5ef

Please sign in to comment.