Skip to content

Commit

Permalink
Merge #40439
Browse files Browse the repository at this point in the history
40439: jobs: ensure that async index dropping is needed r=pbardea a=pbardea

We drop indexes through an async schema changer which eventually gets removes an
index with `ClearRange`. When this happens the index drop job is not marked as
completed, but instead marked as waiting for GC. However, if we DROP the table
that the index references in the same transaction, then we mark the index drop
job as done and we don't want to override this.

This PR ensures and tests that we don't override already successful index dropping
jobs that would be created as explained above.

Addresses #39260.

Release note: None

Co-authored-by: Paul Bardea <[email protected]>
  • Loading branch information
craig[bot] and pbardea committed Sep 10, 2019
2 parents ba30aa9 + a46de66 commit 9ccece9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
12 changes: 12 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ func (j *Job) CheckStatus(ctx context.Context) error {
})
}

// CheckTerminalStatus returns true if the job is in a terminal status.
func (j *Job) CheckTerminalStatus(ctx context.Context) bool {
err := j.Update(ctx, func(_ *client.Txn, md JobMetadata, _ *JobUpdater) error {
if !md.Status.Terminal() {
return &InvalidStatusError{md.ID, md.Status, "checking that job status is success", md.Payload.Error}
}
return nil
})

return err == nil
}

// RunningStatus updates the detailed status of a job currently in progress.
// It sets the job's RunningStatus field to the value returned by runningStatusFn
// and persists runningStatusFn's modifications to the job's details, if any.
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
Original file line number Diff line number Diff line change
Expand Up @@ -1724,3 +1724,25 @@ DROP INDEX b_idx CASCADE;

statement ok
COMMIT;

# Test that deleting an index on a table that gets dropped in the same
# transaction is allowed.
subtest delete_index_and_table_in_txn

statement ok
CREATE TABLE people (id INT PRIMARY KEY, name STRING);

statement ok
CREATE INDEX people_name_index ON people (name);

statement ok
BEGIN;

statement ok
DROP INDEX people@people_name_index;

statement ok
DROP TABLE people;

statement ok
COMMIT;
29 changes: 18 additions & 11 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,17 +1315,24 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}

descs, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, func(txn *client.Txn) error {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as successful", errors.Safe(*sc.job.ID()))
}
} else {
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return RunningStatusWaitingGC, nil
}); err != nil {
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
// If the job already has a terminal status, we shouldn't need to update
// its status again. One way this may happen is when a table is dropped
// all jobs that mutate that table are marked successful. So if is a job
// that mutates a table that is dropped in the same txn, then it will
// already be successful. These jobs don't need their status to be updated.
if !sc.job.WithTxn(txn).CheckTerminalStatus(ctx) {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as successful", errors.Safe(*sc.job.ID()))
}
} else {
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return RunningStatusWaitingGC, nil
}); err != nil {
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
}

Expand Down

0 comments on commit 9ccece9

Please sign in to comment.