From 85759978683ed04d983978eee000e3d1c7276453 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 1 Apr 2021 02:07:44 -0400 Subject: [PATCH] sql: fix or disallow hazardous interactions between TRUNCATE and schema changes Prior to this change, the truncate implementation since 20.2 was totally broken. The code creates new indexes for all existing indexes. It also makes all mutations public. The problem with this is that it makes the mutations public *after* deciding which indexes to clone. Because of this, it simply makes indexes that are currently being backfilled or validated public. This leads to inconsistency between the old secondary index and the newly truncated indexes. The code is also problematic in that it does not do anything about the mutation jobs. The code assumes that these jobs will happily continue, detect that they correspond to no mutations, stop doing work, and clean themselves up. Unfortunately, it seems that these changes can lead to certain failures. For example, new index validation may now fail. Reverting that index build will now also fail because the index is public. This will leave the descriptor in a state where it has an orphaned mutation job. We fix this by only permitting TRUNCATE to run concurrently with mutations we deem safe and then fixing the behavior for those cases. Fixes #62842. Release note (bug fix): Fixed bugs where TRUNCATE concurrent with index construction and other schema changes could result in corruption. 
--- pkg/sql/drop_table.go | 48 +++-- pkg/sql/tests/truncate_test.go | 336 +++++++++++++++++++++++++++++++++ pkg/sql/truncate.go | 106 +++++++++-- 3 files changed, 454 insertions(+), 36 deletions(-) create mode 100644 pkg/sql/tests/truncate_test.go diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 9fa287911e28..f4ce750fd3a2 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -450,24 +450,29 @@ func (p *planner) initiateDropTable( // Also, changes made here do not affect schema change jobs created in this // transaction with no mutation ID; they remain in the cache, and will be // updated when writing the job record to drop the table. - jobIDs := make(map[int64]struct{}) - var id descpb.MutationID - for _, m := range tableDesc.Mutations { - if id != m.MutationID { - id = m.MutationID - jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, id) - if err != nil { - return err - } - jobIDs[jobID] = struct{}{} - } + if err := p.markTableMutationJobsSuccessful(ctx, tableDesc); err != nil { + return err } - for jobID := range jobIDs { - // Mark jobs as succeeded when possible, but be defensive about jobs that - // are already in a terminal state or nonexistent. This could happen for - // schema change jobs that couldn't be successfully reverted and ended up in - // a failed state. Such jobs could have already been GCed from the jobs - // table by the time this code runs. + + // Initiate an immediate schema change. When dropping a table + // in a session, the data and the descriptor are not deleted. + // Instead, that is taken care of asynchronously by the schema + // change manager, which is notified via a system config gossip. + // The schema change manager will properly schedule deletion of + // the underlying data when the GC deadline expires. + return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc) +} + +// Mark jobs as succeeded when possible, but be defensive about jobs that +// are already in a terminal state or nonexistent. 
This could happen for +// schema change jobs that couldn't be successfully reverted and ended up in +// a failed state. Such jobs could have already been GCed from the jobs +// table by the time this code runs. +func (p *planner) markTableMutationJobsSuccessful( + ctx context.Context, tableDesc *tabledesc.Mutable, +) error { + for _, mj := range tableDesc.MutationJobs { + jobID := mj.JobID mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn) if err != nil { if jobs.HasJobNotFoundError(err) { @@ -498,14 +503,7 @@ func (p *planner) initiateDropTable( } delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID) } - - // Initiate an immediate schema change. When dropping a table - // in a session, the data and the descriptor are not deleted. - // Instead, that is taken care of asynchronously by the schema - // change manager, which is notified via a system config gossip. - // The schema change manager will properly schedule deletion of - // the underlying data when the GC deadline expires. - return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc) + return nil } func (p *planner) removeFKForBackReference( diff --git a/pkg/sql/tests/truncate_test.go b/pkg/sql/tests/truncate_test.go new file mode 100644 index 000000000000..51d9ca55d760 --- /dev/null +++ b/pkg/sql/tests/truncate_test.go @@ -0,0 +1,336 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tests + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestTruncateWithConcurrentMutations is a regression test to cover a +// situation where a table is truncated while concurrent schema changes are +// performed. Prior to the commit introducing this test, all concurrent +// mutations were made public and the corresponding mutation jobs would not be +// dealt with. This could lead to tables which cannot be changed by schema +// changes and have invalid secondary indexes. Instead we now allowlist specific +// interactions and reject the rest. This test exercises these scenarios. 
+func TestTruncateWithConcurrentMutations(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + type blockType int + const ( + blockAfterBackfill blockType = iota // default + blockBeforeResume + ) + type validateQuery struct { + stmt string + optionalResults [][]string + } + type testCase struct { + blockType blockType + setupStmts []string + name string + expErrRE string + truncateStmt string + stmts []string + validations []validateQuery + } + run := func(t *testing.T, testC testCase) { + var ( + blocked = make(chan struct{}) + unblock = make(chan error) + tc *testcluster.TestCluster + ) + { + settings := cluster.MakeTestingClusterSettings() + stats.AutomaticStatisticsClusterMode.Override(&settings.SV, false) + scKnobs := &sql.SchemaChangerTestingKnobs{} + blockFunc := func(jobID int64) error { + select { + case blocked <- struct{}{}: + case err := <-unblock: + return err + } + return <-unblock + } + switch testC.blockType { + case blockAfterBackfill: + scKnobs.RunAfterBackfill = blockFunc + case blockBeforeResume: + scKnobs.RunBeforeResume = blockFunc + } + tc = testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + SQLSchemaChanger: scKnobs, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + } + + db := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(db) + + // Support setup schema changes. + close(unblock) + for _, stmt := range testC.setupStmts { + tdb.Exec(t, stmt) + } + + // Block the main schema changes. + unblock = make(chan error) + + // Create an index concurrently and make sure it blocks. 
+ var g errgroup.Group + g.Go(func() error { + if len(testC.stmts) == 1 { + _, err := db.Exec(testC.stmts[0]) + return err + } + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + for _, stmt := range testC.stmts { + _, err = tx.Exec(stmt) + if err != nil { + _ = tx.Rollback() + return err + } + } + return tx.Commit() + }) + errCh := make(chan error) + go func() { errCh <- g.Wait() }() + + select { + case err := <-errCh: + t.Fatal(err) // concurrent txn failed + case <-blocked: + } + + _, err := db.Exec(testC.truncateStmt) + if testC.expErrRE == "" { + assert.NoError(t, err) + } else { + assert.Regexp(t, testC.expErrRE, err) + } + + close(unblock) + require.NoError(t, <-errCh) + + for _, v := range testC.validations { + if v.optionalResults != nil { + tdb.CheckQueryResults(t, v.stmt, v.optionalResults) + } else { + tdb.Exec(t, v.stmt) + } + } + } + const ( + commonCreateTable = `CREATE TABLE t (i INT PRIMARY KEY, j INT)` + commonPopulateData = `INSERT INTO t SELECT i, i+1 as j from generate_series(1, 100, 1) as t(i)` + ) + commonValidations := []validateQuery{ + { + stmt: "ALTER TABLE t ADD COLUMN added_column INT", + }, + } + commonIdxValidations := append([]validateQuery{ + { + stmt: "SELECT count(*) FROM t", + optionalResults: [][]string{{"0"}}, + }, + { + stmt: "SELECT count(*) FROM t@idx", + optionalResults: [][]string{{"0"}}, + }, + }, commonValidations...) 
+ + cases := []testCase{ + { + name: "add index", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `CREATE INDEX idx ON t(j)`, + }, + validations: commonIdxValidations, + }, + { + name: "add index with column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD COLUMN k INT`, + `CREATE INDEX idx ON t(j, k)`, + }, + validations: commonIdxValidations, + }, + { + blockType: blockBeforeResume, + name: "drop index", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `ALTER TABLE t ADD COLUMN k INT`, + `CREATE INDEX idx ON t(j, k)`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `DROP INDEX t@idx`, + }, + expErrRE: `unimplemented: cannot perform TRUNCATE on "t" which has indexes being dropped`, + }, + { + name: "drop column with user-defined type", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE TYPE typ AS ENUM ('a')`, + `ALTER TABLE t ADD COLUMN k typ`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN k`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has ` + + `a column \("k"\) being dropped which depends on another object`, + }, + { + name: "drop column which uses sequence", + setupStmts: []string{ + commonCreateTable, + `SET serial_normalization='sql_sequence'`, + `ALTER TABLE t ADD COLUMN k SERIAL`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN k`, + }, + validations: commonValidations, + }, + { + name: "drop column which owns sequence", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE SEQUENCE s OWNED BY t.j`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN j`, + }, + validations: commonValidations, + }, + { + name: "alter primary key", + setupStmts: 
[]string{ + `CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL)`, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (j)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing primary key change`, + }, + { + name: "add column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD COLUMN k INT`, + }, + validations: commonValidations, + }, + { + name: "add self fk", + blockType: blockBeforeResume, + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t(i)`, + `INSERT INTO t VALUES (101, NULL)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` + + `ongoing FOREIGN_KEY constraint change`, + }, + { + name: "add other fk", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + `CREATE TABLE t2 (i INT PRIMARY KEY)`, + `INSERT INTO t2 SELECT i+1 as i from generate_series(1, 100, 1) as t(i)`, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t2(i)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` + + `ongoing FOREIGN_KEY constraint change`, + }, + { + name: "add check constraint", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t ADD CONSTRAINT c CHECK (j > 1)`, + }, + expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing CHECK constraint change`, + }, + { + name: "drop column", + setupStmts: []string{ + commonCreateTable, + commonPopulateData, + }, + truncateStmt: "TRUNCATE TABLE t", + stmts: []string{ + `ALTER TABLE t DROP COLUMN j`, + }, + validations: 
commonValidations, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { run(t, tc) }) + } +} diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index b068fcf1263d..4cf8126506ce 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -199,7 +200,21 @@ func (p *planner) truncateTable( return err } - // Collect all of the old indexes. + if err := checkTableForDisallowedMutationsWithTruncate(tableDesc); err != nil { + return err + } + + // Resolve all outstanding mutations. Make all new schema elements + // public because the table is empty and doesn't need to be backfilled. + for _, m := range tableDesc.Mutations { + if err := tableDesc.MakeMutationComplete(m); err != nil { + return err + } + } + tableDesc.Mutations = nil + tableDesc.GCMutations = nil + + // Collect all of the old indexes and reset all of the index IDs. oldIndexes := make([]descpb.IndexDescriptor, len(tableDesc.Indexes)+1) oldIndexes[0] = *protoutil.Clone(&tableDesc.PrimaryIndex).(*descpb.IndexDescriptor) for i := range tableDesc.Indexes { @@ -223,16 +238,6 @@ func (p *planner) truncateTable( indexIDMapping[oldIndexes[i+1].ID] = tableDesc.Indexes[i].ID } - // Resolve all outstanding mutations. Make all new schema elements - // public because the table is empty and doesn't need to be backfilled. - for _, m := range tableDesc.Mutations { - if err := tableDesc.MakeMutationComplete(m); err != nil { - return err - } - } - tableDesc.Mutations = nil - tableDesc.GCMutations = nil - // Create schema change GC jobs for all of the indexes. 
dropTime := timeutil.Now().UnixNano() droppedIndexes := make([]jobspb.SchemaChangeGCDetails_DroppedIndex, 0, len(oldIndexes)) @@ -307,9 +312,88 @@ func (p *planner) truncateTable( return err } + // Mark any mutation jobs as successful and then clear the record of + // their existence. Old code did not do this and relied on jobs finishing + // and removing themselves. That turns out to be hazardous because those + // jobs may now fail and then fail to revert. + if err := p.markTableMutationJobsSuccessful(ctx, tableDesc); err != nil { + return err + } + tableDesc.MutationJobs = nil + return p.writeSchemaChange(ctx, tableDesc, descpb.InvalidMutationID, jobDesc) } +// checkTableForDisallowedMutationsWithTruncate iterates the set of mutations +// in the descriptor and determines whether any should prevent the truncate +// from completing. Interactions between truncate and ongoing schema change +// jobs are complex. On some level, the truncate can allow the work of these +// mutations to be short-circuited. However, accounting for the various cross- +// descriptor side-effects (like removing back-references or cleaning up dropped +// indexes) feels hard to get exactly right in the existing model. Thus, it +// is safer to more generally just disallow TRUNCATE when there are things +// going on and then advise the user to allow jobs to continue or to cancel +// them. +// +// This also aligns with a future world where we will be diminishing the +// allowable concurrency. However, in today's schema change world, some +// cancellations require O(rows) work. Some of these operations we can safely +// just complete. This function iterates the set of mutations and decides +// whether any exist that are not safe to just complete. +func checkTableForDisallowedMutationsWithTruncate(desc *tabledesc.Mutable) error { + + // The mutations which are scary to complete are mutations which involve + // back-references. 
That would be anything related to foreign keys and + // dropping of a column using a user-defined type or sequence. We can permit + // the addition or removal of columns so long as they do not involve + // user-defined types. + + for i, m := range desc.Mutations { + if idx := m.GetIndex(); idx != nil { + // Do not allow dropping indexes. + if m.Direction != descpb.DescriptorMutation_ADD { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has indexes being dropped", desc.GetName()) + } + } else if col := m.GetColumn(); col != nil { + if m.Direction == descpb.DescriptorMutation_DROP && col.Type.UserDefined() { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has a column (%q) being "+ + "dropped which depends on another object", desc.GetName(), col.Name) + } + } else if c := m.GetConstraint(); c != nil { + switch ct := c.ConstraintType; ct { + case descpb.ConstraintToUpdate_CHECK, + descpb.ConstraintToUpdate_NOT_NULL, + descpb.ConstraintToUpdate_FOREIGN_KEY: + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing %s "+ + "constraint change", desc.GetName(), ct) + default: + return errors.AssertionFailedf("cannot perform TRUNCATE due to "+ + "unknown constraint type %v on mutation %d in %v", ct, i, desc) + } + } else if s := m.GetPrimaryKeySwap(); s != nil { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing primary key "+ + "change", desc.GetName()) + } else if m.GetComputedColumnSwap() != nil { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing column type "+ + "change", desc.GetName()) + } else { + return errors.AssertionFailedf("cannot perform TRUNCATE due to "+ + "concurrent unknown 
mutation of type %T for mutation %d in %v", m, i, desc) + } + } + return nil +} + // ClearTableDataInChunks truncates the data of a table in chunks. It deletes a // range of data for the table, which includes the PK and all indexes. // The table has already been marked for deletion and has been purged from the