Skip to content

Commit

Permalink
fix w-w conflict for add/drop FK (matrixorigin#20804)
Browse files Browse the repository at this point in the history
fix w-w conflict for add/drop FK

Approved by: @badboynt1, @qingxinhome, @aunjgr, @heni02
  • Loading branch information
ouyuanning committed Dec 19, 2024
1 parent 8e80c44 commit 6f7621b
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 25 deletions.
102 changes: 79 additions & 23 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,25 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
return err
}

/*
collect old fk names.
ForeignKeyDef.Name may be empty in previous design.
So, we only use ForeignKeyDef.Name that is no empty.
*/
oldFkNames := make(map[string]bool)
for _, ct := range oldCt.Cts {
switch t := ct.(type) {
case *engine.ForeignKeyDef:
for _, fkey := range t.Fkeys {
if len(fkey.Name) != 0 {
oldFkNames[fkey.Name] = true
}
}
}
}
//added fk in this alter table statement
newAddedFkNames := make(map[string]bool)

if c.proc.GetTxnOperator().Txn().IsPessimistic() {
var retryErr error
// 1. lock origin table metadata in catalog
Expand All @@ -430,6 +449,59 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
}
retryErr = err
}

// 3. lock foreign key's table
for _, action := range qry.Actions {
switch act := action.Action.(type) {
case *plan.AlterTable_Action_Drop:
alterTableDrop := act.Drop
constraintName := alterTableDrop.Name
if alterTableDrop.Typ == plan.AlterTableDrop_FOREIGN_KEY {
//check fk existed in table
if _, has := oldFkNames[constraintName]; !has {
return moerr.NewErrCantDropFieldOrKey(c.proc.Ctx, constraintName)
}
for _, fk := range tableDef.Fkeys {
if fk.Name == constraintName && fk.ForeignTbl != 0 { //skip self ref foreign key
// lock fk table
fkDbName, fkTableName, err := c.e.GetNameById(c.proc.Ctx, c.proc.GetTxnOperator(), fk.ForeignTbl)
if err != nil {
return err
}
if err = lockMoTable(c, fkDbName, fkTableName, lock.LockMode_Exclusive); err != nil {
if !moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return err
}
retryErr = moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}
}
}
}
case *plan.AlterTable_Action_AddFk:
//check fk existed in table
if _, has := oldFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
newAddedFkNames[act.AddFk.Fkey.Name] = true

// lock fk table
if !(act.AddFk.DbName != dbName && act.AddFk.TableName != tblName) { //skip self ref foreign key
if err = lockMoTable(c, act.AddFk.DbName, act.AddFk.TableName, lock.LockMode_Exclusive); err != nil {
if !moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return err
}
retryErr = moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}
}
}
}

if retryErr != nil {
return retryErr
}
Expand All @@ -438,25 +510,6 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
Cts: []engine.Constraint{},
}

//added fk in this alter table statement
newAddedFkNames := make(map[string]bool)
/*
collect old fk names.
ForeignKeyDef.Name may be empty in previous design.
So, we only use ForeignKeyDef.Name that is no empty.
*/
oldFkNames := make(map[string]bool)
for _, ct := range oldCt.Cts {
switch t := ct.(type) {
case *engine.ForeignKeyDef:
for _, fkey := range t.Fkeys {
if len(fkey.Name) != 0 {
oldFkNames[fkey.Name] = true
}
}
}
}

removeRefChildTbls := make(map[string]uint64)
var addRefChildTbls []uint64
var newFkeys []*plan.ForeignKeyDef
Expand Down Expand Up @@ -541,11 +594,14 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
if _, has := oldFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
if !c.proc.GetTxnOperator().Txn().IsPessimistic() {
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
newAddedFkNames[act.AddFk.Fkey.Name] = true
}
newAddedFkNames[act.AddFk.Fkey.Name] = true

alterKinds = addAlterKind(alterKinds, api.AlterKind_UpdateConstraint)
addRefChildTbls = append(addRefChildTbls, act.AddFk.Fkey.ForeignTbl)
newFkeys = append(newFkeys, act.AddFk.Fkey)
Expand Down
13 changes: 12 additions & 1 deletion test/distributed/cases/foreign_key/fk_base.result
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,15 @@ use db2;
show tables;
Tables_in_db2
drop database db2;
drop account if exists acc1;
drop account if exists acc1;
drop database if exists db1;
create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a), add constraint fk_t2_t1 foreign key(b) references t1(a);
Duplicate foreign key constraint name 'fk_t2_t1'
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
Duplicate foreign key constraint name 'fk_t2_t1'
drop database db1;
12 changes: 11 additions & 1 deletion test/distributed/cases/foreign_key/fk_base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,14 @@ use db2;
show tables;
drop database db2;
-- @session
drop account if exists acc1;
drop account if exists acc1;

drop database if exists db1;
create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a), add constraint fk_t2_t1 foreign key(b) references t1(a);
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
drop database db1;
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,18 @@ create table t2(a int);
begin;
insert into t2 values(1);
drop database db1;
commit;
create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
begin;
insert into t1 values(1);
use db1;
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
commit;
begin;
insert into t1 values(2);
use db1;
alter table t2 drop foreign key fk_t2_t1;
commit;
21 changes: 21 additions & 0 deletions test/distributed/cases/pessimistic_transaction/ddl_atomicity.sql
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,25 @@ insert into t2 values(1);
-- @wait:0:commit
drop database db1;
-- @session}
commit;

create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
begin;
insert into t1 values(1);
-- @session:id=1{
use db1;
-- @wait:0:commit
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
-- @session}
commit;
begin;
insert into t1 values(2);
-- @session:id=1{
use db1;
-- @wait:0:commit
alter table t2 drop foreign key fk_t2_t1;
-- @session}
commit;

0 comments on commit 6f7621b

Please sign in to comment.