Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.2: sql: fix bug where bad mutation job state could block dropping tables #58255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -426,7 +429,20 @@ func (p *planner) initiateDropTable(
tableDesc.DrainingNames = append(tableDesc.DrainingNames, nameDetails)
}

// Mark all jobs scheduled for schema changes as successful.
// For this table descriptor, mark all previous jobs scheduled for schema changes as successful
// and delete them from the schema change job cache.
//
// Since the table is being dropped, any previous schema changes to the table do not need to complete
// and can be put in a terminal state such as Succeeded. Deleting the jobs from the cache ensures that
// subsequent schema changes in the transaction (i.e., this drop table statement) do not get a cache hit
// and do not try to update succeeded jobs, which would raise an error. Instead, this drop table
// statement will create a new job to drop the table.
//
// Note that we still wait for jobs removed from the cache to finish running
// after the transaction, since they're not removed from the jobsCollection.
// Also, changes made here do not affect schema change jobs created in this
// transaction with no mutation ID; they remain in the cache, and will be
// updated when writing the job record to drop the table.
jobIDs := make(map[int64]struct{})
var id descpb.MutationID
for _, m := range tableDesc.Mutations {
Expand All @@ -440,11 +456,42 @@ func (p *planner) initiateDropTable(
}
}
for jobID := range jobIDs {
if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as as successful", errors.Safe(jobID))
// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.
mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
if jobs.HasJobNotFoundError(err) {
log.Warningf(ctx, "mutation job %d not found", jobID)
continue
}
return err
}
if err := mutationJob.WithTxn(p.txn).Update(
ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
status := md.Status
switch status {
case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed:
log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status)
return nil
case jobs.StatusRunning, jobs.StatusPending:
status = jobs.StatusSucceeded
default:
// We shouldn't mark jobs as succeeded if they're not in a state where
// they're eligible to ever succeed, so mark them as failed.
status = jobs.StatusFailed
}
log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status)
ju.UpdateStatus(status)
return nil
}); err != nil {
return errors.Wrap(err, "updating mutation job for dropped table")
}
delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID)
}

// Initiate an immediate schema change. When dropping a table
// in a session, the data and the descriptor are not deleted.
// Instead, that is taken care of asynchronously by the schema
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_table
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,25 @@ user testuser
# Being the owner of schema s should allow testuser to drop table s.t.
statement ok
DROP TABLE s.t

# Verify that a table can successfully be dropped after performing
# a schema change to the table in the same transaction.
# See https://github.com/cockroachdb/cockroach/issues/56235.
subtest drop_after_schema_change_in_txn
statement ok
CREATE TABLE to_drop();

statement ok
BEGIN;

statement ok
ALTER TABLE to_drop ADD COLUMN foo int;

statement ok
DROP TABLE to_drop;

statement ok
COMMIT;

statement error pgcode 42P01 relation "to_drop" does not exist
DROP TABLE to_drop;
85 changes: 80 additions & 5 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6093,6 +6093,67 @@ CREATE UNIQUE INDEX i ON t.test(v);
})
}

// TestDropTableWhileSchemaChangeReverting verifies that a schema change job
// which is still reverting at the moment its table is dropped is recorded as
// failed, and that the job's error still reports the unique constraint
// violation that triggered the rollback.
func TestDropTableWhileSchemaChangeReverting(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer setTestJobsAdoptInterval()()
	ctx := context.Background()

	// revertingCh is closed as soon as the RunBeforeOnFailOrCancel knob is
	// entered, which signals that the job has reached the reverting state.
	revertingCh := make(chan struct{})
	// resumeCh is closed once the test is ready to let the rollback continue.
	resumeCh := make(chan struct{})

	// holdRollback announces that the rollback has started, parks until the
	// test releases it, and then reports a retry error. The retry error keeps
	// the job from running to completion and reaching the failed state by
	// itself, so the failed status observed below must come from DROP TABLE.
	holdRollback := func(_ int64) error {
		close(revertingCh)
		<-resumeCh
		return jobs.NewRetryJobError("injected retry error")
	}

	params, _ := tests.CreateTestServerParams()
	params.Knobs = base.TestingKnobs{
		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
			RunBeforeOnFailOrCancel: holdRollback,
		},
	}
	s, sqlDB, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)

	// Two rows share the same value of v, so a unique index on v cannot be
	// built.
	const setupStmts = `
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);
`
	_, err := sqlDB.Exec(setupStmts)
	require.NoError(t, err)

	group := ctxgroup.WithContext(ctx)
	group.GoCtx(func(_ context.Context) error {
		// The unique index is invalid for the existing data, which forces the
		// schema change to roll back.
		_, indexErr := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`)
		assert.Regexp(t, "violates unique constraint", indexErr)
		return nil
	})

	// Wait for the rollback to begin before dropping the table underneath it.
	<-revertingCh

	_, err = sqlDB.Exec(`DROP TABLE t.test;`)
	require.NoError(t, err)

	// Release the rollback and wait for the CREATE UNIQUE INDEX statement to
	// return.
	close(resumeCh)
	require.NoError(t, group.Wait())

	var (
		status   jobs.Status
		jobError string
	)
	row := sqlDB.QueryRow(`
SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%'
`)
	require.NoError(t, row.Scan(&status, &jobError))
	require.Equal(t, jobs.StatusFailed, status)
	require.Regexp(t, "violates unique constraint", jobError)
}

// TestPermanentErrorDuringRollback tests that a permanent error while rolling
// back a schema change causes the job to fail, and that the appropriate error
// is displayed in the jobs table.
Expand All @@ -6102,7 +6163,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) {
defer setTestJobsAdoptInterval()()
ctx := context.Background()

runTest := func(params base.TestServerArgs) {
runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) {
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

Expand All @@ -6119,10 +6180,20 @@ CREATE UNIQUE INDEX i ON t.test(v);
`)
require.Regexp(t, "violates unique constraint", err.Error())

var jobID int64
var jobErr string
row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobErr))
row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'")
require.NoError(t, row.Scan(&jobID, &jobErr))
require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr)

if gcJobRecord {
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID)
require.NoError(t, err)
}

// Test that dropping the table is still possible.
_, err = sqlDB.Exec(`DROP TABLE t.test`)
require.NoError(t, err)
}

t.Run("error-before-backfill", func(t *testing.T) {
Expand All @@ -6147,7 +6218,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// Don't GC the job record after the schema change, so we can test dropping
// the table with a failed mutation job.
runTest(t, params, false /* gcJobRecord */)
})

t.Run("error-before-reversing-mutations", func(t *testing.T) {
Expand All @@ -6172,7 +6245,9 @@ CREATE UNIQUE INDEX i ON t.test(v);
},
},
}
runTest(params)
// GC the job record after the schema change, so we can test dropping the
// table with a nonexistent mutation job.
runTest(t, params, true /* gcJobRecord */)
})
}

Expand Down