diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 9422b060fe8b..df63f7658d1a 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -15,6 +15,8 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -27,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -426,7 +429,20 @@ func (p *planner) initiateDropTable( tableDesc.DrainingNames = append(tableDesc.DrainingNames, nameDetails) } - // Mark all jobs scheduled for schema changes as successful. + // For this table descriptor, mark all previous jobs scheduled for schema changes as successful + // and delete them from the schema change job cache. + // + // Since the table is being dropped, any previous schema changes to the table do not need to complete + // and can be put in a terminal state such as Succeeded. Deleting the jobs from the cache ensures that + // subsequent schema changes in the transaction (ie. this drop table statement) do not get a cache hit + // and do not try to update succeeded jobs, which would raise an error. Instead, this drop table + // statement will create a new job to drop the table. + // + // Note that we still wait for jobs removed from the cache to finish running + // after the transaction, since they're not removed from the jobsCollection. + // Also, changes made here do not affect schema change jobs created in this + // transaction with no mutation ID; they remain in the cache, and will be + // updated when writing the job record to drop the table. jobIDs := make(map[int64]struct{}) var id descpb.MutationID for _, m := range tableDesc.Mutations { @@ -440,11 +456,42 @@ func (p *planner) initiateDropTable( } } for jobID := range jobIDs { - if err := p.ExecCfg().JobRegistry.Succeeded(ctx, p.txn, jobID); err != nil { - return errors.Wrapf(err, - "failed to mark job %d as as successful", errors.Safe(jobID)) + // Mark jobs as succeeded when possible, but be defensive about jobs that + // are already in a terminal state or nonexistent. This could happen for + // schema change jobs that couldn't be successfully reverted and ended up in + // a failed state. Such jobs could have already been GCed from the jobs + // table by the time this code runs. + mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn) + if err != nil { + if jobs.HasJobNotFoundError(err) { + log.Warningf(ctx, "mutation job %d not found", jobID) + continue + } + return err + } + if err := mutationJob.WithTxn(p.txn).Update( + ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + status := md.Status + switch status { + case jobs.StatusSucceeded, jobs.StatusCanceled, jobs.StatusFailed: + log.Warningf(ctx, "mutation job %d in unexpected state %s", jobID, status) + return nil + case jobs.StatusRunning, jobs.StatusPending: + status = jobs.StatusSucceeded + default: + // We shouldn't mark jobs as succeeded if they're not in a state where + // they're eligible to ever succeed, so mark them as failed. + status = jobs.StatusFailed + } + log.Infof(ctx, "marking mutation job %d for dropped table as %s", jobID, status) + ju.UpdateStatus(status) + return nil + }); err != nil { + return errors.Wrap(err, "updating mutation job for dropped table") } + delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID) } + // Initiate an immediate schema change. When dropping a table // in a session, the data and the descriptor are not deleted. // Instead, that is taken care of asynchronously by the schema diff --git a/pkg/sql/logictest/testdata/logic_test/drop_table b/pkg/sql/logictest/testdata/logic_test/drop_table index c30f0f5402b9..0285a5b688a9 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_table +++ b/pkg/sql/logictest/testdata/logic_test/drop_table @@ -75,3 +75,25 @@ user testuser # Being the owner of schema s should allow testuser to drop table s.t. statement ok DROP TABLE s.t + +# Verify that a table can successfully be dropped after performing +# a schema change to the table in the same transaction. +# See https://github.com/cockroachdb/cockroach/issues/56235. +subtest drop_after_schema_change_in_txn +statement ok +CREATE TABLE to_drop(); + +statement ok +BEGIN; + +statement ok +ALTER TABLE to_drop ADD COLUMN foo int; + +statement ok +DROP TABLE to_drop; + +statement ok +COMMIT; + +statement error pgcode 42P01 relation "to_drop" does not exist +DROP TABLE to_drop; diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 1f6b53f0ba62..163da27d2ac1 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -6093,6 +6093,67 @@ CREATE UNIQUE INDEX i ON t.test(v); }) } +// TestDropTableWhileSchemaChangeReverting tests that schema changes in the +// reverting state end up as failed when the table is dropped. +func TestDropTableWhileSchemaChangeReverting(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer setTestJobsAdoptInterval()() + ctx := context.Background() + + // Closed when we enter the RunBeforeOnFailOrCancel knob, at which point the + // job is in the reverting state. + beforeOnFailOrCancelNotification := make(chan struct{}) + // Closed when we're ready to continue with the schema change (rollback). + continueNotification := make(chan struct{}) + params, _ := tests.CreateTestServerParams() + params.Knobs = base.TestingKnobs{ + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunBeforeOnFailOrCancel: func(_ int64) error { + close(beforeOnFailOrCancelNotification) + <-continueNotification + // Return a retry error, so that we can be sure to test the path where + // the job is marked as failed by the DROP TABLE instead of running to + // completion and ending up in the failed state on its own. + return jobs.NewRetryJobError("injected retry error") + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); +INSERT INTO t.test VALUES (1, 2), (2, 2); +`) + require.NoError(t, err) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + // Try to create a unique index which won't be valid and will need a rollback. + _, err := sqlDB.Exec(`CREATE UNIQUE INDEX i ON t.test(v);`) + assert.Regexp(t, "violates unique constraint", err) + return nil + }) + + <-beforeOnFailOrCancelNotification + + _, err = sqlDB.Exec(`DROP TABLE t.test;`) + require.NoError(t, err) + + close(continueNotification) + require.NoError(t, g.Wait()) + + var status jobs.Status + var jobError string + require.NoError(t, sqlDB.QueryRow(` +SELECT status, error FROM crdb_internal.jobs WHERE description LIKE '%CREATE UNIQUE INDEX%' +`).Scan(&status, &jobError)) + require.Equal(t, jobs.StatusFailed, status) + require.Regexp(t, "violates unique constraint", jobError) +} + // TestPermanentErrorDuringRollback tests that a permanent error while rolling // back a schema change causes the job to fail, and that the appropriate error // is displayed in the jobs table. @@ -6102,7 +6163,7 @@ func TestPermanentErrorDuringRollback(t *testing.T) { defer setTestJobsAdoptInterval()() ctx := context.Background() - runTest := func(params base.TestServerArgs) { + runTest := func(t *testing.T, params base.TestServerArgs, gcJobRecord bool) { s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -6119,10 +6180,20 @@ CREATE UNIQUE INDEX i ON t.test(v); `) require.Regexp(t, "violates unique constraint", err.Error()) + var jobID int64 var jobErr string - row := sqlDB.QueryRow("SELECT error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") - require.NoError(t, row.Scan(&jobErr)) + row := sqlDB.QueryRow("SELECT job_id, error FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE'") + require.NoError(t, row.Scan(&jobID, &jobErr)) require.Regexp(t, "cannot be reverted, manual cleanup may be required: permanent error", jobErr) + + if gcJobRecord { + _, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1`, jobID) + require.NoError(t, err) + } + + // Test that dropping the table is still possible. + _, err = sqlDB.Exec(`DROP TABLE t.test`) + require.NoError(t, err) } t.Run("error-before-backfill", func(t *testing.T) { @@ -6147,7 +6218,9 @@ CREATE UNIQUE INDEX i ON t.test(v); }, }, } - runTest(params) + // Don't GC the job record after the schema change, so we can test dropping + // the table with a failed mutation job. + runTest(t, params, false /* gcJobRecord */) }) t.Run("error-before-reversing-mutations", func(t *testing.T) { @@ -6172,7 +6245,9 @@ CREATE UNIQUE INDEX i ON t.test(v); }, }, } - runTest(params) + // GC the job record after the schema change, so we can test dropping the + // table with a nonexistent mutation job. + runTest(t, params, true /* gcJobRecord */) }) }