Skip to content

Commit

Permalink
sql: fix bug where bad mutation job state could block dropping tables
Browse files Browse the repository at this point in the history
Previously, while dropping a table, we would mark all the jobs
associated with mutations on the table as `succeeded`, under the
assumption that they were running. The job registry API prohibits this
when the jobs are not `running` (or `pending`), so if a mutation was
stuck on the table descriptor with a failed or nonexistent job, dropping
the table would fail.

This PR fixes the bug by checking the job state before attempting to
update the job. It also fixes a related failure to drop a table caused
by a valid mutation job not being in a `running` state.

Release note (bug fix): Fixed a bug where prior schema changes that
failed and could not be fully reverted could prevent the affected
table from being dropped.
  • Loading branch information
thoszhang committed Dec 23, 2020
1 parent 7f0b87f commit 6505359
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 8 deletions.
49 changes: 46 additions & 3 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -442,6 +445,12 @@ func (p *planner) initiateDropTable(
// subsequent schema changes in the transaction (ie. this drop table statement) do not get a cache hit
// and do not try to update succeeded jobs, which would raise an error. Instead, this drop table
// statement will create a new job to drop the table.
//
// Note that we still wait for jobs removed from the cache to finish running
// after the transaction, since they're not removed from the jobsCollection.
// Also, changes made here do not affect schema change jobs created in this
// transaction with no mutation ID; they remain in the cache, and will be
// updated when writing the job record to drop the table.
jobIDs := make(map[int64]struct{})
var id descpb.MutationID
for _, m := range tableDesc.Mutations {
Expand All @@ -455,9 +464,43 @@ func (p *planner) initiateDropTable(
}
}
for jobID := range jobIDs {
if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as as successful", errors.Safe(jobID))
// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.

// First, see if the job exists.
if row, err := p.ExtendedEvalContext().InternalExecutor.(*InternalExecutor).QueryRowEx(
ctx, "query mutation job", p.txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
`SELECT 1 FROM system.jobs WHERE id = $1`, jobID,
); err != nil {
return err
} else if row == nil {
log.Warningf(ctx, "mutation job %d not found", jobID)
continue
}

// Update the job status.
if err := p.execCfg.JobRegistry.UpdateJobWithTxn(
ctx, jobID, p.txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
status := md.Status
switch status {
case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed:
log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status)
return nil
case jobs.StatusRunning, jobs.StatusPending:
status = jobs.StatusSucceeded
default:
// We shouldn't mark jobs as succeeded if they're not in a state where
// they're eligible to ever succeed, so mark them as failed.
status = jobs.StatusFailed
}
log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status)
ju.UpdateStatus(status)
return nil
}); err != nil {
return errors.Wrap(err, "updating mutation job for dropped table")
}
delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID)
}
Expand Down
85 changes: 80 additions & 5 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6114,6 +6114,67 @@ SELECT value
})
}

// TestDropTableWhileSchemaChangeReverting tests that schema changes in the
// reverting state end up as failed when the table is dropped.
//
// The test starts a CREATE UNIQUE INDEX that is bound to fail on a uniqueness
// violation, pauses the job inside the RunBeforeOnFailOrCancel knob, drops the
// table while the job is paused, and then checks the job's final status and
// error in crdb_internal.jobs.
func TestDropTableWhileSchemaChangeReverting(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer setTestJobsAdoptInterval()()
	ctx := context.Background()

	// Closed when we enter the RunBeforeOnFailOrCancel knob, at which point the
	// job is in the reverting state.
	beforeOnFailOrCancelNotification := make(chan struct{})
	// Closed when we're ready to continue with the schema change (rollback).
	continueNotification := make(chan struct{})
	params, _ := tests.CreateTestServerParams()
	params.Knobs = base.TestingKnobs{
		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
			RunBeforeOnFailOrCancel: func(_ int64) error {
				// NOTE(review): this assumes the knob fires exactly once; a second
				// invocation would panic on closing an already-closed channel.
				close(beforeOnFailOrCancelNotification)
				<-continueNotification
				// Return a retry error, so that we can be sure to test the path where
				// the job is marked as failed by the DROP TABLE instead of running to
				// completion and ending up in the failed state on its own.
				return jobs.NewRetryJobError("injected retry error")
			},
		},
	}
	s, sqlDB, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)

	_, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);
`)
	require.NoError(t, err)

	g := ctxgroup.WithContext(ctx)
	g.GoCtx(func(ctx context.Context) error {
		// Try to create a unique index which won't be valid and will need a rollback.
		//
		// The result is kept in a goroutine-local variable rather than the outer
		// err, which the main test goroutine writes and reads concurrently for
		// the DROP TABLE statement below.
		_, createErr := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`)
		// assert (not require) is used here because this is not the test
		// goroutine.
		assert.Regexp(t, "violates unique constraint", createErr)
		return nil
	})

	// Wait until the schema change job has reached the knob.
	<-beforeOnFailOrCancelNotification

	_, err = sqlDB.Exec(`DROP TABLE t.test;`)
	require.NoError(t, err)

	// Unblock the knob and wait for the CREATE UNIQUE INDEX statement to return.
	close(continueNotification)
	require.NoError(t, g.Wait())

	var status jobs.Status
	var jobError string
	require.NoError(t, sqlDB.QueryRow(`
SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%'
`).Scan(&status, &jobError))
	require.Equal(t, jobs.StatusFailed, status)
	require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
Expand All @@ -6123,7 +6184,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) {
defer setTestJobsAdoptInterval()()
ctx := context.Background()

runTest := func(params base.TestServerArgs) {
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

Expand All @@ -6140,10 +6201,20 @@ CREATE UNIQUE INDEX i ON t.test(v);
`)
require.Regexp(t, "violates unique constraint", err.Error())

var jobID int64
var jobErr string
row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobErr))
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
require.NoError(t, err)
}

// Test that dropping the table is still possible.
_, err = sqlDB.Exec(`DROP TABLE t.test`)
require.NoError(t, err)
}

t.Run("error-before-backfill", func(t *testing.T) {
Expand All @@ -6168,7 +6239,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
})

t.Run("error-before-reversing-mutations", func(t *testing.T) {
Expand All @@ -6193,7 +6266,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// GC the job record after the schema change, so we can test dropping the
// table with a nonexistent mutation job.
runTest(t, params, true /* gcJobRecord */)
})
}

Expand Down

0 comments on commit 6505359

Please sign in to comment.