Merge #28014
28014: sql: fix schema change reverse mutation and rollback job setup r=vivekmenezes a=vivekmenezes

This change fixes a number of problems related to the rollback
of a schema change.
1. During a rollback, the reversal of a mutation would reverse an
incomplete list of mutations dependent on the reversed mutation. It
didn't pick up all the mutations representing the mutation-id being
reversed, and didn't do a graph traversal beyond a depth of 1 while
picking out the mutations needing reversal (see the sketch after
this list).
2. The jobs associated with the schema changes reversed through the
graph traversal would be left in the pending state rather than
marked failed.
3. A job associated with a schema change would be marked failed
outside of the transaction rolling back the schema change. The
system could end up in a situation where the job was marked failed
but the schema change had not been rolled back, or where the schema
change had been rolled back but its job was never marked failed.
4. The rollback job associated with the reversed schema change
was created outside of the schema change reversal transaction.
This could result in a situation where a rollback job was
created for a rollback job. A reversal of a reversal
of a schema change is not permitted and the double reversal would
fail, but the new ROLL BACK of a ROLL BACK job would still exist
because it was created outside the transaction.
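
A minimal, self-contained Go sketch of the traversal fix described in
point 1, assuming simplified stand-in types (mutationSet and
dependentMutations are hypothetical names, not the sqlbase/SchemaChanger
types used in the diff below). It only illustrates how reversing one
mutation keeps pulling in dependent mutations until a fixed point is
reached, so that dependencies deeper than one level are also reversed.

package main

import "fmt"

// mutationSet is a simplified stand-in for the group of descriptor
// mutations sharing one mutation ID.
type mutationSet struct {
	id       int
	addsCols []string // columns this mutation set adds
	idxRefs  []string // columns referenced by its index mutations
}

// dependentMutations collects every mutation set that must also be
// reversed because one of its indexes references a reversed column,
// iterating until no new sets are found (a fixed point).
func dependentMutations(muts []mutationSet, reversed map[string]struct{}) map[int]struct{} {
	dropped := map[int]struct{}{}
	for {
		before := len(dropped)
		for _, m := range muts {
			if _, ok := dropped[m.id]; ok {
				continue
			}
			for _, c := range m.idxRefs {
				if _, ok := reversed[c]; !ok {
					continue
				}
				dropped[m.id] = struct{}{}
				// Reversing this set also reverses the columns it adds,
				// which can pull further mutation sets in on the next pass.
				for _, add := range m.addsCols {
					reversed[add] = struct{}{}
				}
				break
			}
		}
		if len(dropped) == before {
			return dropped
		}
	}
}

func main() {
	muts := []mutationSet{
		{id: 2, idxRefs: []string{"a"}, addsCols: []string{"b"}}, // references reversed column "a"
		{id: 3, idxRefs: []string{"b"}},                          // depth-2 dependency via column "b"
	}
	// Column "a" belongs to the mutation being rolled back; both sets
	// above end up in the result: map[2:{} 3:{}].
	fmt.Println(dependentMutations(muts, map[string]struct{}{"a": {}}))
}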

fixes #27760
related to #27273
related to #27402

Co-authored-by: Vivek Menezes <[email protected]>
craig[bot] and vivekmenezes committed Aug 2, 2018
2 parents 4b698e0 + 681cc5d commit 226ec67
Showing 5 changed files with 278 additions and 129 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
@@ -471,7 +471,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
sqlDB.Exec(t, `SET DATABASE = data`)

sqlDB.Exec(t, `BACKUP TABLE bank TO $1 INCREMENTAL FROM $2`, incDir, fullDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+1, jobspb.TypeBackup, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+1, jobspb.TypeBackup, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`BACKUP TABLE bank TO '%s' INCREMENTAL FROM '%s'`,
@@ -486,7 +486,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
}

sqlDB.Exec(t, `RESTORE TABLE bank FROM $1, $2 WITH OPTIONS ('into_db'='restoredb')`, fullDir, incDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+2, jobspb.TypeRestore, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+2, jobspb.TypeRestore, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`RESTORE TABLE bank FROM '%s', '%s' WITH into_db = 'restoredb'`,
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt_test.go
@@ -1098,7 +1098,7 @@ func TestImportCSVStmt(t *testing.T) {
}
jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)`

if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobspb.TypeImport, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(jobPrefix+tc.jobOpts, strings.Join(tc.files, ", ")),
}); err != nil {
285 changes: 195 additions & 90 deletions pkg/sql/schema_changer.go
@@ -89,7 +89,12 @@ type SchemaChanger struct {
testingKnobs *SchemaChangerTestingKnobs
distSQLPlanner *DistSQLPlanner
jobRegistry *jobs.Registry
job *jobs.Job
// Keep a reference to the job related to this schema change
// so that we don't need to read the job again while updating
// the status of the job. This job can be one of two jobs: the
// original schema change job for the sql command, or the
// rollback job for the rollback of the schema change.
job *jobs.Job
// Caches updated by DistSQL.
rangeDescriptorCache *kv.RangeDescriptorCache
leaseHolderCache *kv.LeaseHolderCache
@@ -656,11 +661,6 @@ func (sc *SchemaChanger) rollbackSchemaChange(
evalCtx *extendedEvalContext,
) error {
log.Warningf(ctx, "reversing schema change %d due to irrecoverable error: %s", *sc.job.ID(), err)
if err := sc.job.Failed(ctx, err, jobs.NoopFn); err != nil {
log.Warningf(ctx, "schema change ignoring error while marking job %d as failed: %+v",
*sc.job.ID(), err)
}

if errReverse := sc.reverseMutations(ctx, err); errReverse != nil {
// Although the backfill did hit an integrity constraint violation
// and made a decision to reverse the mutations,
@@ -883,15 +883,25 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(
// all new indexes referencing the column will also be dropped.
func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError error) error {
// Reverse the flow of the state machine.
var scJob *jobs.Job
// All the mutations dropped by the reversal of the schema change.
// This is created by traversing the mutations list like a graph,
// where indexes refer to columns. Whenever a column schema change
// is reversed, any index mutation referencing it is also reversed.
var droppedMutations map[sqlbase.MutationID]struct{}
_, err := sc.leaseMgr.Publish(ctx, sc.tableID, func(desc *sqlbase.TableDescriptor) error {
// Keep track of the column mutations being reversed so that indexes
// referencing them can be dropped.
columns := make(map[string]struct{})
droppedMutations = nil

for i, mutation := range desc.Mutations {
if mutation.MutationID != sc.mutationID {
// Only reverse the first set of mutations if they have the
// mutation ID we're looking for.
if i == 0 {
return errDidntUpdateDescriptor
}
break
}

@@ -901,72 +911,43 @@ func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError erro
return errors.Errorf("mutation already rolled back: %v", mutation)
}

jobID, err := sc.getJobIDForMutationWithDescriptor(ctx, desc, mutation.MutationID)
if err != nil {
return err
}
job, err := sc.jobRegistry.LoadJob(ctx, jobID)
if err != nil {
return err
}

details, ok := job.Details().(jobspb.SchemaChangeDetails)
if !ok {
// TODO(mjibson): should this be `job`, not `sc.job`?
return errors.Errorf("expected SchemaChangeDetails job type, got %T", sc.job.Details())
}
details.ResumeSpanList[i].ResumeSpans = nil
err = job.SetDetails(ctx, details)
if err != nil {
return err
}

log.Warningf(ctx, "reverse schema change mutation: %+v", mutation)
switch mutation.Direction {
case sqlbase.DescriptorMutation_ADD:
desc.Mutations[i].Direction = sqlbase.DescriptorMutation_DROP
// A column ADD being reversed gets placed in the map.
if col := mutation.GetColumn(); col != nil {
columns[col.Name] = struct{}{}
}
desc.Mutations[i], columns = reverseMutation(mutation, false /*notStarted*/, columns)

case sqlbase.DescriptorMutation_DROP:
desc.Mutations[i].Direction = sqlbase.DescriptorMutation_ADD
}
desc.Mutations[i].Rollback = true
}

for i := range desc.MutationJobs {
if desc.MutationJobs[i].MutationID == sc.mutationID {
// Create a roll back job.
oldJobPayload := sc.job.Payload()
job := sc.jobRegistry.NewJob(jobs.Record{
Description: "ROLL BACK " + oldJobPayload.Description,
Username: oldJobPayload.Username,
DescriptorIDs: oldJobPayload.DescriptorIDs,
Details: oldJobPayload.UnwrapDetails(),
Progress: jobspb.SchemaChangeProgress{},
})
if err := job.Created(ctx); err != nil {
return err
}
if err := job.Started(ctx); err != nil {
return err
}
desc.MutationJobs[i].JobID = *job.ID()
sc.job = job
break
}
}

// Delete index mutations that reference any of the reversed columns.
// Delete all mutations that reference any of the reversed columns
// by running a graph traversal of the mutations.
if len(columns) > 0 {
sc.deleteIndexMutationsWithReversedColumns(ctx, desc, columns)
droppedMutations = sc.deleteIndexMutationsWithReversedColumns(ctx, desc, columns)
}

// Publish() will increment the version.
return nil
}, func(txn *client.Txn) error {
// Read the table descriptor from the store. The Version of the
// descriptor has already been incremented in the transaction and
// this descriptor can be modified without incrementing the version.
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}

// Mark the schema change job as failed and create a rollback job.
scJob, err = sc.createRollbackJob(ctx, txn, tableDesc, causingError)
if err != nil {
return err
}

// Mark other reversed mutation jobs as failed.
for m := range droppedMutations {
_, err := sc.markJobFailed(ctx, txn, tableDesc, m, causingError)
if err != nil {
return err
}
}

// Log "Reverse Schema Change" event. Only the causing error and the
// mutation ID are logged; this can be correlated with the DDL statement
// that initiated the change using the mutation id.
@@ -982,45 +963,169 @@ func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError erro
}{fmt.Sprintf("%+v", causingError), uint32(sc.mutationID)},
)
})
return err
if err != nil {
return err
}
// Only update the job if the transaction has succeeded. The schema change
// job now references the rollback job.
if scJob != nil {
sc.job = scJob
return scJob.Started(ctx)
}
return nil
}

// Mark the job associated with the mutation as failed.
func (sc *SchemaChanger) markJobFailed(
ctx context.Context,
txn *client.Txn,
tableDesc *sqlbase.TableDescriptor,
mutationID sqlbase.MutationID,
causingError error,
) (*jobs.Job, error) {
// Mark job as failed.
jobID, err := sc.getJobIDForMutationWithDescriptor(ctx, tableDesc, mutationID)
if err != nil {
return nil, err
}
job, err := sc.jobRegistry.LoadJobWithTxn(ctx, jobID, txn)
if err != nil {
return nil, err
}
err = job.WithTxn(txn).Failed(ctx, causingError, jobs.NoopFn)
return job, err
}

// Mark the current schema change job as failed and create a new rollback job
// representing the schema change and return it.
func (sc *SchemaChanger) createRollbackJob(
ctx context.Context, txn *client.Txn, tableDesc *sqlbase.TableDescriptor, causingError error,
) (*jobs.Job, error) {

// Mark job as failed.
job, err := sc.markJobFailed(ctx, txn, tableDesc, sc.mutationID, causingError)
if err != nil {
return nil, err
}

// Create a new rollback job representing the reversal of the mutations.
for i := range tableDesc.MutationJobs {
if tableDesc.MutationJobs[i].MutationID == sc.mutationID {
// Create a roll back job.
payload := job.Payload()
rollbackJob := sc.jobRegistry.NewJob(jobs.Record{
Description: "ROLL BACK " + payload.Description,
Username: payload.Username,
DescriptorIDs: payload.DescriptorIDs,
Details: payload.UnwrapDetails(),
Progress: jobspb.SchemaChangeProgress{},
})
if err := rollbackJob.WithTxn(txn).Created(ctx); err != nil {
return nil, err
}
// Set the transaction back to nil so that this job can
// be used in other transactions.
rollbackJob.WithTxn(nil)

tableDesc.MutationJobs[i].JobID = *rollbackJob.ID()

// Write the descriptor; the version has already been incremented.
descKey := sqlbase.MakeDescMetadataKey(tableDesc.GetID())
descVal := sqlbase.WrapDescriptor(tableDesc)
b := txn.NewBatch()
b.Put(descKey, descVal)
if err := txn.Run(ctx, b); err != nil {
return nil, err
}
return rollbackJob, nil
}
}
// Cannot get here.
return nil, fmt.Errorf("no job found for table %d mutation %d", sc.tableID, sc.mutationID)
}

// deleteIndexMutationsWithReversedColumns deletes index mutations with a
// different mutationID than the schema changer and a reference to one of the
// reversed columns.
// deleteIndexMutationsWithReversedColumns deletes mutations with a
// different mutationID than the schema changer and with an index that
// references one of the reversed columns. Execute this as a breadth
// first search graph traversal.
func (sc *SchemaChanger) deleteIndexMutationsWithReversedColumns(
ctx context.Context, desc *sqlbase.TableDescriptor, columns map[string]struct{},
) {
newMutations := make([]sqlbase.DescriptorMutation, 0, len(desc.Mutations))
for _, mutation := range desc.Mutations {
if mutation.MutationID != sc.mutationID {
if idx := mutation.GetIndex(); idx != nil {
deleteMutation := false
for _, name := range idx.ColumnNames {
if _, ok := columns[name]; ok {
// Such an index mutation has to be with direction ADD and
// in the DELETE_ONLY state. Live indexes referencing live
// columns cannot be deleted and thus never have direction
// DROP. All mutations with the ADD direction start off in
// the DELETE_ONLY state.
if mutation.Direction != sqlbase.DescriptorMutation_ADD ||
mutation.State != sqlbase.DescriptorMutation_DELETE_ONLY {
panic(fmt.Sprintf("mutation in bad state: %+v", mutation))
) map[sqlbase.MutationID]struct{} {
dropMutations := make(map[sqlbase.MutationID]struct{})
// Run a breadth-first-search traversal that reverses mutations.
for {
start := len(dropMutations)
for _, mutation := range desc.Mutations {
if mutation.MutationID != sc.mutationID {
if idx := mutation.GetIndex(); idx != nil {
for _, name := range idx.ColumnNames {
if _, ok := columns[name]; ok {
// Such an index mutation has to be with direction ADD and
// in the DELETE_ONLY state. Live indexes referencing live
// columns cannot be deleted and thus never have direction
// DROP. All mutations with the ADD direction start off in
// the DELETE_ONLY state.
if mutation.Direction != sqlbase.DescriptorMutation_ADD ||
mutation.State != sqlbase.DescriptorMutation_DELETE_ONLY {
panic(fmt.Sprintf("mutation in bad state: %+v", mutation))
}
log.Warningf(ctx, "drop schema change mutation: %+v", mutation)
dropMutations[mutation.MutationID] = struct{}{}
break
}
log.Warningf(ctx, "delete schema change mutation: %+v", mutation)
deleteMutation = true
break
}
}
if deleteMutation {
continue
}
}
}
newMutations = append(newMutations, mutation)

if len(dropMutations) == start {
// No more mutations to drop.
break
}
// Drop mutations.
newMutations := make([]sqlbase.DescriptorMutation, 0, len(desc.Mutations))
for _, mutation := range desc.Mutations {
if _, ok := dropMutations[mutation.MutationID]; ok {
// Reverse mutation. Update columns to reflect additional
// columns that have been purged. This mutation doesn't need
// a rollback because it was not started.
mutation, columns = reverseMutation(mutation, true /*notStarted*/, columns)
// Mark as complete because this mutation needs no backfill.
desc.MakeMutationComplete(mutation)
} else {
newMutations = append(newMutations, mutation)
}
}
// Reset mutations.
desc.Mutations = newMutations
}
return dropMutations
}

// Reverse a mutation. Returns the updated mutation and updated columns.
// notStarted is set to true only if the schema change state machine
// was not started for the mutation.
func reverseMutation(
mutation sqlbase.DescriptorMutation, notStarted bool, columns map[string]struct{},
) (sqlbase.DescriptorMutation, map[string]struct{}) {
switch mutation.Direction {
case sqlbase.DescriptorMutation_ADD:
mutation.Direction = sqlbase.DescriptorMutation_DROP
// A column ADD being reversed gets placed in the map.
if col := mutation.GetColumn(); col != nil {
columns[col.Name] = struct{}{}
}
if notStarted && mutation.State != sqlbase.DescriptorMutation_DELETE_ONLY {
panic(fmt.Sprintf("mutation in bad state: %+v", mutation))
}

case sqlbase.DescriptorMutation_DROP:
mutation.Direction = sqlbase.DescriptorMutation_ADD
if notStarted && mutation.State != sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY {
panic(fmt.Sprintf("mutation in bad state: %+v", mutation))
}
}
// Reset mutations.
desc.Mutations = newMutations
return mutation, columns
}

// TestingSchemaChangerCollection is an exported (for testing) version of
