Skip to content

Commit

Permalink
sql: fix or disallow hazardous interactions between TRUNCATE and sche…
Browse files Browse the repository at this point in the history
…ma changes

Prior to this change, the truncate implementation since 20.2 was totally
broken. The code creates new indexes for all existing indexes. It also
makes all mutations public. The problem with this is that it makes the
mutations public *after* deciding which indexes to clone. Because of this,
indexes that are still being backfilled or validated simply become public. This
leads to inconsistency between the old secondary index and the newly
truncated indexes.

The code is also problematic in that it does not do anything about the mutation
jobs. The code assumes that these jobs will happily continue and detect that
they correspond to no mutations and stop doing things as well as clean
themselves up. Unfortunately, it seems that these changes can lead to certain
failures. For example, new index validation may now fail. Reverting that index
build will now also fail because the index is public. This will leave the
descriptor in a state where it has an orphaned mutation job. We fix this by
only permitting TRUNCATE to run concurrently with mutations we deem safe and
by fixing the behavior for those cases.

Fixes #62842.

Release note (bug fix): Fixed bugs where TRUNCATE concurrent with index
construction and other schema changes could result in corruption.
  • Loading branch information
ajwerner committed Apr 20, 2021
1 parent 3ebdf34 commit 8575997
Show file tree
Hide file tree
Showing 3 changed files with 454 additions and 36 deletions.
48 changes: 23 additions & 25 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,24 +450,29 @@ func (p *planner) initiateDropTable(
// Also, changes made here do not affect schema change jobs created in this
// transaction with no mutation ID; they remain in the cache, and will be
// updated when writing the job record to drop the table.
jobIDs := make(map[int64]struct{})
var id descpb.MutationID
for _, m := range tableDesc.Mutations {
if id != m.MutationID {
id = m.MutationID
jobID, err := getJobIDForMutationWithDescriptor(ctx, tableDesc, id)
if err != nil {
return err
}
jobIDs[jobID] = struct{}{}
}
if err := p.markTableMutationJobsSuccessful(ctx, tableDesc); err != nil {
return err
}
for jobID := range jobIDs {
// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.

// Initiate an immediate schema change. When dropping a table
// in a session, the data and the descriptor are not deleted.
// Instead, that is taken care of asynchronously by the schema
// change manager, which is notified via a system config gossip.
// The schema change manager will properly schedule deletion of
// the underlying data when the GC deadline expires.
return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc)
}

// Mark jobs as succeeded when possible, but be defensive about jobs that
// are already in a terminal state or nonexistent. This could happen for
// schema change jobs that couldn't be successfully reverted and ended up in
// a failed state. Such jobs could have already been GCed from the jobs
// table by the time this code runs.
func (p *planner) markTableMutationJobsSuccessful(
ctx context.Context, tableDesc *tabledesc.Mutable,
) error {
for _, mj := range tableDesc.MutationJobs {
jobID := mj.JobID
mutationJob, err := p.execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
if jobs.HasJobNotFoundError(err) {
Expand Down Expand Up @@ -498,14 +503,7 @@ func (p *planner) initiateDropTable(
}
delete(p.ExtendedEvalContext().SchemaChangeJobCache, tableDesc.ID)
}

// Initiate an immediate schema change. When dropping a table
// in a session, the data and the descriptor are not deleted.
// Instead, that is taken care of asynchronously by the schema
// change manager, which is notified via a system config gossip.
// The schema change manager will properly schedule deletion of
// the underlying data when the GC deadline expires.
return p.writeDropTable(ctx, tableDesc, queueJob, jobDesc)
return nil
}

func (p *planner) removeFKForBackReference(
Expand Down
336 changes: 336 additions & 0 deletions pkg/sql/tests/truncate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// TestTruncateWithConcurrentMutations is a regression test to cover a
// situation where a table is truncated while concurrent schema changes are
// performed. Prior to the commit introducing this test, all concurrent
// mutations were made public and the corresponding mutation jobs would not be
// dealt with. This could lead to tables which cannot be changed by schema
// changes and have invalid secondary indexes. Instead we now allowlist specific
// interactions and reject the rest. This test exercises these scenarios.
//
// Each test case starts its own single-node test cluster, runs its setup
// statements, starts the schema change statements under test on a separate
// goroutine, waits for the schema changer testing hook to report that it is
// blocked, issues the TRUNCATE, releases the hook, and finally runs the
// validation queries.
func TestTruncateWithConcurrentMutations(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
// blockType selects which schema changer testing knob the blocking hook is
// installed on (see the switch on testC.blockType below).
type blockType int
const (
blockAfterBackfill blockType = iota // default
blockBeforeResume
)
// validateQuery is a statement executed after the schema change has been
// released. When optionalResults is non-nil the query results are compared
// against it; otherwise the statement is only required to execute.
type validateQuery struct {
stmt string
optionalResults [][]string
}
// testCase describes one TRUNCATE-versus-schema-change scenario.
type testCase struct {
// blockType picks the knob used to block the schema change; the zero
// value is blockAfterBackfill.
blockType blockType
// setupStmts run to completion (hook not blocking) before the test proper.
setupStmts []string
// name is the subtest name.
name string
// expErrRE, when non-empty, is a regexp the TRUNCATE error must match.
// When empty the TRUNCATE must succeed.
expErrRE string
// truncateStmt is executed while the schema change is blocked.
truncateStmt string
// stmts are the schema change statements run concurrently. A single
// statement is executed directly; several are wrapped in one transaction.
stmts []string
// validations run after the schema change has been unblocked and finished.
validations []validateQuery
}
// run executes a single test case against a fresh test cluster.
run := func(t *testing.T, testC testCase) {
var (
// blocked receives a value from the hook once it has been reached.
blocked = make(chan struct{})
// unblock releases the hook. Closing it makes every pending and future
// receive return a nil error immediately.
unblock = make(chan error)
tc *testcluster.TestCluster
)
{
settings := cluster.MakeTestingClusterSettings()
// Override the automatic statistics cluster setting to false.
// NOTE(review): presumably this keeps background statistics jobs from
// interfering with the test — confirm.
stats.AutomaticStatisticsClusterMode.Override(&settings.SV, false)
scKnobs := &sql.SchemaChangerTestingKnobs{}
// blockFunc is the hook installed on the schema changer. It either
// announces itself on blocked and then waits for unblock, or, if unblock
// becomes ready first (e.g. it is already closed), returns right away
// with whatever unblock yields. The jobID argument is unused.
blockFunc := func(jobID int64) error {
select {
case blocked <- struct{}{}:
case err := <-unblock:
return err
}
return <-unblock
}
// NOTE(review): judging by their names, RunAfterBackfill fires once the
// backfill has finished and RunBeforeResume fires before the job
// resumes — verify against SchemaChangerTestingKnobs.
switch testC.blockType {
case blockAfterBackfill:
scKnobs.RunAfterBackfill = blockFunc
case blockBeforeResume:
scKnobs.RunBeforeResume = blockFunc
}
tc = testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
SQLSchemaChanger: scKnobs,
},
},
})
// defer is scoped to the enclosing function literal, not to this bare
// block, so the cluster is stopped when run returns.
defer tc.Stopper().Stop(ctx)
}

db := tc.ServerConn(0)
tdb := sqlutils.MakeSQLRunner(db)

// Support setup schema changes.
// With unblock closed and nobody receiving from blocked, the hook returns
// nil immediately, so setup schema changes run without blocking.
close(unblock)
for _, stmt := range testC.setupStmts {
tdb.Exec(t, stmt)
}

// Block the main schema changes.
// Replacing the closed channel with a fresh one re-arms the hook.
unblock = make(chan error)

// Run the statements under test concurrently and make sure they block.
var g errgroup.Group
g.Go(func() error {
// A single statement is executed directly on the connection pool.
if len(testC.stmts) == 1 {
_, err := db.Exec(testC.stmts[0])
return err
}
// Multiple statements are executed inside one explicit transaction,
// which is rolled back on the first failing statement.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
for _, stmt := range testC.stmts {
_, err = tx.Exec(stmt)
if err != nil {
_ = tx.Rollback()
return err
}
}
return tx.Commit()
})
// Forward the group's result through a channel so it can be used in the
// select below and received again after unblocking.
errCh := make(chan error)
go func() { errCh <- g.Wait() }()

// Wait until the hook reports that the schema change is blocked. If the
// statements finish first, the scenario was not exercised, so fail.
select {
case err := <-errCh:
t.Fatal(err) // concurrent txn failed
case <-blocked:
}

// Issue the TRUNCATE while the schema change is held at the hook and check
// the outcome against the test case's expectation.
_, err := db.Exec(testC.truncateStmt)
if testC.expErrRE == "" {
assert.NoError(t, err)
} else {
assert.Regexp(t, testC.expErrRE, err)
}

// Release the hook (and any later invocations of it) and require the
// concurrent statements to complete without error.
close(unblock)
require.NoError(t, <-errCh)

// Run the post-conditions: check results where provided, otherwise just
// make sure the statement executes.
for _, v := range testC.validations {
if v.optionalResults != nil {
tdb.CheckQueryResults(t, v.stmt, v.optionalResults)
} else {
tdb.Exec(t, v.stmt)
}
}
}
// Setup statements shared by most test cases.
const (
commonCreateTable = `CREATE TABLE t (i INT PRIMARY KEY, j INT)`
commonPopulateData = `INSERT INTO t SELECT i, i+1 as j from generate_series(1, 100, 1) as t(i)`
)
// commonValidations checks that a further schema change can still be run on
// the table after the test.
commonValidations := []validateQuery{
{
stmt: "ALTER TABLE t ADD COLUMN added_column INT",
},
}
// commonIdxValidations additionally checks that both the table and the index
// named idx report zero rows, i.e. they agree after the TRUNCATE.
commonIdxValidations := append([]validateQuery{
{
stmt: "SELECT count(*) FROM t",
optionalResults: [][]string{{"0"}},
},
{
stmt: "SELECT count(*) FROM t@idx",
optionalResults: [][]string{{"0"}},
},
}, commonValidations...)

// Cases with a non-empty expErrRE expect the TRUNCATE to be rejected with a
// matching error; the remaining cases expect it to succeed and then run their
// validations.
cases := []testCase{
{
name: "add index",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`CREATE INDEX idx ON t(j)`,
},
validations: commonIdxValidations,
},
{
name: "add index with column",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ADD COLUMN k INT`,
`CREATE INDEX idx ON t(j, k)`,
},
validations: commonIdxValidations,
},
{
blockType: blockBeforeResume,
name: "drop index",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
`ALTER TABLE t ADD COLUMN k INT`,
`CREATE INDEX idx ON t(j, k)`,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`DROP INDEX t@idx`,
},
expErrRE: `unimplemented: cannot perform TRUNCATE on "t" which has indexes being dropped`,
},
{
name: "drop column with user-defined type",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
`CREATE TYPE typ AS ENUM ('a')`,
`ALTER TABLE t ADD COLUMN k typ`,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t DROP COLUMN k`,
},
expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has ` +
`a column \("k"\) being dropped which depends on another object`,
},
{
name: "drop column which uses sequence",
setupStmts: []string{
commonCreateTable,
`SET serial_normalization='sql_sequence'`,
`ALTER TABLE t ADD COLUMN k SERIAL`,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t DROP COLUMN k`,
},
validations: commonValidations,
},
{
name: "drop column which owns sequence",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
`CREATE SEQUENCE s OWNED BY t.j`,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t DROP COLUMN j`,
},
validations: commonValidations,
},
{
name: "alter primary key",
setupStmts: []string{
`CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL)`,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (j)`,
},
expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing primary key change`,
},
{
name: "add column",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ADD COLUMN k INT`,
},
validations: commonValidations,
},
{
name: "add self fk",
blockType: blockBeforeResume,
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t(i)`,
`INSERT INTO t VALUES (101, NULL)`,
},
expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` +
`ongoing FOREIGN_KEY constraint change`,
},
{
name: "add other fk",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
`CREATE TABLE t2 (i INT PRIMARY KEY)`,
`INSERT INTO t2 SELECT i+1 as i from generate_series(1, 100, 1) as t(i)`,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ADD CONSTRAINT fk FOREIGN KEY (j) REFERENCES t2(i)`,
},
expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ` +
`ongoing FOREIGN_KEY constraint change`,
},
{
name: "add check constraint",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t ADD CONSTRAINT c CHECK (j > 1)`,
},
expErrRE: `pq: unimplemented: cannot perform TRUNCATE on "t" which has an ongoing CHECK constraint change`,
},
{
name: "drop column",
setupStmts: []string{
commonCreateTable,
commonPopulateData,
},
truncateStmt: "TRUNCATE TABLE t",
stmts: []string{
`ALTER TABLE t DROP COLUMN j`,
},
validations: commonValidations,
},
}
// Subtests run sequentially (no t.Parallel), so each closure sees the tc value
// of its own iteration.
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { run(t, tc) })
}
}
Loading

0 comments on commit 8575997

Please sign in to comment.