From 6505359bf76562a104a839f27700747068d08587 Mon Sep 17 00:00:00 2001 From: Lucy Zhang Date: Mon, 21 Dec 2020 23:39:35 -0500 Subject: [PATCH] sql: fix bug where bad mutation job state could block dropping tables Previously, while dropping a table, we would mark all the jobs associated with mutations on the table as `succeeded`, under the assumption that they were running. The job registry API prohibits this when the jobs are not `running` (or `pending`), so if a mutation was stuck on the table descriptor with a failed or nonexistent job, dropping the table would fail. This PR fixes the bug by checking the job state before attempting to update the job. It also fixes a related failure to drop a table caused by a valid mutation job not being in a `running` state. Release note (bug fix): Fixed a bug where prior schema changes on a table that failed and could not be fully reverted could prevent the table from being dropped. --- pkg/sql/drop_table.go | 49 ++++++++++++++++++-- pkg/sql/schema_changer_test.go | 85 ++++++++++++++++++++++++++++++++-- 2 files changed, 126 insertions(+), 8 deletions(-) diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 0c3f3133ea8f..8b020d0c57bb 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -15,6 +15,8 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -442,6 +445,12 @@ func (p *planner) 
initiateDropTable( // subsequent schema changes in the transaction (ie. this drop table statement) do not get a cache hit // and do not try to update succeeded jobs, which would raise an error. Instead, this drop table // statement will create a new job to drop the table. + // + // Note that we still wait for jobs removed from the cache to finish running + // after the transaction, since they're not removed from the jobsCollection. + // Also, changes made here do not affect schema change jobs created in this + // transaction with no mutation ID; they remain in the cache, and will be + // updated when writing the job record to drop the table. jobIDs := make(map[int64]struct{}) var id descpb.MutationID for _, m := range tableDesc.Mutations { @@ -455,9 +464,43 @@ func (p *planner) initiateDropTable( } } for jobID := range jobIDs { - if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil { - return errors.Wrapf(err, - "failed to mark job %d as as successful", errors.Safe(jobID)) + // Mark jobs as succeeded when possible, but be defensive about jobs that + // are already in a terminal state or nonexistent. This could happen for + // schema change jobs that couldn't be successfully reverted and ended up in + // a failed state. Such jobs could have already been GCed from the jobs + // table by the time this code runs. + + // First, see if the job exists. + if row, err := p.ExtendedEvalContext().InternalExecutor.(*InternalExecutor).QueryRowEx( + ctx, "query mutation job", p.txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + `SELECT 1 FROM system.jobs WHERE id = $1`, jobID, + ); err != nil { + return err + } else if row == nil { + log.Warningf(ctx, "mutation job %d not found", jobID) + continue + } + + // Update the job status. 
+ if err := p.execCfg.JobRegistry.UpdateJobWithTxn( + ctx, jobID, p.txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + status := md.Status + switch status { + case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed: + log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status) + return nil + case jobs.StatusRunning, jobs.StatusPending: + status = jobs.StatusSucceeded + default: + // We shouldn't mark jobs as succeeded if they're not in a state where + // they're eligible to ever succeed, so mark them as failed. + status = jobs.StatusFailed + } + log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status) + ju.UpdateStatus(status) + return nil + }); err != nil { + return errors.Wrap(err, "updating mutation job for dropped table") } delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID) } diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 36bb0e988493..15b57d3d9a66 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -6114,6 +6114,67 @@ SELECT value }) } +// TestDropTableWhileSchemaChangeReverting tests that schema changes in the +// reverting state end up as failed when the table is dropped. +func TestDropTableWhileSchemaChangeReverting(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer setTestJobsAdoptInterval()() + ctx := context.Background() + + // Closed when we enter the RunBeforeOnFailOrCancel knob, at which point the + // job is in the reverting state. + beforeOnFailOrCancelNotification := make(chan struct{}) + // Closed when we're ready to continue with the schema change (rollback). 
+ continueNotification := make(chan struct{}) + params, _ := tests.CreateTestServerParams() + params.Knobs = base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeOnFailOrCancel: func(_ int64) error { + close(beforeOnFailOrCancelNotification) + <-continueNotification + // Return a retry error, so that we can be sure to test the path where + // the job is marked as failed by the DROP TABLE instead of running to + // completion and ending up in the failed state on its own. + return jobs.NewRetryJobError("injected retry error") + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); +INSERT INTO t.test VALUES (1, 2), (2, 2); +`) + require.NoError(t, err) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + // Try to create a unique index which won't be valid and will need a rollback. err is declared locally because the outer err is reused by the test goroutine for DROP TABLE before g.Wait() is reached. + _, err := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`) + assert.Regexp(t, "violates unique constraint", err) + return nil + }) + + <-beforeOnFailOrCancelNotification + + _, err = sqlDB.Exec(`DROP TABLE t.test;`) + require.NoError(t, err) + + close(continueNotification) + require.NoError(t, g.Wait()) + + var status jobs.Status + var jobError string + require.NoError(t, sqlDB.QueryRow(` +SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%' +`).Scan(&status, &jobError)) + require.Equal(t, jobs.StatusFailed, status) + require.Regexp(t, "violates unique constraint", jobError) +} + // TestPermanentErrorDuringRollback tests that a permanent error while rolling // back a schema change causes the job to fail, and that the appropriate error // is displayed in the jobs table. 
@@ -6123,7 +6184,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) { defer setTestJobsAdoptInterval()() ctx := context.Background() - runTest := func(params base.TestServerArgs) { + runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) { s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -6140,10 +6201,20 @@ CREATE UNIQUE INDEX i ON t.test(v); `) require.Regexp(t, "violates unique constraint", err.Error()) + var jobID int64 var jobErr string - row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") - require.NoError(t, row.Scan(&jobErr)) + row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") + require.NoError(t, row.Scan(&jobID, &jobErr)) require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr) + + if gcJobRecord { + _, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID) + require.NoError(t, err) + } + + // Test that dropping the table is still possible. + _, err = sqlDB.Exec(`DROP TABLE t.test`) + require.NoError(t, err) } t.Run("error-before-backfill", func(t *testing.T) { @@ -6168,7 +6239,9 @@ CREATE UNIQUE INDEX i ON t.test(v); }, }, } - runTest(params) + // Don't GC the job record after the schema change, so we can test dropping + // the table with a failed mutation job. + runTest(t, params, false /* gcJobRecord */) }) t.Run("error-before-reversing-mutations", func(t *testing.T) { @@ -6193,7 +6266,9 @@ CREATE UNIQUE INDEX i ON t.test(v); }, }, } - runTest(params) + // GC the job record after the schema change, so we can test dropping the + // table with a nonexistent mutation job. + runTest(t, params, true /* gcJobRecord */) }) }