Skip to content

Commit

Permalink
*: add foreign key cascade delete when execute delete statement (#38377)
Browse files Browse the repository at this point in the history
close #38376
  • Loading branch information
crazycs520 authored Oct 21, 2022
1 parent 8d57ed9 commit 4346898
Show file tree
Hide file tree
Showing 16 changed files with 871 additions and 46 deletions.
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ const (
ErrRowInWrongPartition = 1863
ErrErrorLast = 1863
ErrMaxExecTimeExceeded = 1907
ErrForeignKeyCascadeDepthExceeded = 3008
ErrInvalidFieldSize = 3013
ErrInvalidArgumentForLogarithm = 3020
ErrAggregateOrderNonAggQuery = 3029
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrAccountHasBeenLocked: mysql.Message("Access denied for user '%s'@'%s'. Account is locked.", nil),
ErrWarnConflictingHint: mysql.Message("Hint %s is ignored as conflicting/duplicated.", nil),
ErrUnresolvedHintName: mysql.Message("Unresolved name '%s' for %s hint", nil),
ErrForeignKeyCascadeDepthExceeded: mysql.Message("Foreign key cascade delete/update exceeds max depth of %v.", nil),
ErrInvalidFieldSize: mysql.Message("Invalid size for column '%s'.", nil),
ErrInvalidArgumentForLogarithm: mysql.Message("Invalid argument for logarithm", nil),
ErrAggregateOrderNonAggQuery: mysql.Message("Expression #%d of ORDER BY contains aggregate function and applies to the result of a non-aggregated query", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,11 @@ error = '''
The password hash doesn't have the expected format. Check if the correct password algorithm is being used with the PASSWORD() function.
'''

["executor:3008"]
error = '''
Foreign key cascade delete/update exceeds max depth of %v.
'''

["executor:3523"]
error = '''
Unknown authorization ID %.256s
Expand Down
149 changes: 126 additions & 23 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
return a.handlePessimisticSelectForUpdate(ctx, e)
}

// In function handlePessimisticDML may rebuild a Executor when handlePessimisticLockError,
// so need to return the rebuild executor.
if handled, result, e, err := a.handleNoDelay(ctx, e, isPessimistic); handled || err != nil {
if err != nil {
return result, err
}
err = a.handleForeignKeyTrigger(ctx, e)
a.prepareFKCascadeContext(e)
if handled, result, err := a.handleNoDelay(ctx, e, isPessimistic); handled || err != nil {
return result, err
}

Expand All @@ -561,22 +556,129 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}, nil
}

func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor) error {
func (a *ExecStmt) handleStmtForeignKeyTrigger(ctx context.Context, e Executor) error {
stmtCtx := a.Ctx.GetSessionVars().StmtCtx
if stmtCtx.ForeignKeyTriggerCtx.HasFKCascades {
// If the ExecStmt has foreign key cascade to be executed, we need call `StmtCommit` to commit the ExecStmt itself
// change first.
// Since `UnionScanExec` use `SnapshotIter` and `SnapshotGetter` to read txn mem-buffer, if we don't do `StmtCommit`,
// then the fk cascade executor can't read the mem-buffer changed by the ExecStmt.
a.Ctx.StmtCommit()
}
err := a.handleForeignKeyTrigger(ctx, e, 1)
if err != nil {
err1 := a.handleFKTriggerError(stmtCtx)
if err1 != nil {
return errors.Errorf("handle foreign key trigger error failed, err: %v, original_err: %v", err1, err)
}
return err
}
if stmtCtx.ForeignKeyTriggerCtx.SavepointName != "" {
a.Ctx.GetSessionVars().TxnCtx.ReleaseSavepoint(stmtCtx.ForeignKeyTriggerCtx.SavepointName)
}
return nil
}

var maxForeignKeyCascadeDepth = 15

func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor, depth int) error {
exec, ok := e.(WithForeignKeyTrigger)
if !ok {
return nil
}
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = true
defer func() {
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = false
}()
fkChecks := exec.GetFKChecks()
for _, fkCheck := range fkChecks {
err := fkCheck.doCheck(ctx)
if err != nil {
return err
}
}
fkCascades := exec.GetFKCascades()
for _, fkCascade := range fkCascades {
err := a.handleForeignKeyCascade(ctx, fkCascade, depth)
if err != nil {
return err
}
}
return nil
}

func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeExec, depth int) error {
if len(fkc.fkValues) == 0 {
return nil
}
if depth > maxForeignKeyCascadeDepth {
return ErrForeignKeyCascadeDepthExceeded.GenWithStackByArgs(maxForeignKeyCascadeDepth)
}
e, err := fkc.buildExecutor(ctx)
if err != nil || e == nil {
return err
}
if err := e.Open(ctx); err != nil {
terror.Call(e.Close)
return err
}
err = Next(ctx, e, newFirstChunk(e))
if err != nil {
return err
}
// Call `StmtCommit` uses to flush the fk cascade executor change into txn mem-buffer,
// then the later fk cascade executors can see the mem-buffer changes.
a.Ctx.StmtCommit()
return a.handleForeignKeyTrigger(ctx, e, depth+1)
}

// prepareFKCascadeContext records a transaction savepoint for foreign key cascade when this ExecStmt has foreign key
// cascade behaviour and this ExecStmt is in transaction.
func (a *ExecStmt) prepareFKCascadeContext(e Executor) {
exec, ok := e.(WithForeignKeyTrigger)
if !ok || !exec.HasFKCascades() {
return
}
sessVar := a.Ctx.GetSessionVars()
sessVar.StmtCtx.ForeignKeyTriggerCtx.HasFKCascades = true
if !sessVar.InTxn() {
return
}
txn, err := a.Ctx.Txn(false)
if err != nil || !txn.Valid() {
return
}
// Record a txn savepoint if ExecStmt in transaction, the savepoint is use to do rollback when handle foreign key
// cascade failed.
savepointName := "fk_sp_" + strconv.FormatUint(txn.StartTS(), 10)
memDBCheckpoint := txn.GetMemDBCheckpoint()
sessVar.TxnCtx.AddSavepoint(savepointName, memDBCheckpoint)
sessVar.StmtCtx.ForeignKeyTriggerCtx.SavepointName = savepointName
}

func (a *ExecStmt) handleFKTriggerError(sc *stmtctx.StatementContext) error {
if sc.ForeignKeyTriggerCtx.SavepointName == "" {
return nil
}
txn, err := a.Ctx.Txn(false)
if err != nil || !txn.Valid() {
return err
}
savepointRecord := a.Ctx.GetSessionVars().TxnCtx.RollbackToSavepoint(sc.ForeignKeyTriggerCtx.SavepointName)
if savepointRecord == nil {
// Normally should never run into here, but just in case, rollback the transaction.
err = txn.Rollback()
if err != nil {
return err
}
return errors.Errorf("foreign key cascade savepoint '%s' not found, transaction is rollback, should never happen", sc.ForeignKeyTriggerCtx.SavepointName)
}
txn.RollbackMemDBToCheckpoint(savepointRecord.MemDBCheckpoint)
a.Ctx.GetSessionVars().TxnCtx.ReleaseSavepoint(sc.ForeignKeyTriggerCtx.SavepointName)
return nil
}

func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic bool) (handled bool, rs sqlexec.RecordSet, _ Executor, err error) {
func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic bool) (handled bool, rs sqlexec.RecordSet, err error) {
sc := a.Ctx.GetSessionVars().StmtCtx
defer func() {
// If the stmt have no rs like `insert`, The session tracker detachment will be directly
Expand Down Expand Up @@ -606,20 +708,20 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
if toCheck.Schema().Len() == 0 {
handled = !isExplainAnalyze
if isPessimistic {
e, err := a.handlePessimisticDML(ctx, toCheck)
return handled, nil, e, err
err := a.handlePessimisticDML(ctx, toCheck)
return handled, nil, err
}
r, err := a.handleNoDelayExecutor(ctx, toCheck)
return handled, r, e, err
return handled, r, err
} else if proj, ok := toCheck.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
r, err := a.handleNoDelayExecutor(ctx, e)
return true, r, e, err
return true, r, err
}

return false, nil, e, nil
return false, nil, nil
}

func isNoResultPlan(p plannercore.Plan) bool {
Expand Down Expand Up @@ -765,17 +867,18 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
if err != nil {
return nil, err
}
err = a.handleStmtForeignKeyTrigger(ctx, e)
return nil, err
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (_ Executor, err error) {
func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (err error) {
sctx := a.Ctx
// Do not activate the transaction here.
// When autocommit = 0 and transaction in pessimistic mode,
// statements like set xxx = xxx; should not active the transaction.
txn, err := sctx.Txn(false)
if err != nil {
return e, err
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
defer func() {
Expand Down Expand Up @@ -803,7 +906,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (_ Exec
startPointGetLocking := time.Now()
_, err = a.handleNoDelayExecutor(ctx, e)
if !txn.Valid() {
return e, err
return err
}
if err != nil {
// It is possible the DML has point get plan that locks the key.
Expand All @@ -812,24 +915,24 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (_ Exec
if ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startPointGetLocking).Seconds())
}
return e, err
return err
}
continue
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
return e, err1
return err1
}
keys = txnCtx.CollectUnchangedRowKeys(keys)
if len(keys) == 0 {
return e, nil
return nil
}
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
keys = filterLockTableKeys(seVars.StmtCtx, keys)
lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys))
if err != nil {
return e, err
return err
}
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
Expand All @@ -840,15 +943,15 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (_ Exec
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
}
if err == nil {
return e, nil
return nil
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
// todo: Report deadlock
if ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds())
}
return e, err
return err
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
if b.err != nil {
return nil
}
deleteExec.fkCascades, b.err = b.buildTblID2FKCascadeExecs(tblID2table, v.FKCascades)
if b.err != nil {
return nil
}
return deleteExec
}

Expand Down
37 changes: 34 additions & 3 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type DeleteExec struct {
memTracker *memory.Tracker
// fkChecks contains the foreign key checkers. the map is tableID -> []*FKCheckExec
fkChecks map[int64][]*FKCheckExec
// fkCascades contains the foreign key cascade. the map is tableID -> []*FKCascadeExec
fkCascades map[int64][]*FKCascadeExec
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -241,16 +243,31 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
if err != nil {
return err
}
err = e.onRemoveRowForFK(ctx, t, data)
if err != nil {
return err
}
e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState))
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}

func (e *DeleteExec) onRemoveRowForFK(ctx sessionctx.Context, t table.Table, data []types.Datum) error {
fkChecks := e.fkChecks[t.Meta().ID]
sc := ctx.GetSessionVars().StmtCtx
for _, fkc := range fkChecks {
err = fkc.deleteRowNeedToCheck(sc, data)
err := fkc.deleteRowNeedToCheck(sc, data)
if err != nil {
return err
}
}
fkCascades := e.fkCascades[t.Meta().ID]
for _, fkc := range fkCascades {
err := fkc.onDeleteRow(sc, data)
if err != nil {
return err
}
}
e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState))
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}

Expand All @@ -277,6 +294,20 @@ func (e *DeleteExec) GetFKChecks() []*FKCheckExec {
return fkChecks
}

// GetFKCascades implements WithForeignKeyTrigger interface.
func (e *DeleteExec) GetFKCascades() []*FKCascadeExec {
fkCascades := []*FKCascadeExec{}
for _, fkcs := range e.fkCascades {
fkCascades = append(fkCascades, fkcs...)
}
return fkCascades
}

// HasFKCascades implements WithForeignKeyTrigger interface.
func (e *DeleteExec) HasFKCascades() bool {
return len(e.fkCascades) > 0
}

// tableRowMapType is a map for unique (Table, Row) pair. key is the tableID.
// the key in map[int64]Row is the joined table handle, which represent a unique reference row.
// the value in map[int64]Row is the deleting row.
Expand Down
21 changes: 11 additions & 10 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,17 @@ var (
ErrSettingNoopVariable = dbterror.ClassExecutor.NewStd(mysql.ErrSettingNoopVariable)
ErrLazyUniquenessCheckFailure = dbterror.ClassExecutor.NewStd(mysql.ErrLazyUniquenessCheckFailure)

ErrBRIEBackupFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEBackupFailed)
ErrBRIERestoreFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIERestoreFailed)
ErrBRIEImportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEImportFailed)
ErrBRIEExportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEExportFailed)
ErrCTEMaxRecursionDepth = dbterror.ClassExecutor.NewStd(mysql.ErrCTEMaxRecursionDepth)
ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem)
ErrPluginIsNotLoaded = dbterror.ClassExecutor.NewStd(mysql.ErrPluginIsNotLoaded)
ErrSetPasswordAuthPlugin = dbterror.ClassExecutor.NewStd(mysql.ErrSetPasswordAuthPlugin)
ErrFuncNotEnabled = dbterror.ClassExecutor.NewStdErr(mysql.ErrNotSupportedYet, parser_mysql.Message("%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file.", nil))
errSavepointNotExists = dbterror.ClassExecutor.NewStd(mysql.ErrSpDoesNotExist)
ErrBRIEBackupFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEBackupFailed)
ErrBRIERestoreFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIERestoreFailed)
ErrBRIEImportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEImportFailed)
ErrBRIEExportFailed = dbterror.ClassExecutor.NewStd(mysql.ErrBRIEExportFailed)
ErrCTEMaxRecursionDepth = dbterror.ClassExecutor.NewStd(mysql.ErrCTEMaxRecursionDepth)
ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem)
ErrPluginIsNotLoaded = dbterror.ClassExecutor.NewStd(mysql.ErrPluginIsNotLoaded)
ErrSetPasswordAuthPlugin = dbterror.ClassExecutor.NewStd(mysql.ErrSetPasswordAuthPlugin)
ErrFuncNotEnabled = dbterror.ClassExecutor.NewStdErr(mysql.ErrNotSupportedYet, parser_mysql.Message("%-.32s is not supported. To enable this experimental feature, set '%-.32s' in the configuration file.", nil))
errSavepointNotExists = dbterror.ClassExecutor.NewStd(mysql.ErrSpDoesNotExist)
ErrForeignKeyCascadeDepthExceeded = dbterror.ClassExecutor.NewStd(mysql.ErrForeignKeyCascadeDepthExceeded)

ErrWrongStringLength = dbterror.ClassDDL.NewStd(mysql.ErrWrongStringLength)
errUnsupportedFlashbackTmpTable = dbterror.ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message("Recover/flashback table is not supported on temporary tables", nil))
Expand Down
Loading

0 comments on commit 4346898

Please sign in to comment.