Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix w-w conflict for add/drop FK #20804

Merged
merged 10 commits into from
Dec 19, 2024
110 changes: 83 additions & 27 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,25 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
return err
}

/*
collect old fk names.
ForeignKeyDef.Name may be empty in previous design.
So, we only use ForeignKeyDef.Name that is no empty.
*/
oldFkNames := make(map[string]bool)
for _, ct := range oldCt.Cts {
switch t := ct.(type) {
case *engine.ForeignKeyDef:
for _, fkey := range t.Fkeys {
if len(fkey.Name) != 0 {
oldFkNames[fkey.Name] = true
}
}
}
}
//added fk in this alter table statement
newAddedFkNames := make(map[string]bool)

if c.proc.GetTxnOperator().Txn().IsPessimistic() {
var retryErr error
// 1. lock origin table metadata in catalog
Expand All @@ -430,6 +449,59 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
}
retryErr = err
}

// 3. lock foreign key's table
for _, action := range qry.Actions {
switch act := action.Action.(type) {
case *plan.AlterTable_Action_Drop:
alterTableDrop := act.Drop
constraintName := alterTableDrop.Name
if alterTableDrop.Typ == plan.AlterTableDrop_FOREIGN_KEY {
//check fk existed in table
if _, has := oldFkNames[constraintName]; !has {
return moerr.NewErrCantDropFieldOrKey(c.proc.Ctx, constraintName)
}
for _, fk := range tableDef.Fkeys {
if fk.Name == constraintName && fk.ForeignTbl != 0 { //skip self ref foreign key
// lock fk table
fkDbName, fkTableName, err := c.e.GetNameById(c.proc.Ctx, c.proc.GetTxnOperator(), fk.ForeignTbl)
if err != nil {
return err
}
if err = lockMoTable(c, fkDbName, fkTableName, lock.LockMode_Exclusive); err != nil {
if !moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return err
}
retryErr = moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}
}
}
}
case *plan.AlterTable_Action_AddFk:
//check fk existed in table
if _, has := oldFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
newAddedFkNames[act.AddFk.Fkey.Name] = true

// lock fk table
if !(act.AddFk.DbName != dbName && act.AddFk.TableName != tblName) { //skip self ref foreign key
if err = lockMoTable(c, act.AddFk.DbName, act.AddFk.TableName, lock.LockMode_Exclusive); err != nil {
if !moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) &&
!moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetryWithDefChanged) {
return err
}
retryErr = moerr.NewTxnNeedRetryWithDefChangedNoCtx()
}
}
}
}

if retryErr != nil {
return retryErr
}
Expand All @@ -438,25 +510,6 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
Cts: []engine.Constraint{},
}

//added fk in this alter table statement
newAddedFkNames := make(map[string]bool)
/*
collect old fk names.
ForeignKeyDef.Name may be empty in previous design.
So, we only use ForeignKeyDef.Name that is no empty.
*/
oldFkNames := make(map[string]bool)
for _, ct := range oldCt.Cts {
switch t := ct.(type) {
case *engine.ForeignKeyDef:
for _, fkey := range t.Fkeys {
if len(fkey.Name) != 0 {
oldFkNames[fkey.Name] = true
}
}
}
}

removeRefChildTbls := make(map[string]uint64)
var addRefChildTbls []uint64
var newFkeys []*plan.ForeignKeyDef
Expand Down Expand Up @@ -537,15 +590,18 @@ func (s *Scope) AlterTableInplace(c *Compile) error {
}
}
case *plan.AlterTable_Action_AddFk:
//check fk existed in table
if _, has := oldFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
if !c.proc.GetTxnOperator().Txn().IsPessimistic() {
//check fk existed in table
if _, has := oldFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
//check fk existed in this alter table statement
if _, has := newAddedFkNames[act.AddFk.Fkey.Name]; has {
return moerr.NewErrDuplicateKeyName(c.proc.Ctx, act.AddFk.Fkey.Name)
}
newAddedFkNames[act.AddFk.Fkey.Name] = true
}
newAddedFkNames[act.AddFk.Fkey.Name] = true

alterKinds = addAlterKind(alterKinds, api.AlterKind_UpdateConstraint)
addRefChildTbls = append(addRefChildTbls, act.AddFk.Fkey.ForeignTbl)
newFkeys = append(newFkeys, act.AddFk.Fkey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,18 @@ create table t2(a int);
begin;
insert into t2 values(1);
drop database db1;
commit;
create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
begin;
insert into t1 values(1);
use db1;
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
commit;
begin;
insert into t1 values(2);
use db1;
alter table t2 drop foreign key fk_t2_t1;
commit;
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,25 @@ insert into t2 values(1);
-- @wait:0:commit
drop database db1;
-- @session}
commit;

create database db1;
use db1;
create table t1(a int primary key);
create table t2(a int primary key, b int);
begin;
insert into t1 values(1);
-- @session:id=1{
use db1;
-- @wait:0:commit
alter table t2 add constraint fk_t2_t1 foreign key(b) references t1(a);
-- @session}
commit;
begin;
insert into t1 values(2);
-- @session:id=1{
use db1;
-- @wait:0:commit
alter table t2 drop foreign key fk_t2_t1;
-- @session}
commit;
Loading