Skip to content

Commit

Permalink
sql: block schema changes whilst adding or dropping TTL
Browse files Browse the repository at this point in the history
This commit blocks other schema changes whilst TTL is running, analagous
to ALTER COLUMN TYPE / ALTER PRIMARY KEY. This saves the state space to
think about in conjunction with other schema changes.

We also block changes to TTL whilst another schema change is running for
similar reasons.

Release note: None
  • Loading branch information
otan committed Feb 13, 2022
1 parent 1e5ebd7 commit 214362e
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 22 deletions.
24 changes: 22 additions & 2 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,15 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
desc.validateConstraintIDs(vea)
}

// Ensure that mutations cannot be queued if a primary key change or
// an alter column type schema change has either been started in
// Ensure that mutations cannot be queued if a primary key change, TTL change
// or an alter column type schema change has either been started in
// this transaction, or is currently in progress.
var alterPKMutation descpb.MutationID
var alterColumnTypeMutation descpb.MutationID
var modifyTTLMutation descpb.MutationID
var foundAlterPK bool
var foundAlterColumnType bool
var foundModifyTTL bool

for _, m := range desc.Mutations {
// If we have seen an alter primary key mutation, then
Expand Down Expand Up @@ -585,6 +587,20 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
return
}
if foundModifyTTL {
if modifyTTLMutation == m.MutationID {
vea.Report(pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot perform other schema changes in the same transaction as a TTL mutation",
))
} else {
vea.Report(pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot perform a schema change operation while a TTL change is in progress",
))
}
return
}
if m.GetPrimaryKeySwap() != nil {
foundAlterPK = true
alterPKMutation = m.MutationID
Expand All @@ -593,6 +609,10 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
foundAlterColumnType = true
alterColumnTypeMutation = m.MutationID
}
if m.GetModifyRowLevelTTL() != nil {
foundModifyTTL = true
modifyTTLMutation = m.MutationID
}
}

// Validate that the presence of MutationJobs (from the old schema changer)
Expand Down
48 changes: 48 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ CREATE TABLE public.tbl (
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
) WITH (ttl = 'on', ttl_automatic_column = 'on', ttl_expire_after = '10 days':::INTERVAL)

statement error cannot modify TTL whilst another schema change is in progress
ALTER TABLE tbl RESET (ttl), RESET (ttl_expire_after)


statement error cannot modify TTL whilst another schema change is in progress
BEGIN;
ALTER TABLE tbl RESET (ttl);
ALTER TABLE tbl SET (ttl_select_batch_size = 200)

statement ok
ROLLBACK

statement error cannot perform other schema changes in the same transaction as a TTL mutation
BEGIN;
ALTER TABLE tbl RESET (ttl);
CREATE INDEX tbl_idx ON tbl (text)

statement ok
ROLLBACK

# Test when we drop the TTL, ensure column is dropped and the scheduled job is removed.
statement ok
ALTER TABLE tbl RESET (ttl)
Expand Down Expand Up @@ -358,6 +378,34 @@ CREATE TABLE tbl (
FAMILY (id, text)
)

statement error cannot modify TTL whilst another schema change is in progress
ALTER TABLE tbl SET (ttl_expire_after = '10 minutes'), SET (ttl_select_batch_size = 200)

statement error cannot modify TTL whilst another schema change is in progress
BEGIN;
ALTER TABLE tbl SET (ttl_expire_after = '10 minutes');
ALTER TABLE tbl RESET (ttl_select_batch_size)

statement ok
ROLLBACK


statement error cannot modify TTL whilst another schema change is in progress
BEGIN;
CREATE INDEX tbl_idx ON tbl (text);
ALTER TABLE tbl SET (ttl_expire_after = '10 minutes');

statement ok
ROLLBACK

statement error cannot perform other schema changes in the same transaction as a TTL mutation
BEGIN;
ALTER TABLE tbl SET (ttl_expire_after = '10 minutes');
CREATE INDEX tbl_idx ON tbl (text)

statement ok
ROLLBACK

statement ok
ALTER TABLE tbl SET (ttl_expire_after = '10 minutes', ttl_select_batch_size = 200)

Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/paramparse/paramobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package paramparse

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/geo/geoindex"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
Expand Down Expand Up @@ -412,6 +413,12 @@ func (po *TableStorageParamObserver) onSet(
key string,
datum tree.Datum,
) error {
if strings.HasPrefix(key, "ttl_") && len(po.tableDesc.AllMutations()) > 0 {
return pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot modify TTL whilst another schema change is in progress",
)
}
if p, ok := tableParams[key]; ok {
return p.onSet(ctx, po, semaCtx, evalCtx, key, datum)
}
Expand All @@ -420,6 +427,12 @@ func (po *TableStorageParamObserver) onSet(

// onReset implements the StorageParamObserver interface.
func (po *TableStorageParamObserver) onReset(evalCtx *tree.EvalContext, key string) error {
if strings.HasPrefix(key, "ttl_") && len(po.tableDesc.AllMutations()) > 0 {
return pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot modify TTL whilst another schema change is in progress",
)
}
if p, ok := tableParams[key]; ok {
return p.onReset(po, evalCtx, key)
}
Expand Down
114 changes: 94 additions & 20 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7419,16 +7419,6 @@ CREATE TABLE t.test (id TEXT PRIMARY KEY) WITH (ttl_expire_after = '10 hours');`
expectedShowCreateTable: expectNonTTLTable,
expectSchedule: false,
},
{
desc: "error during ALTER TABLE ... SET (ttl_expire_after ...) when tied to another mutation which fails",
setup: createNonTTLTable,
schemaChange: `BEGIN; ALTER TABLE t.test SET (ttl_expire_after = '10 hours'); CREATE INDEX test_idx ON t.test(id); COMMIT`,
knobs: &sql.SchemaChangerTestingKnobs{
RunBeforeIndexValidation: failFunc,
},
expectedShowCreateTable: expectNonTTLTable,
expectSchedule: false,
},
{
desc: "error during ALTER TABLE ... RESET (ttl) during delete column mutation",
setup: createTTLTable,
Expand All @@ -7449,16 +7439,6 @@ CREATE TABLE t.test (id TEXT PRIMARY KEY) WITH (ttl_expire_after = '10 hours');`
expectedShowCreateTable: expectTTLTable,
expectSchedule: true,
},
{
desc: "error during ALTER TABLE ... SET (ttl_expire_after ...) when tied to another mutation which fails",
setup: createTTLTable,
schemaChange: `BEGIN; ALTER TABLE t.test RESET (ttl); CREATE INDEX test_idx ON t.test(id); COMMIT`,
knobs: &sql.SchemaChangerTestingKnobs{
RunBeforeIndexValidation: failFunc,
},
expectedShowCreateTable: expectTTLTable,
expectSchedule: true,
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -7514,3 +7494,97 @@ CREATE TABLE t.test (id TEXT PRIMARY KEY) WITH (ttl_expire_after = '10 hours');`
})
}
}

func TestSchemaChangeWhileAddingOrDroppingTTL(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testCases := []struct {
desc string
setup string
successfulChange string
conflictingSchemaChange string
expected func(uint32) string
}{
{
desc: `during adding TTL`,
setup: `
CREATE DATABASE t;
CREATE TABLE t.test (x INT);`,
successfulChange: `ALTER TABLE t.test SET (ttl_expire_after = '10 minutes')`,
conflictingSchemaChange: `ALTER TABLE t.test ADD COLUMN y int`,
expected: func(tableID uint32) string {
return fmt.Sprintf(`pq: relation "test" \(%d\): cannot perform a schema change operation while a TTL change is in progress`, tableID)
},
},
{
desc: `during dropping TTL`,
setup: `
CREATE DATABASE t;
CREATE TABLE t.test (x INT) WITH (ttl_expire_after = '10 minutes');`,
successfulChange: `ALTER TABLE t.test RESET (ttl)`,
conflictingSchemaChange: `ALTER TABLE t.test ADD COLUMN y int`,
expected: func(tableID uint32) string {
return fmt.Sprintf(`pq: relation "test" \(%d\): cannot perform a schema change operation while a TTL change is in progress`, tableID)
},
},

{
desc: `TTL change whilst adding column`,
setup: `
CREATE DATABASE t;
CREATE TABLE t.test (x INT);`,
successfulChange: `ALTER TABLE t.test ADD COLUMN y int`,
conflictingSchemaChange: `ALTER TABLE t.test SET (ttl_expire_after = '10 minutes')`,
expected: func(tableID uint32) string {
return `pq: cannot modify TTL whilst another schema change is in progress`
},
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
childJobStartNotification := make(chan struct{})
waitBeforeContinuing := make(chan struct{})
var doOnce sync.Once
waitFunc := func() error {
doOnce.Do(func() {
childJobStartNotification <- struct{}{}
<-waitBeforeContinuing
})
return nil
}
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeBackfill: waitFunc,
RunBeforeModifyRowLevelTTL: waitFunc,
},
}

s, db, _ := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(db)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

sqlDB.Exec(t, tc.setup)

tableID := sqlutils.QueryTableID(t, db, "t", "public", "test")

var wg sync.WaitGroup
wg.Add(1)
go func() {
sqlDB.Exec(t, tc.successfulChange)
wg.Done()
}()

<-childJobStartNotification

expected := tc.expected(tableID)
sqlDB.ExpectErr(t, expected, tc.conflictingSchemaChange)

waitBeforeContinuing <- struct{}{}
wg.Wait()
})
}
}

0 comments on commit 214362e

Please sign in to comment.