Skip to content

Commit

Permalink
run in new txn
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 committed Dec 8, 2021
1 parent e15e457 commit b0d7cff
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 11 deletions.
46 changes: 46 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
"github.com/tikv/client-go/v2/testutils"
Expand Down Expand Up @@ -7290,6 +7291,51 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C)
c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'")
}

// Close issue #24172.
// See https://github.com/pingcap/tidb/issues/24172
func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int)")

var cancelErr error
var rs []sqlexec.RecordSet
hook := &ddl.TestDDLCallback{}
d := s.dom.DDL()
originalHook := d.GetHook()
d.(ddl.DDLForTest).SetHook(hook)
defer d.(ddl.DDLForTest).SetHook(originalHook)

// Cancelling will be retried but it still fails, and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("everytime")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustExec("alter table t add index (id)")
c.Assert(cancelErr.Error(), Equals, "mock commit error")

// Cancelling will be retried only once and it succeeds at the end.
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
jobID = job.ID
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("once")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
c.Assert(cancelErr, IsNil)
result := tk1.ResultSetToResultWithCtx(context.Background(), rs[0], Commentf("cancel ddl job fails"))
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

// For Close issue #24288
// see https://github.com/pingcap/tidb/issues/24288
func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) {
Expand Down
16 changes: 7 additions & 9 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,13 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Execu
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
jobIDs: v.JobIDs,
}
txn, err := e.ctx.Txn(true)
if err != nil {
b.err = err
return nil
}

e.errs, b.err = admin.CancelJobs(txn, e.jobIDs)
if b.err != nil {
return nil
// Run within a new transaction. If it runs within the session transaction, commit failure won't be reported to the user.
errInTxn := kv.RunInNewTxn(context.Background(), e.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
e.errs, err = admin.CancelJobs(txn, e.jobIDs)
return
})
if errInTxn != nil {
b.err = errInTxn
}
return e
}
Expand Down
21 changes: 19 additions & 2 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package kv

import (
"context"
"errors"
"math"
"math/rand"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -58,9 +60,24 @@ func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx
return err
}

err = txn.Commit(ctx)
failpoint.Inject("mockCommitErrorInNewTxn", func(val failpoint.Value) {
if v := val.(string); len(v) > 0 {
switch v {
case "once":
if i == 0 {
err = ErrTxnRetryable
}
case "everytime":
failpoint.Return(errors.New("mock commit error"))
}
}
})

if err == nil {
break
err = txn.Commit(ctx)
if err == nil {
break
}
}
if retryable && IsTxnRetryableError(err) {
logutil.BgLogger().Warn("RunInNewTxn",
Expand Down

0 comments on commit b0d7cff

Please sign in to comment.