Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix bug where bad mutation job state could block dropping tables #57836

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand All @@ -26,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -442,6 +445,12 @@ func (p *planner) initiateDropTable(
// subsequent schema changes in the transaction (i.e., this drop table statement) do not get a cache hit
// and do not try to update succeeded jobs, which would raise an error. Instead, this drop table
// statement will create a new job to drop the table.
//
// Note that we still wait for jobs removed from the cache to finish running
// after the transaction, since they're not removed from the jobsCollection.
// Also, changes made here do not affect schema change jobs created in this
// transaction with no mutation ID; they remain in the cache, and will be
// updated when writing the job record to drop the table.
jobIDs := make(map[int64]struct{})
var id descpb.MutationID
for _, m := range tableDesc.Mutations {
Expand All @@ -455,9 +464,38 @@ func (p *planner) initiateDropTable(
}
}
for jobID := range jobIDs {
if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as as successful", errors.Safe(jobID))
// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.
mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
if jobs.HasJobNotFoundError(err) {
log.Warningf(ctx, "mutation job %d not found", jobID)
continue
}
return err
}
if err := mutationJob.WithTxn(p.txn).Update(
ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
status := md.Status
switch status {
case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed:
log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status)
return nil
case jobs.StatusRunning, jobs.StatusPending:
status = jobs.StatusSucceeded
default:
// We shouldn't mark jobs as succeeded if they're not in a state where
// they're eligible to ever succeed, so mark them as failed.
status = jobs.StatusFailed
}
log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status)
ju.UpdateStatus(status)
return nil
}); err != nil {
return errors.Wrap(err, "updating mutation job for dropped table")
}
delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID)
}
Expand Down
85 changes: 80 additions & 5 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6114,6 +6114,67 @@ SELECT value
})
}

// TestDropTableWhileSchemaChangeReverting tests that schema changes in the
// reverting state end up as failed when the table is dropped.
//
// The test proceeds in four steps:
//  1. Start CREATE UNIQUE INDEX on a column holding duplicate values, so the
//     statement is expected to fail with a unique constraint violation.
//  2. Block inside the RunBeforeOnFailOrCancel testing knob.
//  3. Run DROP TABLE while the knob is still blocked.
//  4. Unblock the knob, wait for the CREATE UNIQUE INDEX statement to return,
//     and verify the job's status and error in crdb_internal.jobs.
func TestDropTableWhileSchemaChangeReverting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer setTestJobsAdoptInterval()()
ctx := context.Background()

// Closed when we enter the RunBeforeOnFailOrCancel knob, at which point the
// job is in the reverting state.
beforeOnFailOrCancelNotification := make(chan struct{})
// Closed when we're ready to continue with the schema change (rollback).
continueNotification := make(chan struct{})
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeOnFailOrCancel: func(_ int64) error {
// Signal the main test goroutine that the knob has been reached, then
// block until the test has finished issuing DROP TABLE.
// NOTE(review): closing an already-closed channel panics, so this
// assumes the knob is invoked only once during the test — confirm.
close(beforeOnFailOrCancelNotification)
<-continueNotification
// Return a retry error, so that we can be sure to test the path where
// the job is marked as failed by the DROP TABLE instead of running to
// completion and ending up in the failed state on its own.
return jobs.NewRetryJobError("injected retry error")
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

// Both inserted rows share v = 2, so a unique index on v cannot be built.
_, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);
`)
require.NoError(t, err)

// Run the failing schema change in a separate goroutine, because the knob
// above blocks until continueNotification is closed further down.
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
// Try to create a unique index which won't be valid and will need a rollback.
_, err := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`)
// Presumably assert (rather than require) is used here because this
// closure does not run on the test goroutine — TODO confirm.
assert.Regexp(t, "violates unique constraint", err)
return nil
})

// Wait until the knob has been entered before dropping the table.
<-beforeOnFailOrCancelNotification

// The drop must succeed even though the index creation job has not finished
// rolling back.
_, err = sqlDB.Exec(`DROP TABLE t.test;`)
require.NoError(t, err)

// Release the knob and wait for the CREATE UNIQUE INDEX statement to return.
close(continueNotification)
require.NoError(t, g.Wait())

// The index creation job must be recorded as failed, and its error must
// still be the original constraint violation.
var status jobs.Status
var jobError string
require.NoError(t, sqlDB.QueryRow(`
SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%'
`).Scan(&status, &jobError))
require.Equal(t, jobs.StatusFailed, status)
require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
Expand All @@ -6123,7 +6184,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) {
defer setTestJobsAdoptInterval()()
ctx := context.Background()

runTest := func(params base.TestServerArgs) {
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

Expand All @@ -6140,10 +6201,20 @@ CREATE UNIQUE INDEX i ON t.test(v);
`)
require.Regexp(t, "violates unique constraint", err.Error())

var jobID int64
var jobErr string
row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobErr))
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
require.NoError(t, err)
}

// Test that dropping the table is still possible.
_, err = sqlDB.Exec(`DROP TABLE t.test`)
require.NoError(t, err)
}

t.Run("error-before-backfill", func(t *testing.T) {
Expand All @@ -6168,7 +6239,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
})

t.Run("error-before-reversing-mutations", func(t *testing.T) {
Expand All @@ -6193,7 +6266,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// GC the job record after the schema change, so we can test dropping the
// table with a nonexistent mutation job.
runTest(t, params, true /* gcJobRecord */)
})
}

Expand Down