From b0d7cff69c753ff0673686e8a4f4d32e1671a5b2 Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Wed, 8 Dec 2021 21:16:07 +0800 Subject: [PATCH] run in new txn --- ddl/db_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ executor/builder.go | 16 +++++++--------- kv/txn.go | 21 +++++++++++++++++++-- 3 files changed, 72 insertions(+), 11 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 31d50861bb0ab..677d48a335383 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/israce" "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testutil" "github.com/tikv/client-go/v2/testutils" @@ -7290,6 +7291,51 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C) c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'") } +// Close issue #24172. +// See https://github.com/pingcap/tidb/issues/24172 +func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk1 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(id int)") + + var cancelErr error + var rs []sqlexec.RecordSet + hook := &ddl.TestDDLCallback{} + d := s.dom.DDL() + originalHook := d.GetHook() + d.(ddl.DDLForTest).SetHook(hook) + defer d.(ddl.DDLForTest).SetHook(originalHook) + + // Cancelling will be retried but it still fails, and adding index succeeds. + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("everytime")`), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() + rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) + } + } + tk.MustExec("alter table t add index (id)") + c.Assert(cancelErr.Error(), Equals, "mock commit error") + + // Cancelling will be retried only once and it succeeds at the end. + var jobID int64 + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization { + jobID = job.ID + stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("once")`), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }() + rs, cancelErr = tk1.Se.Execute(context.Background(), stmt) + } + } + tk.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob) + c.Assert(cancelErr, IsNil) + result := tk1.ResultSetToResultWithCtx(context.Background(), rs[0], Commentf("cancel ddl job fails")) + result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID))) +} + // For Close issue #24288 // see https://github.com/pingcap/tidb/issues/24288 func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) { diff --git a/executor/builder.go b/executor/builder.go index 37cb9b63d5f01..f2a575d48560a 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -283,15 +283,13 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Execu baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), jobIDs: v.JobIDs, } - txn, err := e.ctx.Txn(true) - if err != nil { - b.err = err - return nil - } - - e.errs, b.err = admin.CancelJobs(txn, e.jobIDs) - if b.err != nil { - return nil + // Run within a new transaction. If it runs within the session transaction, commit failure won't be reported to the user. + errInTxn := kv.RunInNewTxn(context.Background(), e.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { + e.errs, err = admin.CancelJobs(txn, e.jobIDs) + return + }) + if errInTxn != nil { + b.err = errInTxn } return e } diff --git a/kv/txn.go b/kv/txn.go index 1359e60abb47d..27afbdc7e537c 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -16,10 +16,12 @@ package kv import ( "context" + "errors" "math" "math/rand" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -58,9 +60,24 @@ func RunInNewTxn(ctx context.Context, store Storage, retryable bool, f func(ctx return err } - err = txn.Commit(ctx) + failpoint.Inject("mockCommitErrorInNewTxn", func(val failpoint.Value) { + if v := val.(string); len(v) > 0 { + switch v { + case "once": + if i == 0 { + err = ErrTxnRetryable + } + case "everytime": + failpoint.Return(errors.New("mock commit error")) + } + } + }) + if err == nil { - break + err = txn.Commit(ctx) + if err == nil { + break + } } if retryable && IsTxnRetryableError(err) { logutil.BgLogger().Warn("RunInNewTxn",