Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: automatically check the delete range count #122

Merged
merged 1 commit into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 4 additions & 71 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,11 @@ func TestMultiSchemaChangeDropIndexedColumnsCancelled(t *testing.T) {
// Cancel job when the column 'a' is in delete-reorg.
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateDeleteReorganization
})
jobIDExt := wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(composeHooks(dom, jobIDExt, hook))
dom.DDL().SetHook(hook)
tk.MustExec("alter table t drop column b, drop column a, drop column d;")
dom.DDL().SetHook(originHook)
hook.MustCancelFailed(t)
tk.MustQuery("select * from t;").Check(testkit.Rows("3"))
checkDelRangeCnt(tk, jobIDExt.jobID, 3)
}

func TestMultiSchemaChangeDropColumnsParallel(t *testing.T) {
Expand Down Expand Up @@ -551,12 +549,11 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
jobIDExt := wrapJobIDExtCallback(originHook)
cancelHook := newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't2' is in write-reorg.
return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(composeHooks(dom, jobIDExt, cancelHook))
dom.DDL().SetHook(cancelHook)
tk.MustGetErrCode("alter table t "+
"add index t(a, b), add index t1(a), "+
"add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob)
Expand All @@ -565,8 +562,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")
// Check the adding indexes are put to del-ranges.
checkDelRangeCnt(tk, jobIDExt.jobID, 3)

// Test cancel failed when some sub-jobs have been finished.
tk.MustExec("drop table if exists t;")
Expand All @@ -586,14 +581,10 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
originHook := dom.DDL().GetHook()

jobIDExt := wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)

// Test drop same index.
tk.MustExec("drop table if exists t;")
Expand All @@ -603,27 +594,19 @@ func TestMultiSchemaChangeDropIndexes(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id) nonclustered, key i1(c1), key i2(c2), key i3(c1, c2));")
tk.MustExec("insert into t values (1, 2, 3);")
jobIDExt.Clear()
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)
checkDelRangeCnt(tk, jobIDExt.jobID, 2)
jobIDExt.Clear()
tk.MustExec("alter table t drop index i3, drop primary key;")
tk.MustGetErrCode("select * from t use index(primary);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i3);", errno.ErrKeyDoesNotExist)
checkDelRangeCnt(tk, jobIDExt.jobID, 2)

// Test drop index with drop column.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
jobIDExt.Clear()
tk.MustExec("alter table t drop index t, drop column a")
tk.MustGetErrCode("select * from t force index(t)", errno.ErrKeyDoesNotExist)
checkDelRangeCnt(tk, jobIDExt.jobID, 1)

dom.DDL().SetHook(originHook)
}

func TestMultiSchemaChangeDropIndexesCancelled(t *testing.T) {
Expand Down Expand Up @@ -737,12 +720,9 @@ func TestMultiSchemaChangeRenameIndexes(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
jobIDExt := wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
tk.MustExec("alter table t drop column a, rename index t to x")
tk.MustGetErrCode("select * from t use index (x);", errno.ErrKeyDoesNotExist)
tk.MustQuery("select * from t;").Check(testkit.Rows("2 3"))
checkDelRangeCnt(tk, jobIDExt.jobID, 1)

// Test cancel job with renameIndex
tk.MustExec("drop table if exists t")
Expand All @@ -760,11 +740,10 @@ func TestMultiSchemaChangeRenameIndexes(t *testing.T) {
}

func TestMultiSchemaChangeModifyColumns(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
originHook := dom.DDL().GetHook()

// unsupported ddl operations
{
Expand Down Expand Up @@ -817,39 +796,28 @@ func TestMultiSchemaChangeModifyColumns(t *testing.T) {
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index i1(a), index i2(b), index i3(c), index i4(a, b), index i5(a, b, c));")
tk.MustExec("insert into t values (1, 2, 3);")
jobIDExt := wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
tk.MustExec("alter table t modify column a tinyint, modify column b tinyint, modify column c tinyint;")
dom.DDL().SetHook(originHook)
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustQuery("select * from t use index(i1, i2, i3, i4, i5);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")
checkDelRangeCnt(tk, jobIDExt.jobID, 8)

// Modify index-covered columns with position change.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index i1(a), index i2(b), index i3(c), index i4(a, b), index i5(a, b, c));")
tk.MustExec("insert into t values (1, 2, 3);")
jobIDExt = wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
tk.MustExec("alter table t modify column a tinyint after c, modify column b tinyint, modify column c tinyint first;")
dom.DDL().SetHook(originHook)
tk.MustQuery("select * from t;").Check(testkit.Rows("3 2 1"))
tk.MustQuery("select * from t use index(i1, i2, i3, i4, i5);").Check(testkit.Rows("3 2 1"))
tk.MustExec("admin check table t;")
checkDelRangeCnt(tk, jobIDExt.jobID, 8)

// Modify columns that require and don't require reorganization of data.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index i1(a), index i2(c, b));")
tk.MustExec("insert into t values (1, 2, 3), (11, 22, 33);")
jobIDExt = wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
tk.MustExec("alter table t modify column b char(255) after c, modify column a bigint;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 3 2", "11 33 22"))
tk.MustQuery("select * from t use index(i1, i2);").Check(testkit.Rows("1 3 2", "11 33 22"))
tk.MustExec("admin check table t;")
checkDelRangeCnt(tk, jobIDExt.jobID, 1) // only i2 has tmp index.

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a bigint null default '1761233443433596323', index t(a));")
Expand Down Expand Up @@ -883,11 +851,8 @@ func TestMultiSchemaChangeModifyColumns(t *testing.T) {

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int);")
jobIDExt = wrapJobIDExtCallback(originHook)
dom.DDL().SetHook(jobIDExt)
tk.MustExec("insert into t values (1, 2);")
tk.MustGetErrCode("alter table t add index i(b), modify column a int null default 1 after a;", errno.ErrBadField)
checkDelRangeCnt(tk, jobIDExt.jobID, 1)
}

func TestMultiSchemaChangeModifyColumnsCancelled(t *testing.T) {
Expand Down Expand Up @@ -1217,35 +1182,3 @@ func putTheSameDDLJobTwice(t *testing.T, fn func()) {
err = failpoint.Disable("github.com/pingcap/tidb/ddl/mockParallelSameDDLJobTwice")
require.NoError(t, err)
}

func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback {
return &testDDLJobIDCallback{
Callback: oldCallback,
jobID: 0,
}
}

func checkDelRangeCnt(tk *testkit.TestKit, jobID int64, cnt int) {
query := `select sum(cnt) from
(select count(1) cnt from mysql.gc_delete_range where job_id = ? union all
select count(1) cnt from mysql.gc_delete_range_done where job_id = ?) as gdr;`
tk.MustQuery(query, jobID, jobID).Check(testkit.Rows(strconv.Itoa(cnt)))
}

type testDDLJobIDCallback struct {
ddl.Callback
jobID int64
}

func (t *testDDLJobIDCallback) OnJobUpdated(job *model.Job) {
if t.jobID == 0 {
t.jobID = job.ID
}
if t.Callback != nil {
t.Callback.OnJobUpdated(job)
}
}

func (t *testDDLJobIDCallback) Clear() {
t.jobID = 0
}
31 changes: 27 additions & 4 deletions ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
logutil.BgLogger().Error("query delete range count failed", zap.Error(err))
panic(err)
}
expectedCnt, err := expectedDeleteRangeCnt(job)
expectedCnt, err := expectedDeleteRangeCnt(delRangeCntCtx{idxIDs: map[int64]struct{}{}}, job)
if err != nil {
logutil.BgLogger().Error("decode job's delete range count failed", zap.Error(err))
panic(err)
Expand Down Expand Up @@ -77,7 +77,11 @@ func queryDeleteRangeCnt(sessPool *sessionPool, jobID int64) (int, error) {
return int(cnt), nil
}

func expectedDeleteRangeCnt(job *model.Job) (int, error) {
func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
if job.State == model.JobStateCancelled {
// Cancelled job should not have any delete range.
return 0, nil
}
switch job.Type {
case model.ActionDropSchema:
var tableIDs []int64
Expand All @@ -100,6 +104,10 @@ func expectedDeleteRangeCnt(job *model.Job) (int, error) {
}
return len(physicalTableIDs), nil
case model.ActionAddIndex, model.ActionAddPrimaryKey:
hasDelRange := job.State == model.JobStateRollbackDone
if !hasDelRange {
return 0, nil
}
var indexID int64
var ifExists bool
var partitionIDs []int64
Expand Down Expand Up @@ -133,12 +141,12 @@ func expectedDeleteRangeCnt(job *model.Job) (int, error) {
return 0, errors.Trace(err)
}
physicalCnt := mathutil.Max(len(partitionIDs), 1)
return physicalCnt * len(indexIDs), nil
return physicalCnt * ctx.deduplicateIdxCnt(indexIDs), nil
case model.ActionMultiSchemaChange:
totalExpectedCnt := 0
for _, sub := range job.MultiSchemaInfo.SubJobs {
p := sub.ToProxyJob(job)
cnt, err := expectedDeleteRangeCnt(&p)
cnt, err := expectedDeleteRangeCnt(ctx, &p)
if err != nil {
return 0, err
}
Expand All @@ -149,6 +157,21 @@ func expectedDeleteRangeCnt(job *model.Job) (int, error) {
return 0, nil
}

type delRangeCntCtx struct {
idxIDs map[int64]struct{}
}

func (ctx *delRangeCntCtx) deduplicateIdxCnt(indexIDs []int64) int {
cnt := 0
for _, id := range indexIDs {
if _, ok := ctx.idxIDs[id]; !ok {
ctx.idxIDs[id] = struct{}{}
cnt++
}
}
return cnt
}

// checkHistoryJobInTest does some sanity check to make sure something is correct after DDL complete.
// It's only check during the test environment, so it would panic directly.
// These checks may be controlled by configuration in the future.
Expand Down