Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix:修复tcc-fence模式空回滚无限重试以及无法解决悬挂bug #684

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/rm/tcc/fence/fence_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (

// WithFence Execute the fence database operation first and then call back the business method
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
if err = DoFence(ctx, tx); err != nil {
if skip, err := DoFence(ctx, tx); err != nil {
return err
} else if skip {
return nil
}

if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}
Expand All @@ -47,20 +48,20 @@ func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err erro
// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation.
// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation.
// case 5: if fencePhase not in above case, will return a fence phase illegal error.
func DoFence(ctx context.Context, tx *sql.Tx) error {
func DoFence(ctx context.Context, tx *sql.Tx) (bool, error) {
hd := handler.GetFenceHandler()
phase := tm.GetFencePhase(ctx)

switch phase {
case enum.FencePhaseNotExist:
return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
return false, fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case enum.FencePhasePrepare:
return hd.PrepareFence(ctx, tx)
return false, hd.PrepareFence(ctx, tx)
case enum.FencePhaseCommit:
return hd.CommitFence(ctx, tx)
return false, hd.CommitFence(ctx, tx)
case enum.FencePhaseRollback:
return hd.RollbackFence(ctx, tx)
}

return fmt.Errorf("fence phase: %v illegal", phase)
return false, fmt.Errorf("fence phase: %v illegal", phase)
}
14 changes: 7 additions & 7 deletions pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,37 +110,37 @@ func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.
return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted)
}

func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error {
func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) (bool, error) {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
return false, fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}

// record is null, mean the need suspend
if fenceDo == nil {
err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended)
if err != nil {
return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
return false, fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}
log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId)
return nil
return true, tx.Commit()
}

// have rollbacked or suspended
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return nil
return true, tx.Commit()
}
if fenceDo.Status == enum.StatusCommitted {
log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return false, fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
return false, handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
}

func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br
if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil {
// will return error, if rows is empty
if err.Error() == "sql: no rows in result set" {
return nil, fmt.Errorf("query tcc fence get scan row,no rows in result set, [%w]", err)
return nil, nil
} else {
return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err)
}
Expand Down
Loading