diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 9fa287911e28..f4ce750fd3a2 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -450,24 +450,29 @@ func (p *planner) initiateDropTable( // Also, changes made here do not affect schema change jobs created in this // transaction with no mutation ID; they remain in the cache, and will be // updated when writing the job record to drop the table. - jobIDs := make(map[int64]struct{}) - var id descpb.MutationID - for _, m := range tableDesc.Mutations { - if id != m.MutationID { - id = m.MutationID - jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, id) - if err != nil { - return err - } - jobIDs[jobID] = struct{}{} - } + if err := p.markTableMutationJobsSuccessful(ctx, tableDesc); err != nil { + return err } - for jobID := range jobIDs { - // Mark jobs as succeeded when possible, but be defensive about jobs that - // are already in a terminal state or nonexistent. This could happen for - // schema change jobs that couldn't be successfully reverted and ended up in - // a failed state. Such jobs could have already been GCed from the jobs - // table by the time this code runs. + + // Initiate an immediate schema change. When dropping a table + // in a session, the data and the descriptor are not deleted. + // Instead, that is taken care of asynchronously by the schema + // change manager, which is notified via a system config gossip. + // The schema change manager will properly schedule deletion of + // the underlying data when the GC deadline expires. + return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc) +} + +// Mark jobs as succeeded when possible, but be defensive about jobs that +// are already in a terminal state or nonexistent. This could happen for +// schema change jobs that couldn't be successfully reverted and ended up in +// a failed state. Such jobs could have already been GCed from the jobs +// table by the time this code runs. +func (p *planner) markTableMutationJobsSuccessful( + ctx context.Context, tableDesc *tabledesc.Mutable, +) error { + for _, mj := range tableDesc.MutationJobs { + jobID := mj.JobID mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn) if err != nil { if jobs.HasJobNotFoundError(err) { @@ -498,14 +503,7 @@ func (p *planner) initiateDropTable( } delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID) } - - // Initiate an immediate schema change. When dropping a table - // in a session, the data and the descriptor are not deleted. - // Instead, that is taken care of asynchronously by the schema - // change manager, which is notified via a system config gossip. - // The schema change manager will properly schedule deletion of - // the underlying data when the GC deadline expires. - return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc) + return nil } func (p *planner) removeFKForBackReference( diff --git a/pkg/sql/tests/truncate_test.go b/pkg/sql/tests/truncate_test.go new file mode 100644 index 000000000000..51d9ca55d760 --- /dev/null +++ b/pkg/sql/tests/truncate_test.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestTruncateWithConcurrentMutations is a regression test to cover a +// situation where a table is truncated while concurrent schema changes are +// performed. Prior to the commit introducing this test, all concurrent +// mutations were made public and the corresponding mutation jobs would not be +// dealt with. This could lead to tables which cannot be changed by schema +// changes and have invalid secondary indexes. Instead we now allowlist specific +// interactions and reject the rest. This test exercises these scenarios. +func TestTruncateWithConcurrentMutations(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + type blockType int + const ( + blockAfterBackfill blockType = iota // default + blockBeforeResume + ) + type validateQuery struct { + stmt string + optionalResults [][]string + } + type testCase struct { + blockType blockType + setupStmts []string + name string + expErrRE string + truncateStmt string + stmts []string + validations []validateQuery + } + run := func(t *testing.T, testC testCase) { + var ( + blocked = make(chan struct{}) + unblock = make(chan error) + tc *testcluster.TestCluster + ) + { + settings := cluster.MakeTestingClusterSettings() + stats.AutomaticStatisticsClusterMode.Override(&settings.SV, false) + scKnobs := &sql.SchemaChangerTestingKnobs{} + blockFunc := func(jobID int64) error { + select { + case blocked <- struct{}{}: + case err := <-unblock: + return err + } + return <-unblock + } + switch testC.blockType { + case blockAfterBackfill: + scKnobs.RunAfterBackfill = blockFunc + case blockBeforeResume: + scKnobs.RunBeforeResume = blockFunc + } + tc = testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + SQLSchemaChanger: scKnobs, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + } + + db := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(db) + + // Support setup schema changes. + close(unblock) + for _, stmt := range testC.setupStmts { + tdb.Exec(t, stmt) + } + + // Block the main schema changes. + unblock = make(chan error) + + // Create an index concurrently and make sure it blocks. + var g errgroup.Group + g.Go(func() error { + if len(testC.stmts) == 1 { + _, err := db.Exec(testC.stmts[0]) + return err + } + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + for _, stmt := range testC.stmts { + _, err = tx.Exec(stmt) + if err != nil { + _ = tx.Rollback() + return err + } + } + return tx.Commit() + }) + errCh := make(chan error) + go func() { errCh <- g.Wait() }() + + select { + case err := <-errCh: + t.Fatal(err) // concurrent txn failed + case <-blocked: + } + + _, err := db.Exec(testC.truncateStmt) + if testC.expErrRE == "" { + assert.NoError(t, err) + } else { + assert.Regexp(t, testC.expErrRE, err) + } + + close(unblock) + require.NoError(t, <-errCh) + + for _, v := range testC.validations { + if v.optionalResults != nil { + tdb.CheckQueryResults(t, v.stmt, v.optionalResults) + } else { + tdb.Exec(t, v.stmt) + } + } + } + const ( + commonCreateTable = `CREATE TABLE t (i INT PRIMARY KEY, j INT)` + commonPopulateData = `INSERT INTO t SELECT i, i+1 as j from generate_series(1, 100, 1) as t(i)` + ) + commonValidations := []validateQuery{ + { + stmt: "ALTER TABLE t ADD COLUMN added_column INT", + }, + } + commonIdxValidations := append([]validateQuery{ + { + stmt: "SELECT count(*) FROM t", + optionalResults: [][]string{{"0"}}, + }, + { + stmt: "SELECT count(*) FROM t@idx", + optionalResults: [][]string{{"0"}}, + }, + }, commonValidations...) + + cases := []testCase{ + { + name: "add index", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `CREATE INDEX idx ON t(j)`, + }, + validations: commonIdxValidations, + }, + { + name: "add index with column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD COLUMN k INT`, + `CREATE INDEX idx ON t(j, k)`, + }, + validations: commonIdxValidations, + }, + { + blockType: blockBeforeResume, + name: "drop index", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `ALTER TABLE t ADD COLUMN k INT`, + `CREATE INDEX idx ON t(j, k)`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `DROP INDEX t@idx`, + }, + expErrRE: `unimplemented: cannot perform TRUNCATE on "t" which has indexes being dropped`, + }, + { + name: "drop column with user-defined type", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE TYPE typ AS ENUM ('a')`, + `ALTER TABLE t ADD COLUMN k typ`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN k`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has ` + + `a column \("k"\) being dropped which depends on another object`, + }, + { + name: "drop column which uses sequence", + setupStmts: []string{ + commonCreateTable, + `SET serial_normalization='sql_sequence'`, + `ALTER TABLE t ADD COLUMN k SERIAL`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN k`, + }, + validations: commonValidations, + }, + { + name: "drop column which owns sequence", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE SEQUENCE s OWNED BY t.j`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN j`, + }, + validations: commonValidations, + }, + { + name: "alter primary key", + setupStmts: []string{ + `CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL)`, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (j)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing primary key change`, + }, + { + name: "add column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD COLUMN k INT`, + }, + validations: commonValidations, + }, + { + name: "add self fk", + blockType: blockBeforeResume, + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t(i)`, + `INSERT INTO t VALUES (101, NULL)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` + + `ongoing FOREIGN_KEY constraint change`, + }, + { + name: "add other fk", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE TABLE t2 (i INT PRIMARY KEY)`, + `INSERT INTO t2 SELECT i+1 as i from generate_series(1, 100, 1) as t(i)`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t2(i)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` + + `ongoing FOREIGN_KEY constraint change`, + }, + { + name: "add check constraint", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT c CHECK (j > 1)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing CHECK constraint change`, + }, + { + name: "drop column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN j`, + }, + validations: commonValidations, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { run(t, tc) }) + } +} diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index b068fcf1263d..4cf8126506ce 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -199,7 +200,21 @@ func (p *planner) truncateTable( return err } - // Collect all of the old indexes. + if err := checkTableForDisallowedMutationsWithTruncate(tableDesc); err != nil { + return err + } + + // Resolve all outstanding mutations. Make all new schema elements + // public because the table is empty and doesn't need to be backfilled. + for _, m := range tableDesc.Mutations { + if err := tableDesc.MakeMutationComplete(m); err != nil { + return err + } + } + tableDesc.Mutations = nil + tableDesc.GCMutations = nil + + // Collect all of the old indexes and reset all of the index IDs. oldIndexes := make([]descpb.IndexDescriptor, len(tableDesc.Indexes)+1) oldIndexes[0] = *protoutil.Clone(&tableDesc.PrimaryIndex).(*descpb.IndexDescriptor) for i := range tableDesc.Indexes { @@ -223,16 +238,6 @@ func (p *planner) truncateTable( indexIDMapping[oldIndexes[i+1].ID] = tableDesc.Indexes[i].ID } - // Resolve all outstanding mutations. Make all new schema elements - // public because the table is empty and doesn't need to be backfilled. - for _, m := range tableDesc.Mutations { - if err := tableDesc.MakeMutationComplete(m); err != nil { - return err - } - } - tableDesc.Mutations = nil - tableDesc.GCMutations = nil - // Create schema change GC jobs for all of the indexes. dropTime := timeutil.Now().UnixNano() droppedIndexes := make([]jobspb.SchemaChangeGCDetails_DroppedIndex, 0, len(oldIndexes)) @@ -307,9 +312,88 @@ func (p *planner) truncateTable( return err } + // Mark any mutation jobs as successful and then clear the record of + // their existence. Old code did not do this and relied on jobs finishing + // and removing themselves. That turns out to be hazardous because those + // jobs may now fail and then fail to revert. + if err := p.markTableMutationJobsSuccessful(ctx, tableDesc); err != nil { + return err + } + tableDesc.MutationJobs = nil + return p.writeSchemaChange(ctx, tableDesc, descpb.InvalidMutationID, jobDesc) } +// checkTableForDisallowedMutationsWithTruncate iterates the set of mutations +// in the descriptor and determines whether any should prevent the truncate +// from completing. Interactions between truncate and ongoing schema change +// jobs are complex. On some level, the truncate can allow the work of these +// mutations to be short-circuited. However, accounting for the various cross- +// descriptor side-effects (like removing back-references or cleaning up dropped +// indexes) feels hard to get exactly right in the existing model. Thus, it +// is safer to more generally just disallow TRUNCATE when there are things +// going on and then advise the user to allow jobs to continue or to cancel +// them. +// +// This also aligns with a future world where we will be diminishing the +// allowable concurrency. However, in today's schema change world, some +// cancellations require O(rows) work. Some of these operations we can safely +// just complete. This function iterates the set of mutations and decides +// whether any exist that are not safe to just complete. +func checkTableForDisallowedMutationsWithTruncate(desc *tabledesc.Mutable) error { + + // The mutations which are scary to complete are mutations which involve + // back-references. That would be anything related to foreign keys and + // dropping of a column using a user-defined type or sequence. We can permit + // the addition or removal of columns so long as they do not involve + // user-defined types. + + for i, m := range desc.Mutations { + if idx := m.GetIndex(); idx != nil { + // Do not allow dropping indexes. + if m.Direction != descpb.DescriptorMutation_ADD { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has indexes being dropped", desc.GetName()) + } + } else if col := m.GetColumn(); col != nil { + if m.Direction == descpb.DescriptorMutation_DROP && col.Type.UserDefined() { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has a column (%q) being "+ + "dropped which depends on another object", desc.GetName(), col.Name) + } + } else if c := m.GetConstraint(); c != nil { + switch ct := c.ConstraintType; ct { + case descpb.ConstraintToUpdate_CHECK, + descpb.ConstraintToUpdate_NOT_NULL, + descpb.ConstraintToUpdate_FOREIGN_KEY: + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing %s "+ + "constraint change", desc.GetName(), ct) + default: + return errors.AssertionFailedf("cannot perform TRUNCATE due to "+ + "unknown constraint type %v on mutation %d in %v", ct, i, desc) + } + } else if s := m.GetPrimaryKeySwap(); s != nil { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing primary key "+ + "change", desc.GetName()) + } else if m.GetComputedColumnSwap() != nil { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing column type "+ + "change", desc.GetName()) + } else { + return errors.AssertionFailedf("cannot perform TRUNCATE due to "+ + "concurrent unknown mutation of type %T for mutation %d in %v", m, i, desc) + } + } + return nil +} + // ClearTableDataInChunks truncates the data of a table in chunks. It deletes a // range of data for the table, which includes the PK and all indexes. // The table has already been marked for deletion and has been purged from the