Skip to content

Commit

Permalink
ddl: fix cancel the rollbacking back may lead to meta data broken (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored May 24, 2023
1 parent e034d3a commit aa7a386
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
38 changes: 38 additions & 0 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/stretchr/testify/require"
atomicutil "go.uber.org/atomic"
)
Expand Down Expand Up @@ -336,3 +337,40 @@ func TestCancel(t *testing.T) {
}
}
}

func TestCancelForAddUniqueIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)

// Prepare schema.
tk.MustExec("use test")
tk.MustExec(`create table t (c1 int, c2 int, c3 int)`)
tk.MustExec("insert into t values(1, 1, 1)")
tk.MustExec("insert into t values(2, 2, 2)")
tk.MustExec("insert into t values(1, 1, 1)")

hook := &callback.TestDDLCallback{Do: dom}
var testCancelState model.SchemaState
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == testCancelState && job.State == model.JobStateRollingback {
tkCancel.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
}
}
dom.DDL().SetHook(hook.Clone())

testCancelState = model.StateWriteOnly
tk.MustGetErrCode("alter table t add unique index idx1(c1)", errno.ErrDupEntry)
tbl := external.GetTableByName(t, tk, "test", "t")
require.Equal(t, 0, len(tbl.Meta().Indices))

testCancelState = model.StateDeleteOnly
tk.MustGetErrCode("alter table t add unique index idx1(c1)", errno.ErrDupEntry)
tbl = external.GetTableByName(t, tk, "test", "t")
require.Equal(t, 0, len(tbl.Meta().Indices))

testCancelState = model.StateDeleteReorganization
tk.MustGetErrCode("alter table t add unique index idx1(c1)", errno.ErrDupEntry)
tbl = external.GetTableByName(t, tk, "test", "t")
require.Equal(t, 0, len(tbl.Meta().Indices))
}
18 changes: 5 additions & 13 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package ddl

import (
"context"
"encoding/json"
"fmt"
"runtime"
"strconv"
Expand Down Expand Up @@ -1456,9 +1455,7 @@ func cancelRunningJob(sess *sess.Session, job *model.Job,
}
job.State = model.JobStateCancelling
job.AdminOperator = byWho

// Make sure RawArgs isn't overwritten.
return json.Unmarshal(job.RawArgs, &job.Args)
return nil
}

// pauseRunningJob check and pause the running Job
Expand All @@ -1477,12 +1474,7 @@ func pauseRunningJob(sess *sess.Session, job *model.Job,

job.State = model.JobStatePausing
job.AdminOperator = byWho

if job.RawArgs == nil {
return nil
}

return json.Unmarshal(job.RawArgs, &job.Args)
return nil
}

// resumePausedJob check and resume the Paused Job
Expand All @@ -1502,7 +1494,7 @@ func resumePausedJob(se *sess.Session, job *model.Job,

job.State = model.JobStateQueueing

return json.Unmarshal(job.RawArgs, &job.Args)
return nil
}

// processJobs command on the Job according to the process
Expand Down Expand Up @@ -1558,7 +1550,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
continue
}

err = updateDDLJob2Table(ns, job, true)
err = updateDDLJob2Table(ns, job, false)
if err != nil {
jobErrs[i] = err
continue
Expand Down Expand Up @@ -1649,7 +1641,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
continue
}

err = updateDDLJob2Table(ns, job, true)
err = updateDDLJob2Table(ns, job, false)
if err != nil {
jobErrs[job.ID] = err
continue
Expand Down

0 comments on commit aa7a386

Please sign in to comment.