Commit
Merge pull request #58255 from lucy-zhang/backport20.2-56589-57836
release-20.2: sql: fix bug where bad mutation job state could block dropping tables
thoszhang authored Dec 24, 2020
2 parents a1768fe + 43f27d6 commit 4ff7891
Showing 3 changed files with 153 additions and 9 deletions.
55 changes: 51 additions & 4 deletions pkg/sql/drop_table.go
@@ -15,6 +15,8 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
@@ -27,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -426,7 +429,20 @@ func (p *planner) initiateDropTable(
tableDesc.DrainingNames = append(tableDesc.DrainingNames, nameDetails)
}

// Mark all jobs scheduled for schema changes as successful.
// For this table descriptor, mark all previous jobs scheduled for schema changes as successful
// and delete them from the schema change job cache.
//
// Since the table is being dropped, any previous schema changes to the table do not need to complete
// and can be put in a terminal state such as Succeeded. Deleting the jobs from the cache ensures that
// subsequent schema changes in the transaction (ie. this drop table statement) do not get a cache hit
// and do not try to update succeeded jobs, which would raise an error. Instead, this drop table
// statement will create a new job to drop the table.
//
// Note that we still wait for jobs removed from the cache to finish running
// after the transaction, since they're not removed from the jobsCollection.
// Also, changes made here do not affect schema change jobs created in this
// transaction with no mutation ID; they remain in the cache, and will be
// updated when writing the job record to drop the table.
jobIDs := make(map[int64]struct{})
var id descpb.MutationID
for _, m := range tableDesc.Mutations {
@@ -440,11 +456,42 @@
}
}
for jobID := range jobIDs {
if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as as successful", errors.Safe(jobID))
// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.
mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
if jobs.HasJobNotFoundError(err) {
log.Warningf(ctx, "mutation job %d not found", jobID)
continue
}
return err
}
if err := mutationJob.WithTxn(p.txn).Update(
ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
status := md.Status
switch status {
case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed:
log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status)
return nil
case jobs.StatusRunning, jobs.StatusPending:
status = jobs.StatusSucceeded
default:
// We shouldn't mark jobs as succeeded if they're not in a state where
// they're eligible to ever succeed, so mark them as failed.
status = jobs.StatusFailed
}
log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status)
ju.UpdateStatus(status)
return nil
}); err != nil {
return errors.Wrap(err, "updating mutation job for dropped table")
}
delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID)
}

// Initiate an immediate schema change. When dropping a table
// in a session, the data and the descriptor are not deleted.
// Instead, that is taken care of asynchronously by the schema
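
For context, a rough reproduction of the failure mode this change guards against — a sketch only, assuming a scratch database and that the unique index build fails and leaves its schema change job reverting or failed (the table and index names mirror the tests below and are otherwise illustrative):

-- Sketch of the pre-fix failure mode; names are illustrative.
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);

-- Fails on the duplicate values of v and rolls back; the mutation job can be
-- left in a terminal (failed) state, or its record may later be GC'ed.
CREATE UNIQUE INDEX i ON t.test (v);

-- Previously, JobRegistry.Succeeded errored out on such a job and the drop
-- was blocked; with the defensive job update above, the drop proceeds and the
-- job is left failed (or skipped with a warning if its record no longer exists).
DROP TABLE t.test;
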
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_table
@@ -75,3 +75,25 @@ user testuser
# Being the owner of schema s should allow testuser to drop table s.t.
statement ok
DROP TABLE s.t

# Verify that a table can successfully be dropped after performing
# a schema change to the table in the same transaction.
# See https://github.com/cockroachdb/cockroach/issues/56235.
subtest drop_after_schema_change_in_txn
statement ok
CREATE TABLE to_drop();

statement ok
BEGIN;

statement ok
ALTER TABLE to_drop ADD COLUMN foo int;

statement ok
DROP TABLE to_drop;

statement ok
COMMIT;

statement error pgcode 42P01 relation "to_drop" does not exist
DROP TABLE to_drop;
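
A quick way to confirm the resulting job state outside the logictest harness — a sketch using the same introspection query as TestDropTableWhileSchemaChangeReverting below; the LIKE pattern assumes the offending statement was a CREATE UNIQUE INDEX:

-- After the table is dropped, a schema change that was still reverting is
-- expected to show up as failed rather than succeeded.
SELECT status, error
FROM crdb_internal.jobs
WHERE description LIKE '%CREATE UNIQUE INDEX%';
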
85 changes: 80 additions & 5 deletions pkg/sql/schema_changer_test.go
@@ -6093,6 +6093,67 @@ CREATE UNIQUE INDEX i ON t.test(v);
})
}

// TestDropTableWhileSchemaChangeReverting tests that schema changes in the
// reverting state end up as failed when the table is dropped.
func TestDropTableWhileSchemaChangeReverting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer setTestJobsAdoptInterval()()
ctx := context.Background()

// Closed when we enter the RunBeforeOnFailOrCancel knob, at which point the
// job is in the reverting state.
beforeOnFailOrCancelNotification := make(chan struct{})
// Closed when we're ready to continue with the schema change (rollback).
continueNotification := make(chan struct{})
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeOnFailOrCancel: func(_ int64) error {
close(beforeOnFailOrCancelNotification)
<-continueNotification
// Return a retry error, so that we can be sure to test the path where
// the job is marked as failed by the DROP TABLE instead of running to
// completion and ending up in the failed state on its own.
return jobs.NewRetryJobError("injected retry error")
},
},
}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);
`)
require.NoError(t, err)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
// Try to create a unique index which won't be valid and will need a rollback.
_, err := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`)
assert.Regexp(t, "violates unique constraint", err)
return nil
})

<-beforeOnFailOrCancelNotification

_, err = sqlDB.Exec(`DROP TABLE t.test;`)
require.NoError(t, err)

close(continueNotification)
require.NoError(t, g.Wait())

var status jobs.Status
var jobError string
require.NoError(t, sqlDB.QueryRow(`
SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%'
`).Scan(&status, &jobError))
require.Equal(t, jobs.StatusFailed, status)
require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
@@ -6102,7 +6163,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) {
defer setTestJobsAdoptInterval()()
ctx := context.Background()

runTest := func(params base.TestServerArgs) {
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

@@ -6119,10 +6180,20 @@ CREATE UNIQUE INDEX i ON t.test(v);
`)
require.Regexp(t, "violates unique constraint", err.Error())

var jobID int64
var jobErr string
row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobErr))
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
require.NoError(t, err)
}

// Test that dropping the table is still possible.
_, err = sqlDB.Exec(`DROP TABLE t.test`)
require.NoError(t, err)
}

t.Run("error-before-backfill", func(t *testing.T) {
@@ -6147,7 +6218,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
})

t.Run("error-before-reversing-mutations", func(t *testing.T) {
@@ -6172,7 +6245,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// GC the job record after the schema change, so we can test dropping the
// table with a nonexistent mutation job.
runTest(t, params, true /* gcJobRecord */)
})
}

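
For the nonexistent-job path (the gcJobRecord = true case above), the missing record can be simulated by hand — a sketch only, assuming direct access to system.jobs and substituting the failed schema change's job id for $1; this mirrors the test setup and is not a recommended operational procedure:

-- Simulate a GC'ed job record for the failed schema change, then drop the table.
-- The new code logs "mutation job ... not found" and continues instead of failing.
DELETE FROM system.jobs WHERE id = $1;
DROP TABLE t.test;
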
