importer: clean up importResumer.dropTables
Previously, importResumer.dropTables() assumed that IMPORT INTO could act on
multiple tables, which is not actually the case, leading to overly complex code.
This PR is a simple refactor in preparation for further import rollback work
in cockroachdb#76722 and cockroachdb#70428.

Release note: none
msbutler committed Jul 21, 2022
1 parent 8d4c9bf commit fbaf116
Showing 1 changed file with 53 additions and 36 deletions.
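
Before reading the diff, here is a minimal, self-contained Go sketch of the control flow this refactor produces. Everything below (the table struct, the print statements, the simplified signatures) is an illustrative stand-in, not the real catalog/descs/sql/gcjob APIs used in import_job.go.

package main

import "fmt"

type table struct {
	id       int
	isNew    bool // created by IMPORT itself (not a pre-existing IMPORT INTO target)
	wasEmpty bool // the IMPORT INTO target held no rows before ingestion
}

// dropTables mirrors the refactored importResumer.dropTables: IMPORT INTO
// targets at most one pre-existing table, so we look for it and break.
func dropTables(tables []table, walltime int64) {
	var intoTable *table
	for i := range tables {
		if !tables[i].isNew {
			intoTable = &tables[i]
			break
		}
	}
	if intoTable == nil {
		// Plain IMPORT: every table is new; drop them all and queue GC.
		dropNewTables(tables)
		return
	}
	switch {
	case walltime != 0 && !intoTable.wasEmpty:
		fmt.Printf("revert table %d to its pre-import state\n", intoTable.id)
	case intoTable.wasEmpty:
		fmt.Printf("range-clear table %d (was empty before import)\n", intoTable.id)
	}
	fmt.Printf("bring table %d back online\n", intoTable.id)
}

func dropNewTables(tables []table) {
	for _, t := range tables {
		fmt.Printf("drop new table %d and schedule GC\n", t.id)
	}
}

func main() {
	dropTables([]table{{id: 42, isNew: false, wasEmpty: false}}, 12345)
}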
pkg/sql/importer/import_job.go (89 changes: 53 additions & 36 deletions)
@@ -1453,7 +1453,6 @@ func (r *importResumer) dropTables(
 	ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *sql.ExecutorConfig,
 ) error {
 	details := r.job.Details().(jobspb.ImportDetails)
-	dropTime := int64(1)
 
 	// If the prepare step of the import job was not completed then the
 	// descriptors do not need to be rolled back as the txn updating them never
@@ -1462,31 +1461,33 @@ func (r *importResumer) dropTables(
 		return nil
 	}
 
-	var revert []catalog.TableDescriptor
-	var empty []catalog.TableDescriptor
+	var tableWasEmpty bool
+	var intoTable catalog.TableDescriptor
 	for _, tbl := range details.Tables {
 		if !tbl.IsNew {
 			desc, err := descsCol.GetMutableTableVersionByID(ctx, tbl.Desc.ID, txn)
 			if err != nil {
 				return err
 			}
-			imm := desc.ImmutableCopy().(catalog.TableDescriptor)
-			if tbl.WasEmpty {
-				empty = append(empty, imm)
-			} else {
-				revert = append(revert, imm)
-			}
+			intoTable = desc.ImmutableCopy().(catalog.TableDescriptor)
+			tableWasEmpty = tbl.WasEmpty
+			break
 		}
 	}
 
+	if intoTable == nil {
+		// Rolling back IMPORT (i.e. not IMPORT INTO), where tbl.IsNew == true for all tables.
+		return r.dropNewTables(ctx, txn, descsCol, execCfg)
+	}
+	// Clear table data from a rolled-back IMPORT INTO command.
+	//
 	// The walltime can be 0 if there is a failure between publishing the tables
 	// as OFFLINE and then choosing an ingestion timestamp. This might happen
 	// while waiting for the descriptor version to propagate across the cluster,
 	// for example.
 	//
 	// In this case, we don't want to roll back the data since data ingestion has
 	// not yet begun (since we have not chosen a timestamp at which to ingest).
-	if details.Walltime != 0 && len(revert) > 0 {
+	if details.Walltime != 0 && !tableWasEmpty {
 		// NB: if a revert fails it will abort the rest of this failure txn, which is
 		// also what brings tables back online. We _could_ change the error handling
 		// or just move the revert into Resume()'s error return path, however it isn't
@@ -1502,23 +1503,44 @@ func (r *importResumer) dropTables(
 		// writes, so even if GC has run it would not have GC'ed any keys to which
 		// we need to revert, so we can safely ignore the target-time GC check.
 		const ignoreGC = true
-		if err := sql.RevertTables(ctx, txn.DB(), execCfg, revert, ts, ignoreGC, sql.RevertTableDefaultBatchSize); err != nil {
+		if err := sql.RevertTables(ctx, txn.DB(), execCfg, []catalog.TableDescriptor{intoTable}, ts, ignoreGC,
+			sql.RevertTableDefaultBatchSize); err != nil {
 			return errors.Wrap(err, "rolling back partially completed IMPORT")
 		}
-	}
-
-	for i := range empty {
+	} else if tableWasEmpty {
 		// Set a DropTime on the table descriptor to differentiate it from an
 		// older-format (v1.1) descriptor. This enables ClearTableData to use a
 		// RangeClear for faster data removal, rather than removing by chunks.
-		empty[i].TableDesc().DropTime = dropTime
+		intoTable.TableDesc().DropTime = int64(1)
 		if err := gcjob.ClearTableData(
-			ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, empty[i],
+			ctx, execCfg.DB, execCfg.DistSender, execCfg.Codec, &execCfg.Settings.SV, intoTable,
 		); err != nil {
-			return errors.Wrapf(err, "clearing data for table %d", empty[i].GetID())
+			return errors.Wrapf(err, "clearing data for table %d", intoTable.GetID())
 		}
 	}
 
+	// Bring the IMPORT INTO table back online.
+	b := txn.NewBatch()
+	intoDesc, err := descsCol.GetMutableTableVersionByID(ctx, intoTable.GetID(), txn)
+	if err != nil {
+		return err
+	}
+	intoDesc.SetPublic()
+	const kvTrace = false
+	if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
+		return err
+	}
+	return errors.Wrap(txn.Run(ctx, b), "putting IMPORT INTO table back online")
+}
+
+// dropNewTables drops the tables that were created as part of an IMPORT and
+// queues a GC job to clean up the dropped descriptors.
+func (r *importResumer) dropNewTables(
+	ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *sql.ExecutorConfig,
+) error {
+	details := r.job.Details().(jobspb.ImportDetails)
+	dropTime := int64(1)
+
 	b := txn.NewBatch()
 	tablesToGC := make([]descpb.ID, 0, len(details.Tables))
 	toWrite := make([]*tabledesc.Mutable, 0, len(details.Tables))
@@ -1527,22 +1549,18 @@ func (r *importResumer) dropTables(
 		if err != nil {
 			return err
 		}
-		if tbl.IsNew {
-			newTableDesc.SetDropped()
-			// If the DropTime is set, a table uses RangeClear for fast data removal. This
-			// operation starts at DropTime + the GC TTL. If we used now() here, it would
-			// not clean up data until the TTL from the time of the error. Instead, use 1
-			// (that is, 1ns past the epoch) to allow this to be cleaned up as soon as
-			// possible. This is safe since the table data was never visible to users,
-			// and so we don't need to preserve MVCC semantics.
-			newTableDesc.DropTime = dropTime
-			b.Del(catalogkeys.EncodeNameKey(execCfg.Codec, newTableDesc))
-			tablesToGC = append(tablesToGC, newTableDesc.ID)
-			descsCol.AddDeletedDescriptor(newTableDesc.GetID())
-		} else {
-			// IMPORT did not create this table, so we should not drop it.
-			newTableDesc.SetPublic()
-		}
+		newTableDesc.SetDropped()
+		// If the DropTime is set, a table uses RangeClear for fast data removal. This
+		// operation starts at DropTime + the GC TTL. If we used now() here, it would
+		// not clean up data until the TTL from the time of the error. Instead, use 1
+		// (that is, 1ns past the epoch) to allow this to be cleaned up as soon as
+		// possible. This is safe since the table data was never visible to users,
+		// and so we don't need to preserve MVCC semantics.
+		newTableDesc.DropTime = dropTime
+		b.Del(catalogkeys.EncodeNameKey(execCfg.Codec, newTableDesc))
+		tablesToGC = append(tablesToGC, newTableDesc.ID)
+		descsCol.AddDeletedDescriptor(newTableDesc.GetID())
+
 		// Accumulate the changes before adding them to the batch to avoid
 		// making any table invalid before having read it.
 		toWrite = append(toWrite, newTableDesc)
@@ -1574,8 +1592,7 @@
 		ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil {
 		return err
 	}
-
-	return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
+	return errors.Wrap(txn.Run(ctx, b), "rolling back IMPORT tables")
 }
 
 func (r *importResumer) dropSchemas(
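
The DropTime comment in the diff turns on a small piece of arithmetic: GC eligibility starts at DropTime + the GC TTL, so setting DropTime to 1ns past the epoch makes rolled-back data collectible as soon as the GC job runs. A standalone Go sketch of that calculation; the 25-hour TTL is an illustrative assumption, and the formula is a simplification of the real GC job's logic.

package main

import (
	"fmt"
	"time"
)

// gcDeadline returns when dropped table data becomes eligible for removal.
func gcDeadline(dropTime time.Time, ttl time.Duration) time.Time {
	return dropTime.Add(ttl)
}

func main() {
	ttl := 25 * time.Hour // illustrative GC TTL

	// DropTime = now: data lingers for a full TTL after the failed import.
	fmt.Println(gcDeadline(time.Now(), ttl))

	// DropTime = 1ns past the epoch: the deadline is decades in the past,
	// so the data can be cleared immediately.
	fmt.Println(gcDeadline(time.Unix(0, 1), ttl))
}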
