From c9001b3693a09174492d8271d775e498d9ade5a5 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 7 Mar 2022 17:07:49 +0800 Subject: [PATCH] ddl: migrate test-infra to testify for add index tests (#32864) close pingcap/tidb#32863 --- ddl/add_index_test.go | 881 ++++++++++++++++++++++++++++++ ddl/column_type_change_test.go | 85 ++- ddl/db_change_test.go | 42 +- ddl/db_partition_test.go | 4 +- ddl/db_test.go | 965 ++------------------------------- ddl/ddl_test.go | 2 - ddl/failtest/fail_db_test.go | 2 +- ddl/modify_column_test.go | 12 +- ddl/rollingback_test.go | 2 +- ddl/serial_test.go | 16 +- ddl/testutil/testutil.go | 4 +- 11 files changed, 999 insertions(+), 1016 deletions(-) create mode 100644 ddl/add_index_test.go diff --git a/ddl/add_index_test.go b/ddl/add_index_test.go new file mode 100644 index 0000000000000..d98993e8b2562 --- /dev/null +++ b/ddl/add_index_test.go @@ -0,0 +1,881 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "context" + "fmt" + "math" + "math/rand" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/ddl" + testddlutil "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/mock" + "github.com/stretchr/testify/require" +) + +const addIndexLease = 600 * time.Millisecond + +func TestAddPrimaryKey1(t *testing.T) { + testAddIndex(t, testPlain, "create table test_add_index (c1 bigint, c2 bigint, c3 bigint, unique key(c1))", "primary") +} + +func TestAddPrimaryKey2(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by range (c3) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "primary") +} + +func TestAddPrimaryKey3(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by hash (c3) partitions 4;`, "primary") +} + +func TestAddPrimaryKey4(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by range columns (c3) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "primary") +} + +func TestAddIndex1(t *testing.T) { + testAddIndex(t, testPlain, + "create table test_add_index (c1 bigint, c2 
bigint, c3 bigint, primary key(c1))", "") +} + +func TestAddIndex1WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + "create table test_add_index (c1 bigint, c2 bigint, c3 bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;", "") +} + +func TestAddIndex2(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by range (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex2WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by range (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex3(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by hash (c1) partitions 4;`, "") +} + +func TestAddIndex3WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by hash (c1) partitions 4;`, "") +} + +func TestAddIndex4(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by range columns (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex4WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by range columns (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex5(t *testing.T) { + testAddIndex(t, testClusteredIndex, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c2, c3))`, "") +} + +type testAddIndexType uint8 + +const ( + testPlain testAddIndexType = 1 + testPartition testAddIndexType = 1 << 1 + testClusteredIndex testAddIndexType = 1 << 2 + testShardRowID testAddIndexType = 1 << 3 +) + +func testAddIndex(t *testing.T, tp testAddIndexType, createTableSQL, idxTp string) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + isTestPartition := (testPartition & tp) > 0 + isTestShardRowID := (testShardRowID & tp) > 0 + if isTestShardRowID { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1) + tk.MustExec("set global tidb_scatter_region = 1") + defer func() { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0) + tk.MustExec("set global tidb_scatter_region = 0") + }() + } + if isTestPartition { + tk.MustExec("set @@session.tidb_enable_table_partition = '1';") + } else if (testClusteredIndex & tp) > 0 { 
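+ // The clustered-index variant flips the session-level default directly on the
+ // session vars; none of the CREATE TABLE statements used here carry a clustered option.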
+ tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
+ }
+ tk.MustExec("drop table if exists test_add_index")
+ tk.MustExec(createTableSQL)
+
+ done := make(chan error, 1)
+ start := -10
+ num := defaultBatchSize
+ // First add some rows.
+ batchInsert(tk, "test_add_index", start, num)
+
+ // Add some discrete rows.
+ maxBatch := 20
+ batchCnt := 100
+ otherKeys := make([]int, 0, batchCnt*maxBatch)
+ // Make sure there are no duplicate keys.
+ base := defaultBatchSize * 20
+ for i := 1; i < batchCnt; i++ {
+ if isTestShardRowID {
+ base = i % 4 << 61
+ }
+ n := base + i*defaultBatchSize + i
+ for j := 0; j < rand.Intn(maxBatch); j++ {
+ n += j
+ sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
+ tk.MustExec(sql)
+ otherKeys = append(otherKeys, n)
+ }
+ }
+ // Insert a value close to math.MaxInt64 so the backfill encounters it mid-way.
+ v := math.MaxInt64 - defaultBatchSize/2
+ tk.MustExec(fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v))
+ otherKeys = append(otherKeys, v)
+
+ addIdxSQL := fmt.Sprintf("alter table test_add_index add %s key c3_index(c3)", idxTp)
+ testddlutil.SessionExecInGoroutine(store, "test", addIdxSQL, done)
+
+ deletedKeys := make(map[int]struct{})
+
+ ticker := time.NewTicker(addIndexLease / 2)
+ defer ticker.Stop()
+LOOP:
+ for {
+ select {
+ case err := <-done:
+ if err == nil {
+ break LOOP
+ }
+ require.NoError(t, err)
+ case <-ticker.C:
+ // When the server performance is particularly poor,
+ // the add-index operation may never complete,
+ // so we cap the number of rows inserted.
+ if num > defaultBatchSize*10 {
+ break
+ }
+ step := 5
+ // delete some rows, and add some data
+ for i := num; i < num+step; i++ {
+ n := rand.Intn(num)
+ deletedKeys[n] = struct{}{}
+ sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n)
+ tk.MustExec(sql)
+ sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+ tk.MustExec(sql)
+ }
+ num += step
+ }
+ }
+
+ if isTestShardRowID {
+ rows := tk.MustQuery("show table test_add_index regions").Rows()
+ require.GreaterOrEqual(t, len(rows), 16)
+ tk.MustExec("admin check table test_add_index")
+ return
+ }
+
+ // Collect the keys that still exist.
+ keys := make([]int, 0, num)
+ for i := start; i < num; i++ {
+ if _, ok := deletedKeys[i]; ok {
+ continue
+ }
+ keys = append(keys, i)
+ }
+ keys = append(keys, otherKeys...)
+
+ // Query through the new index and verify the result set.
+ expectedRows := make([][]interface{}, 0, len(keys))
+ for _, key := range keys {
+ expectedRows = append(expectedRows, []interface{}{fmt.Sprintf("%v", key)})
+ }
+ tk.MustQuery(fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start)).Check(expectedRows)
+ tk.MustExec("admin check table test_add_index")
+ if isTestPartition {
+ return
+ }
+
+ // TODO: Support explain in future.
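+ // The commented-out check below is the intended assertion once explain is
+ // supported here: a range scan on c3 should pick the freshly built c3_index.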
+ // rows := tk.MustQuery("explain select c1 from test_add_index where c3 >= 100").Rows() + // ay := dumpRows(c, rows) + // require.Contains(t, fmt.Sprintf("%v", ay), "c3_index") + + // get all row handles + require.NoError(t, tk.Session().NewTxn(context.Background())) + tbl := tk.GetTableByName("test", "test_add_index") + handles := kv.NewHandleMap() + err := tables.IterRecords(tbl, tk.Session(), tbl.Cols(), + func(h kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { + handles.Set(h, struct{}{}) + return true, nil + }) + require.NoError(t, err) + + // check in index + var nidx table.Index + idxName := "c3_index" + if len(idxTp) != 0 { + idxName = "primary" + } + for _, tidx := range tbl.Indices() { + if tidx.Meta().Name.L == idxName { + nidx = tidx + break + } + } + // Make sure there is index with name c3_index. + require.NotNil(t, nidx) + require.Greater(t, nidx.Meta().ID, int64(0)) + txn, err := tk.Session().Txn(true) + require.NoError(t, err) + require.NoError(t, txn.Rollback()) + + require.NoError(t, tk.Session().NewTxn(context.Background())) + tk.MustExec("admin check table test_add_index") + tk.MustExec("drop table test_add_index") +} + +func TestAddIndexForGeneratedColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(y year NOT NULL DEFAULT '2155')") + for i := 0; i < 50; i++ { + tk.MustExec("insert into t values (?)", i) + } + tk.MustExec("insert into t values()") + tk.MustExec("ALTER TABLE t ADD COLUMN y1 year as (y + 2)") + tk.MustExec("ALTER TABLE t ADD INDEX idx_y(y1)") + + tbl := tk.GetTableByName("test", "t") + for _, idx := range tbl.Indices() { + require.False(t, strings.EqualFold(idx.Meta().Name.L, "idx_c2")) + } + // NOTE: this test case contains a bug, it should be uncommented after the bug is fixed. + // TODO: Fix bug https://github.com/pingcap/tidb/issues/12181 + // tk.MustExec("delete from t where y = 2155") + // tk.MustExec("alter table t add index idx_y(y1)") + // tk.MustExec("alter table t drop index idx_y") + + // Fix issue 9311. + tk.MustExec("drop table if exists gcai_table") + tk.MustExec("create table gcai_table (id int primary key);") + tk.MustExec("insert into gcai_table values(1);") + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d date DEFAULT '9999-12-31';") + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d1 date as (DATE_SUB(d, INTERVAL 31 DAY));") + tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx(d1);") + tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30")) + tk.MustQuery("select d1 from gcai_table use index(idx)").Check(testkit.Rows("9999-11-30")) + tk.MustExec("admin check table gcai_table") + // The column is PKIsHandle in generated column expression. + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN id1 int as (id+5);") + tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx1(id1);") + tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30 6")) + tk.MustQuery("select id1 from gcai_table use index(idx1)").Check(testkit.Rows("6")) + tk.MustExec("admin check table gcai_table") +} + +// TestAddPrimaryKeyRollback1 is used to test scenarios that will roll back when a duplicate primary key is encountered. 
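+// The conflicting rows are seeded by testAddIndexRollback below, whose non-null
+// branch inserts rows that reuse existing c3 values.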
+func TestAddPrimaryKeyRollback1(t *testing.T) { + idxName := "PRIMARY" + addIdxSQL := "alter table t1 add primary key c3_index (c3);" + errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'PRIMARY'" + testAddIndexRollback(t, idxName, addIdxSQL, errMsg, false) +} + +// TestAddPrimaryKeyRollback2 is used to test scenarios that will roll back when a null primary key is encountered. +func TestAddPrimaryKeyRollback2(t *testing.T) { + idxName := "PRIMARY" + addIdxSQL := "alter table t1 add primary key c3_index (c3);" + errMsg := "[ddl:1138]Invalid use of NULL value" + testAddIndexRollback(t, idxName, addIdxSQL, errMsg, true) +} + +func TestAddUniqueIndexRollback(t *testing.T) { + idxName := "c3_index" + addIdxSQL := "create unique index c3_index on t1 (c3)" + errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'c3_index'" + testAddIndexRollback(t, idxName, addIdxSQL, errMsg, false) +} + +func testAddIndexRollback(t *testing.T, idxName, addIdxSQL, errMsg string, hasNullValsInKey bool) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))") + // defaultBatchSize is equal to ddl.defaultBatchSize + base := defaultBatchSize * 2 + count := base + // add some rows + batchInsert(tk, "t1", 0, count) + // add some null rows + if hasNullValsInKey { + for i := count - 10; i < count; i++ { + tk.MustExec("insert into t1 values (?, ?, null)", i+10, i) + } + } else { + // add some duplicate rows + for i := count - 10; i < count; i++ { + tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i) + } + } + + done := make(chan error, 1) + go backgroundExecT(store, addIdxSQL, done) + + times := 0 + ticker := time.NewTicker(addIndexLease / 2) + defer ticker.Stop() +LOOP: + for { + select { + case err := <-done: + require.EqualError(t, err, errMsg) + break LOOP + case <-ticker.C: + if times >= 10 { + break + } + step := 5 + // delete some rows, and add some data + for i := count; i < count+step; i++ { + n := rand.Intn(count) + // (2048, 2038, 2038) and (2038, 2038, 2038) + // Don't delete rows where c1 is 2048 or 2038, otherwise, the entry value in duplicated error message would change. 
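+ // With defaultBatchSize = 1024, those keys are defaultBatchSize*2 = 2048
+ // and defaultBatchSize*2-10 = 2038.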
+ if n == defaultBatchSize*2-10 || n == defaultBatchSize*2 { + continue + } + tk.MustExec("delete from t1 where c1 = ?", n) + tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i) + } + count += step + times++ + } + } + + tbl := tk.GetTableByName("test", "t1") + for _, tidx := range tbl.Indices() { + require.False(t, strings.EqualFold(tidx.Meta().Name.L, idxName)) + } + + // delete duplicated/null rows, then add index + for i := base - 10; i < base; i++ { + tk.MustExec("delete from t1 where c1 = ?", i+10) + } + tk.MustExec(addIdxSQL) + tk.MustExec("drop table t1") +} + +func TestAddIndexWithSplitTable(t *testing.T) { + createSQL := "CREATE TABLE test_add_index(a bigint PRIMARY KEY AUTO_RANDOM(4), b varchar(255), c bigint)" + stSQL := fmt.Sprintf("SPLIT TABLE test_add_index BETWEEN (%d) AND (%d) REGIONS 16;", math.MinInt64, math.MaxInt64) + testAddIndexWithSplitTable(t, createSQL, stSQL) +} + +func TestAddIndexWithShardRowID(t *testing.T) { + createSQL := "create table test_add_index(a bigint, b bigint, c bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;" + testAddIndexWithSplitTable(t, createSQL, "") +} + +func testAddIndexWithSplitTable(t *testing.T, createSQL, splitTableSQL string) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + hasAutoRandomField := len(splitTableSQL) > 0 + if !hasAutoRandomField { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1) + tk.MustExec("set global tidb_scatter_region = 1") + defer func() { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0) + tk.MustExec("set global tidb_scatter_region = 0") + }() + } + tk.MustExec(createSQL) + + batchInsertRows := func(tk *testkit.TestKit, needVal bool, tbl string, start, end int) error { + dml := fmt.Sprintf("insert into %s values", tbl) + for i := start; i < end; i++ { + if needVal { + dml += fmt.Sprintf("(%d, %d, %d)", i, i, i) + } else { + dml += "()" + } + if i != end-1 { + dml += "," + } + } + _, err := tk.Exec(dml) + return err + } + + done := make(chan error, 1) + start := -20 + num := defaultBatchSize + // Add some discrete rows. + goCnt := 10 + errCh := make(chan error, goCnt) + for i := 0; i < goCnt; i++ { + base := (i % 8) << 60 + go func(b int, eCh chan error) { + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + eCh <- batchInsertRows(tk1, !hasAutoRandomField, "test_add_index", base+start, base+num) + }(base, errCh) + } + for i := 0; i < goCnt; i++ { + err := <-errCh + require.NoError(t, err) + } + + if hasAutoRandomField { + tk.MustQuery(splitTableSQL).Check(testkit.Rows("15 1")) + } + tk.MustQuery("select @@session.tidb_wait_split_region_finish").Check(testkit.Rows("1")) + rows := tk.MustQuery("show table test_add_index regions").Rows() + require.Len(t, rows, 16) + addIdxSQL := "alter table test_add_index add index idx(a)" + testddlutil.SessionExecInGoroutine(store, "test", addIdxSQL, done) + + ticker := time.NewTicker(addIndexLease / 5) + defer ticker.Stop() + num = 0 +LOOP: + for { + select { + case err := <-done: + if err == nil { + break LOOP + } + require.NoError(t, err) + case <-ticker.C: + // When the server performance is particularly poor, + // the adding index operation can not be completed. + // So here is a limit to the number of rows inserted. 
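+ // num was reset to 0 above and counts only the rows churned by this loop;
+ // once it reaches 1000 the ticker branch becomes a no-op and the loop
+ // simply waits for the add-index goroutine to finish.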
+ if num >= 1000 {
+ break
+ }
+ step := 20
+ // delete, insert and update some data
+ for i := num; i < num+step; i++ {
+ sql := fmt.Sprintf("delete from test_add_index where a = %d", i+1)
+ tk.MustExec(sql)
+ if hasAutoRandomField {
+ sql = "insert into test_add_index values ()"
+ } else {
+ sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+ }
+ tk.MustExec(sql)
+ sql = fmt.Sprintf("update test_add_index set b = %d", i*10)
+ tk.MustExec(sql)
+ }
+ num += step
+ }
+ }
+
+ tk.MustExec("admin check table test_add_index")
+}
+
+func TestAddAnonymousIndex(t *testing.T) {
+ store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease)
+ defer clean()
+
+ tk := testkit.NewTestKit(t, store)
+ tk.MustExec("use test")
+ tk.MustExec("create table t_anonymous_index (c1 int, c2 int, C3 int)")
+ tk.MustExec("alter table t_anonymous_index add index (c1, c2)")
+ // Dropping an index without naming it should fail.
+ err := tk.ExecToErr("alter table t_anonymous_index drop index")
+ require.Error(t, err)
+ // The index name is c1 when adding index (c1, c2).
+ tk.MustExec("alter table t_anonymous_index drop index c1")
+ tbl := tk.GetTableByName("test", "t_anonymous_index")
+ require.Len(t, tbl.Indices(), 0)
+ // Add some indexes whose first column is c1.
+ tk.MustExec("alter table t_anonymous_index add index (c1)")
+ err = tk.ExecToErr("alter table t_anonymous_index add index c1 (c2)")
+ require.Error(t, err)
+ tbl = tk.GetTableByName("test", "t_anonymous_index")
+ require.Len(t, tbl.Indices(), 1)
+ require.Equal(t, "c1", tbl.Indices()[0].Meta().Name.L)
+ // In MySQL this raises a duplicate-index warning.
+ tk.MustExec("alter table t_anonymous_index add index c1_3 (c1)")
+ tk.MustExec("alter table t_anonymous_index add index (c1, c2, C3)")
+ // In MySQL this raises a duplicate-index warning.
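+ // Indexes c1, c1_2 and c1_3 already exist at this point, so the anonymous
+ // index below is auto-named c1_4, matching the drop statements further down.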
+ tk.MustExec("alter table t_anonymous_index add index (c1)") + tbl = tk.GetTableByName("test", "t_anonymous_index") + require.Len(t, tbl.Indices(), 4) + tk.MustExec("alter table t_anonymous_index drop index c1") + tk.MustExec("alter table t_anonymous_index drop index c1_2") + tk.MustExec("alter table t_anonymous_index drop index c1_3") + tk.MustExec("alter table t_anonymous_index drop index c1_4") + // for case-insensitive + tk.MustExec("alter table t_anonymous_index add index (C3)") + tk.MustExec("alter table t_anonymous_index drop index c3") + tk.MustExec("alter table t_anonymous_index add index c3 (C3)") + tk.MustExec("alter table t_anonymous_index drop index C3") + // for anonymous index with column name `primary` + tk.MustExec("create table t_primary (`primary` int, b int, key (`primary`))") + tbl = tk.GetTableByName("test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + tk.MustExec("alter table t_primary add index (`primary`);") + tbl = tk.GetTableByName("test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + tk.MustExec("alter table t_primary add primary key(b);") + tbl = tk.GetTableByName("test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + require.Equal(t, "primary", tbl.Indices()[2].Meta().Name.L) + tk.MustExec("create table t_primary_2 (`primary` int, key primary_2 (`primary`), key (`primary`))") + tbl = tk.GetTableByName("test", "t_primary_2") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + tk.MustExec("create table t_primary_3 (`primary_2` int, key(`primary_2`), `primary` int, key(`primary`));") + tbl = tk.GetTableByName("test", "t_primary_3") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) +} + +func TestAddIndexWithPK(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tests := []struct { + name string + mode variable.ClusteredIndexDefMode + }{ + { + "ClusteredIndexDefModeIntOnly", + variable.ClusteredIndexDefModeIntOnly, + }, + { + "ClusteredIndexDefModeOn", + variable.ClusteredIndexDefModeOn, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tk.Session().GetSessionVars().EnableClusteredIndex = test.mode + tk.MustExec("drop table if exists test_add_index_with_pk") + tk.MustExec("create table test_add_index_with_pk(a int not null, b int not null default '0', primary key(a))") + tk.MustExec("insert into test_add_index_with_pk values(1, 2)") + tk.MustExec("alter table test_add_index_with_pk add index idx (a)") + tk.MustQuery("select a from test_add_index_with_pk").Check(testkit.Rows("1")) + tk.MustExec("insert into test_add_index_with_pk values(2, 2)") + tk.MustExec("alter table test_add_index_with_pk add index idx1 (a, b)") + tk.MustQuery("select * from test_add_index_with_pk").Check(testkit.Rows("1 2", "2 2")) + tk.MustExec("drop table if exists test_add_index_with_pk1") + tk.MustExec("create table test_add_index_with_pk1(a int not null, b int not null default '0', c int, d int, primary key(c))") + tk.MustExec("insert into test_add_index_with_pk1 values(1, 1, 1, 1)") + tk.MustExec("alter table test_add_index_with_pk1 add index idx (c)") + 
tk.MustExec("insert into test_add_index_with_pk1 values(2, 2, 2, 2)") + tk.MustQuery("select * from test_add_index_with_pk1").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) + tk.MustExec("drop table if exists test_add_index_with_pk2") + tk.MustExec("create table test_add_index_with_pk2(a int not null, b int not null default '0', c int unsigned, d int, primary key(c))") + tk.MustExec("insert into test_add_index_with_pk2 values(1, 1, 1, 1)") + tk.MustExec("alter table test_add_index_with_pk2 add index idx (c)") + tk.MustExec("insert into test_add_index_with_pk2 values(2, 2, 2, 2)") + tk.MustQuery("select * from test_add_index_with_pk2").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int, primary key(a, b));") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("create index idx on t (a, b);") + }) + } +} + +func TestCancelAddPrimaryKey(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, addIndexLease) + defer clean() + idxName := "primary" + addIdxSQL := "alter table t1 add primary key idx_c2 (c2);" + testCancelAddIndex(t, store, dom, idxName, addIdxSQL) + + // Check the column's flag when the "add primary key" failed. + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + require.NoError(t, tk.Session().NewTxn(context.Background())) + tbl := tk.GetTableByName("test", "t1") + col1Flag := tbl.Cols()[1].Flag + require.True(t, !mysql.HasNotNullFlag(col1Flag) && !mysql.HasPreventNullInsertFlag(col1Flag) && mysql.HasUnsignedFlag(col1Flag)) +} + +func TestCancelAddIndex(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, addIndexLease) + defer clean() + idxName := "c3_index" + addIdxSQL := "create unique index c3_index on t1 (c3)" + testCancelAddIndex(t, store, dom, idxName, addIdxSQL) +} + +func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxName, addIdxSQL string) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (c1 int, c2 int unsigned, c3 int, unique key(c1))") + + d := dom.DDL() + + // defaultBatchSize is equal to ddl.defaultBatchSize + count := defaultBatchSize * 32 + start := 0 + for i := start; i < count; i += defaultBatchSize { + batchInsert(tk, "t1", i, i+defaultBatchSize) + } + + var c3IdxInfo *model.IndexInfo + hook := &ddl.TestDDLCallback{Do: dom} + originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + // Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job. + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32") + defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0])) + // let hook.OnJobUpdatedExported has chance to cancel the job. + // the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob. + // After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case. 
+ var checkErr error
+ hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(tk, store, hook, idxName)
+ originalHook := d.GetHook()
+ jobIDExt := wrapJobIDExtCallback(hook)
+ d.SetHook(jobIDExt)
+ done := make(chan error, 1)
+ go backgroundExecT(store, addIdxSQL, done)
+
+ times := 0
+ ticker := time.NewTicker(addIndexLease / 2)
+ defer ticker.Stop()
+LOOP:
+ for {
+ select {
+ case err := <-done:
+ require.NoError(t, checkErr)
+ require.EqualError(t, err, "[ddl:8214]Cancelled DDL job")
+ break LOOP
+ case <-ticker.C:
+ if times >= 10 {
+ break
+ }
+ step := 5
+ // delete some rows, and add some data
+ for i := count; i < count+step; i++ {
+ n := rand.Intn(count)
+ tk.MustExec("delete from t1 where c1 = ?", n)
+ tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i)
+ }
+ count += step
+ times++
+ }
+ }
+ checkDelRangeAddedN(tk, jobIDExt.jobID, c3IdxInfo.ID)
+ d.SetHook(originalHook)
+}
+
+func backgroundExecOnJobUpdatedExported(tk *testkit.TestKit, store kv.Storage, hook *ddl.TestDDLCallback, idxName string) (func(*model.Job), *model.IndexInfo, error) {
+ var checkErr error
+ first := true
+ c3IdxInfo := &model.IndexInfo{}
+ hook.OnJobUpdatedExported = func(job *model.Job) {
+ addIndexNotFirstReorg := (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
+ job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
+ // If the action is adding an index and the state is write reorganization, we want to test cancelling the job while indexes are being backfilled.
+ // When the job satisfies addIndexNotFirstReorg, the worker has started to backfill indexes.
+ if !addIndexNotFirstReorg {
+ // Get the index's meta.
+ if c3IdxInfo.ID != 0 {
+ return
+ }
+ tbl := tk.GetTableByName("test", "t1")
+ for _, index := range tbl.Indices() {
+ if !tables.IsIndexWritable(index) {
+ continue
+ }
+ if index.Meta().Name.L == idxName {
+ *c3IdxInfo = *index.Meta()
+ }
+ }
+ return
+ }
+ // The first time the job satisfies addIndexNotFirstReorg, the worker hasn't finished a batch of index backfilling yet.
+ if first {
+ first = false
+ return
+ }
+ if checkErr != nil {
+ return
+ }
+ hookCtx := mock.NewContext()
+ hookCtx.Store = store
+ err := hookCtx.NewTxn(context.Background())
+ if err != nil {
+ checkErr = errors.Trace(err)
+ return
+ }
+ jobIDs := []int64{job.ID}
+ txn, err := hookCtx.Txn(true)
+ if err != nil {
+ checkErr = errors.Trace(err)
+ return
+ }
+ errs, err := admin.CancelJobs(txn, jobIDs)
+ if err != nil {
+ checkErr = errors.Trace(err)
+ return
+ }
+ // This test only cancels one DDL job.
+ if errs[0] != nil {
+ checkErr = errors.Trace(errs[0])
+ return
+ }
+ txn, err = hookCtx.Txn(true)
+ if err != nil {
+ checkErr = errors.Trace(err)
+ return
+ }
+ err = txn.Commit(context.Background())
+ if err != nil {
+ checkErr = errors.Trace(err)
+ }
+ }
+ return hook.OnJobUpdatedExported, c3IdxInfo, checkErr
+}
+
+// TestCancelAddIndex1 tests canceling a DDL job when the add-index worker has not yet started.
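+// It cancels the job from the OnJobRunBeforeExported hook while job.SnapshotVer is
+// still zero, i.e. before any backfill batch has run.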
+func TestCancelAddIndex1(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, addIndexLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(c1 int, c2 int)") + for i := 0; i < 50; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + + var checkErr error + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 { + jobIDs := []int64{job.ID} + hookCtx := mock.NewContext() + hookCtx.Store = store + err := hookCtx.NewTxn(context.Background()) + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + + checkErr = txn.Commit(context.Background()) + } + } + originalHook := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + err := tk.ExecToErr("alter table t add index idx_c2(c2)") + require.NoError(t, checkErr) + require.EqualError(t, err, "[ddl:8214]Cancelled DDL job") + + dom.DDL().SetHook(originalHook) + tbl := tk.GetTableByName("test", "t") + for _, idx := range tbl.Indices() { + require.False(t, strings.EqualFold(idx.Meta().Name.L, "idx_c2")) + } + tk.MustExec("alter table t add index idx_c2(c2)") + tk.MustExec("alter table t drop index idx_c2") +} diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index d1570a293dbb6..6bbc0bb25145c 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -126,9 +126,6 @@ func TestColumnTypeChangeStateBetweenInteger(t *testing.T) { require.Equal(t, 2, len(tbl.Cols())) require.NotNil(t, tk.GetModifyColumn("test", "t", "c2", false)) - originalHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - hook := &ddl.TestDDLCallback{Do: dom} var checkErr error hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -160,12 +157,12 @@ func TestColumnTypeChangeStateBetweenInteger(t *testing.T) { } } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) // Alter sql will modify column c2 to tinyint not null. SQL := "alter table t modify column c2 tinyint not null" tk.MustExec(SQL) // Assert the checkErr in the job of every state. - require.Nil(t, checkErr) + require.NoError(t, checkErr) // Check the col meta after the column type change. tbl = tk.GetTableByName("test", "t") @@ -194,42 +191,35 @@ func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) { require.Equal(t, 2, len(tbl.Cols())) require.NotNil(t, tk.GetModifyColumn("test", "t", "c2", false)) - originalHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - hook := &ddl.TestDDLCallback{Do: dom} // Mock roll back at model.StateNone. customizeHookRollbackAtState(hook, tbl, model.StateNone) - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) // Alter sql will modify column c2 to bigint not null. 
SQL := "alter table t modify column c2 int not null" - _, err := tk.Exec(SQL) - require.Error(t, err) - require.Equal(t, "[ddl:1]MockRollingBackInCallBack-queueing", err.Error()) + err := tk.ExecToErr(SQL) + require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-queueing") assertRollBackedColUnchanged(t, tk) // Mock roll back at model.StateDeleteOnly. customizeHookRollbackAtState(hook, tbl, model.StateDeleteOnly) - dom.DDL().(ddl.DDLForTest).SetHook(hook) - _, err = tk.Exec(SQL) - require.Error(t, err) - require.Equal(t, "[ddl:1]MockRollingBackInCallBack-delete only", err.Error()) + dom.DDL().SetHook(hook) + err = tk.ExecToErr(SQL) + require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-delete only") assertRollBackedColUnchanged(t, tk) // Mock roll back at model.StateWriteOnly. customizeHookRollbackAtState(hook, tbl, model.StateWriteOnly) - dom.DDL().(ddl.DDLForTest).SetHook(hook) - _, err = tk.Exec(SQL) - require.Error(t, err) - require.Equal(t, "[ddl:1]MockRollingBackInCallBack-write only", err.Error()) + dom.DDL().SetHook(hook) + err = tk.ExecToErr(SQL) + require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-write only") assertRollBackedColUnchanged(t, tk) // Mock roll back at model.StateWriteReorg. customizeHookRollbackAtState(hook, tbl, model.StateWriteReorganization) - dom.DDL().(ddl.DDLForTest).SetHook(hook) - _, err = tk.Exec(SQL) - require.Error(t, err) - require.Equal(t, "[ddl:1]MockRollingBackInCallBack-write reorganization", err.Error()) + dom.DDL().SetHook(hook) + err = tk.ExecToErr(SQL) + require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-write reorganization") assertRollBackedColUnchanged(t, tk) } @@ -948,12 +938,9 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - originalHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - var assertResult bool assertHasAlterWriteReorg := func(tbl table.Table) { - // Restore the assert result to false. + // Restore assertResult to false. assertResult = false hook := &ddl.TestDDLCallback{Do: dom} hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -964,7 +951,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { assertResult = true } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) } // Change int to tinyint. @@ -974,7 +961,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { tbl := tk.GetTableByName("test", "t") assertHasAlterWriteReorg(tbl) tk.MustExec("alter table t modify column a tinyint(3)") - require.Equal(t, true, assertResult) + require.True(t, assertResult) // Change tinyint to tinyint // Although display length is decreased, default flen is the same, reorg is not needed. @@ -983,7 +970,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { tbl = tk.GetTableByName("test", "t") assertHasAlterWriteReorg(tbl) tk.MustExec("alter table t modify column a tinyint(1)") - require.Equal(t, false, assertResult) + require.False(t, assertResult) tk.MustExec("drop table if exists t") } @@ -1678,10 +1665,10 @@ func TestChangingColOriginDefaultValue(t *testing.T) { i++ } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) tk.MustExec("alter table t modify column b tinyint NOT NULL") - dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - require.Nil(t, checkErr) + dom.DDL().SetHook(originalHook) + require.NoError(t, checkErr) // Since getReorgInfo will stagnate StateWriteReorganization for a ddl round, so insert should exec 3 times. 
tk.MustQuery("select * from t order by a").Check(testkit.Rows("1 -1", "2 -2", "3 3", "4 4", "5 5")) tk.MustExec("drop table if exists t") @@ -1764,10 +1751,10 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) { i++ } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) tk.MustExec("alter table t modify column c date NOT NULL") - dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - require.Nil(t, checkErr) + dom.DDL().SetHook(originalHook) + require.NoError(t, checkErr) // Since getReorgInfo will stagnate StateWriteReorganization for a ddl round, so insert should exec 3 times. tk.MustQuery("select * from t order by a").Check( testkit.Rows("1 -1 1971-06-09", "2 -2 1971-06-09", "5 5 2021-06-06", "6 6 2021-06-06", "7 7 2021-06-06")) @@ -1824,10 +1811,10 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) { } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) tk.MustExec("alter table t modify column x DATETIME NULL DEFAULT '3771-02-28 13:00:11' AFTER b;") - dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - require.Nil(t, checkErr) + dom.DDL().SetHook(originalHook) + require.NoError(t, checkErr) tk.MustQuery("select * from t order by a").Check(testkit.Rows()) tk.MustExec("drop table if exists t") } @@ -1925,9 +1912,6 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")) }() - originalHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - hook := &ddl.TestDDLCallback{Do: dom} var jobID int64 hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -1938,21 +1922,20 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) { jobID = job.ID } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) // when it panics in write-reorg state, the job will be pulled up as a cancelling job. Since drop-index with // write-reorg can't be cancelled, so it will be converted to running state and try again (dead loop). - _, err := tk.Exec("alter table t drop index b") - require.Error(t, err) - require.Equal(t, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled", err.Error()) - require.Equal(t, true, jobID > 0) + err := tk.ExecToErr("alter table t drop index b") + require.EqualError(t, err, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled") + require.Less(t, int64(0), jobID) // Verification of the history job state. 
var job *model.Job err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) + m := meta.NewMeta(txn) var err1 error - job, err1 = t.GetHistoryDDLJob(jobID) + job, err1 = m.GetHistoryDDLJob(jobID) return errors2.Trace(err1) }) require.NoError(t, err) @@ -2025,7 +2008,7 @@ func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) { jobID = job.ID } } - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) tk1 := testkit.NewTestKit(t, store) tk1.MustExec("use test") diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index d3a857f27b805..d1411688d58a1 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -152,8 +152,8 @@ func (s *stateChangeSuite) TestShowCreateTable() { } d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) - d.(ddl.DDLForTest).SetHook(callback) + defer d.SetHook(originalCallback) + d.SetHook(callback) for _, tc := range testCases { tk.MustExec(tc.sql) s.Require().NoError(checkErr) @@ -200,7 +200,7 @@ func (s *stateChangeSuite) TestDropNotNullColumn() { } } - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) tk.MustExec("alter table t drop column a") s.Require().NoError(checkErr) sqlNum++ @@ -212,7 +212,7 @@ func (s *stateChangeSuite) TestDropNotNullColumn() { sqlNum++ tk.MustExec("alter table t3 drop column d") s.Require().NoError(checkErr) - d.(ddl.DDLForTest).SetHook(originalCallback) + d.SetHook(originalCallback) tk.MustExec("drop table t, t1, t2, t3") } @@ -315,8 +315,8 @@ func (s *stateChangeSuite) test(alterTableSQL string, testInfo *testExecInfo) { } d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) - d.(ddl.DDLForTest).SetHook(callback) + defer d.SetHook(originalCallback) + d.SetHook(callback) s.tk.MustExec(alterTableSQL) s.Require().NoError(testInfo.compileSQL(4)) s.Require().NoError(testInfo.execSQL(4)) @@ -847,10 +847,10 @@ func (s *stateChangeSuite) runTestInSchemaState( } d := s.dom.DDL() originalCallback := d.GetHook() - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) s.tk.MustExec(alterTableSQL) s.Require().NoError(checkErr) - d.(ddl.DDLForTest).SetHook(originalCallback) + d.SetHook(originalCallback) if expectQuery != nil { tk := testkit.NewTestKit(s.T(), s.store) @@ -897,7 +897,7 @@ func (s *stateChangeSuite) TestShowIndex() { d := s.dom.DDL() originalCallback := d.GetHook() - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) alterTableSQL := `alter table t add index c2(c2)` s.tk.MustExec(alterTableSQL) s.Require().NoError(checkErr) @@ -906,7 +906,7 @@ func (s *stateChangeSuite) TestShowIndex() { "t 0 PRIMARY 1 c1 A 0 BTREE YES NO", "t 1 c2 1 c2 A 0 YES BTREE YES NO", )) - d.(ddl.DDLForTest).SetHook(originalCallback) + d.SetHook(originalCallback) s.tk.MustExec(`create table tr( id int, name varchar(50), @@ -1215,7 +1215,7 @@ func (s *stateChangeSuite) prepareTestControlParallelExecSQL() (*testkit.TestKit } d := s.dom.DDL() originalCallback := d.GetHook() - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) tk1 := testkit.NewTestKit(s.T(), s.store) tk1.MustExec("use test_db_state") @@ -1266,7 +1266,7 @@ func (s *stateChangeSuite) testControlParallelExecSQL(preSQL, sql1, sql2 string, );`) tk1, tk2, ch, originalCallback := s.prepareTestControlParallelExecSQL() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback) + defer s.dom.DDL().SetHook(originalCallback) var err1 error var err2 error @@ -1303,7 
+1303,7 @@ func (s *stateChangeSuite) TestParallelUpdateTableReplica() { s.tk.MustExec("alter table t1 set tiflash replica 3 location labels 'a','b';") tk1, tk2, ch, originalCallback := s.prepareTestControlParallelExecSQL() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback) + defer s.dom.DDL().SetHook(originalCallback) t1 := tk1.GetTableByName("test_db_state", "t1") @@ -1345,8 +1345,8 @@ func (s *stateChangeSuite) testParallelExecSQL(sql string) { d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) - d.(ddl.DDLForTest).SetHook(callback) + defer d.SetHook(originalCallback) + d.SetHook(callback) wg.Run(func() { err2 = tk1.ExecToErr(sql) }) @@ -1672,7 +1672,7 @@ func (s *stateChangeSuite) TestCreateExpressionIndex() { var checkErr error d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) + defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} callback.OnJobUpdatedExported = func(job *model.Job) { if checkErr != nil { @@ -1713,7 +1713,7 @@ func (s *stateChangeSuite) TestCreateExpressionIndex() { } } - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) tk.MustExec("alter table t add index idx((b+1))") s.Require().NoError(checkErr) tk.MustExec("admin check table t") @@ -1738,7 +1738,7 @@ func (s *stateChangeSuite) TestCreateUniqueExpressionIndex() { var checkErr error d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) + defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} callback.OnJobUpdatedExported = func(job *model.Job) { if checkErr != nil { @@ -1825,7 +1825,7 @@ func (s *stateChangeSuite) TestCreateUniqueExpressionIndex() { } } - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) tk.MustExec("alter table t add unique index idx((a*b+1))") s.Require().NoError(checkErr) tk.MustExec("admin check table t") @@ -1849,7 +1849,7 @@ func (s *stateChangeSuite) TestDropExpressionIndex() { var checkErr error d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) + defer d.SetHook(originalCallback) callback := &ddl.TestDDLCallback{} callback.OnJobUpdatedExported = func(job *model.Job) { if checkErr != nil { @@ -1885,7 +1885,7 @@ func (s *stateChangeSuite) TestDropExpressionIndex() { } } - d.(ddl.DDLForTest).SetHook(callback) + d.SetHook(callback) tk.MustExec("alter table t drop index idx") s.Require().NoError(checkErr) tk.MustExec("admin check table t") diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index a299b4fd48e46..b3750fd1c3a34 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2825,9 +2825,9 @@ func testPartitionCancelAddIndex(t *testing.T, store kv.Storage, d ddl.DDL, leas defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0])) hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExportedT(tk, store, hook, idxName) originHook := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originHook) + defer d.SetHook(originHook) jobIDExt := wrapJobIDExtCallback(hook) - d.(ddl.DDLForTest).SetHook(jobIDExt) + d.SetHook(jobIDExt) done := make(chan error, 1) go backgroundExecT(store, addIdxSQL, done) diff --git a/ddl/db_test.go b/ddl/db_test.go index f618df92618e9..de5161b431f68 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -24,7 +24,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "testing" "time" @@ -97,7 +96,6 @@ var _ = 
SerialSuites(&testDBSuite6{&testDBSuite{}}) var _ = Suite(&testDBSuite7{&testDBSuite{}}) var _ = Suite(&testDBSuite8{&testDBSuite{}}) var _ = SerialSuites(&testSerialDBSuite{&testDBSuite{}}) -var _ = SerialSuites(&testSerialDBSuite1{&testDBSuite{}}) const defaultBatchSize = 1024 const defaultReorgBatchSize = 256 @@ -169,43 +167,6 @@ type testDBSuite6 struct{ *testDBSuite } type testDBSuite7 struct{ *testDBSuite } type testDBSuite8 struct{ *testDBSuite } type testSerialDBSuite struct{ *testDBSuite } -type testSerialDBSuite1 struct{ *testDBSuite } - -func testAddIndexWithPK(tk *testkit.TestKit) { - tk.MustExec("drop table if exists test_add_index_with_pk") - tk.MustExec("create table test_add_index_with_pk(a int not null, b int not null default '0', primary key(a))") - tk.MustExec("insert into test_add_index_with_pk values(1, 2)") - tk.MustExec("alter table test_add_index_with_pk add index idx (a)") - tk.MustQuery("select a from test_add_index_with_pk").Check(testkit.Rows("1")) - tk.MustExec("insert into test_add_index_with_pk values(2, 2)") - tk.MustExec("alter table test_add_index_with_pk add index idx1 (a, b)") - tk.MustQuery("select * from test_add_index_with_pk").Check(testkit.Rows("1 2", "2 2")) - tk.MustExec("drop table if exists test_add_index_with_pk1") - tk.MustExec("create table test_add_index_with_pk1(a int not null, b int not null default '0', c int, d int, primary key(c))") - tk.MustExec("insert into test_add_index_with_pk1 values(1, 1, 1, 1)") - tk.MustExec("alter table test_add_index_with_pk1 add index idx (c)") - tk.MustExec("insert into test_add_index_with_pk1 values(2, 2, 2, 2)") - tk.MustQuery("select * from test_add_index_with_pk1").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) - tk.MustExec("drop table if exists test_add_index_with_pk2") - tk.MustExec("create table test_add_index_with_pk2(a int not null, b int not null default '0', c int unsigned, d int, primary key(c))") - tk.MustExec("insert into test_add_index_with_pk2 values(1, 1, 1, 1)") - tk.MustExec("alter table test_add_index_with_pk2 add index idx (c)") - tk.MustExec("insert into test_add_index_with_pk2 values(2, 2, 2, 2)") - tk.MustQuery("select * from test_add_index_with_pk2").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a int, b int, c int, primary key(a, b));") - tk.MustExec("insert into t values (1, 2, 3);") - tk.MustExec("create index idx on t (a, b);") -} - -func (s *testDBSuite7) TestAddIndexWithPK(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use " + s.schemaName) - - testAddIndexWithPK(tk) - tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn - testAddIndexWithPK(tk) -} func (s *testDBSuite5) TestAddIndexWithDupIndex(c *C) { tk := testkit.NewTestKit(c, s.store) @@ -325,32 +286,6 @@ func backgroundExecT(s kv.Storage, sql string, done chan error) { done <- errors.Trace(err) } -// TestAddPrimaryKeyRollback1 is used to test scenarios that will roll back when a duplicate primary key is encountered. -func (s *testDBSuite8) TestAddPrimaryKeyRollback1(c *C) { - hasNullValsInKey := false - idxName := "PRIMARY" - addIdxSQL := "alter table t1 add primary key c3_index (c3);" - errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'PRIMARY'" - testAddIndexRollback(c, s.store, s.lease, idxName, addIdxSQL, errMsg, hasNullValsInKey) -} - -// TestAddPrimaryKeyRollback2 is used to test scenarios that will roll back when a null primary key is encountered. 
-func (s *testDBSuite8) TestAddPrimaryKeyRollback2(c *C) { - hasNullValsInKey := true - idxName := "PRIMARY" - addIdxSQL := "alter table t1 add primary key c3_index (c3);" - errMsg := "[ddl:1138]Invalid use of NULL value" - testAddIndexRollback(c, s.store, s.lease, idxName, addIdxSQL, errMsg, hasNullValsInKey) -} - -func (s *testDBSuite2) TestAddUniqueIndexRollback(c *C) { - hasNullValsInKey := false - idxName := "c3_index" - addIdxSQL := "create unique index c3_index on t1 (c3)" - errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'c3_index'" - testAddIndexRollback(c, s.store, s.lease, idxName, addIdxSQL, errMsg, hasNullValsInKey) -} - func (s *testSerialDBSuite) TestWriteReorgForColumnTypeChangeOnAmendTxn(c *C) { tk2 := testkit.NewTestKit(c, s.store) tk2.MustExec("use test_db") @@ -361,7 +296,7 @@ func (s *testSerialDBSuite) TestWriteReorgForColumnTypeChangeOnAmendTxn(c *C) { d := s.dom.DDL() originalHook := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalHook) + defer d.SetHook(originalHook) testInsertOnModifyColumn := func(sql string, startColState, commitColState model.SchemaState, retStrs []string, retErr error) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test_db") @@ -395,7 +330,7 @@ func (s *testSerialDBSuite) TestWriteReorgForColumnTypeChangeOnAmendTxn(c *C) { } times++ } - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) tk.MustExec(sql) if retErr == nil { @@ -475,7 +410,7 @@ func (s *testSerialDBSuite) TestAddExpressionIndexRollback(c *C) { } } } - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[ddl:8202]Cannot decode index value, because [types:1690]DOUBLE value is out of range in 'pow(160, 160)'") c.Assert(checkErr, IsNil) @@ -516,303 +451,6 @@ func batchInsertLegacy(tk *testkit.TestKit, tbl string, start, end int) { tk.MustExec(dml) } -func testAddIndexRollback(c *C, store kv.Storage, lease time.Duration, idxName, addIdxSQL, errMsg string, hasNullValsInKey bool) { - tk := testkit.NewTestKit(c, store) - tk.MustExec("use test_db") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))") - // defaultBatchSize is equal to ddl.defaultBatchSize - base := defaultBatchSize * 2 - count := base - // add some rows - batchInsertLegacy(tk, "t1", 0, count) - // add some null rows - if hasNullValsInKey { - for i := count - 10; i < count; i++ { - tk.MustExec("insert into t1 values (?, ?, null)", i+10, i) - } - } else { - // add some duplicate rows - for i := count - 10; i < count; i++ { - tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i) - } - } - - done := make(chan error, 1) - go backgroundExec(store, addIdxSQL, done) - - times := 0 - ticker := time.NewTicker(lease / 2) - defer ticker.Stop() -LOOP: - for { - select { - case err := <-done: - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, errMsg, Commentf("err:%v", err)) - break LOOP - case <-ticker.C: - if times >= 10 { - break - } - step := 5 - // delete some rows, and add some data - for i := count; i < count+step; i++ { - n := rand.Intn(count) - // (2048, 2038, 2038) and (2038, 2038, 2038) - // Don't delete rows where c1 is 2048 or 2038, otherwise, the entry value in duplicated error message would change. 
- if n == defaultBatchSize*2-10 || n == defaultBatchSize*2 { - continue - } - tk.MustExec("delete from t1 where c1 = ?", n) - tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i) - } - count += step - times++ - } - } - - ctx := tk.Se.(sessionctx.Context) - t := testGetTableByName(c, ctx, "test_db", "t1") - for _, tidx := range t.Indices() { - c.Assert(strings.EqualFold(tidx.Meta().Name.L, idxName), IsFalse) - } - - // delete duplicated/null rows, then add index - for i := base - 10; i < base; i++ { - tk.MustExec("delete from t1 where c1 = ?", i+10) - } - sessionExec(c, store, addIdxSQL) - tk.MustExec("drop table t1") -} - -func (s *testDBSuite8) TestCancelAddPrimaryKey(c *C) { - idxName := "primary" - addIdxSQL := "alter table t1 add primary key idx_c2 (c2);" - testCancelAddIndex(c, s.store, s.dom.DDL(), s.lease, idxName, addIdxSQL, "", s.dom) - - // Check the column's flag when the "add primary key" failed. - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test_db") - ctx := tk.Se.(sessionctx.Context) - c.Assert(ctx.NewTxn(context.Background()), IsNil) - t := testGetTableByName(c, ctx, "test_db", "t1") - col1Flag := t.Cols()[1].Flag - c.Assert(!mysql.HasNotNullFlag(col1Flag) && !mysql.HasPreventNullInsertFlag(col1Flag) && mysql.HasUnsignedFlag(col1Flag), IsTrue) - tk.MustExec("drop table t1") -} - -func (s *testDBSuite7) TestCancelAddIndex(c *C) { - idxName := "c3_index" - addIdxSQL := "create unique index c3_index on t1 (c3)" - testCancelAddIndex(c, s.store, s.dom.DDL(), s.lease, idxName, addIdxSQL, "", s.dom) - - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test_db") - tk.MustExec("drop table t1") -} - -func backgroundExecOnJobUpdatedExported(c *C, store kv.Storage, ctx sessionctx.Context, hook *ddl.TestDDLCallback, idxName string) ( - func(*model.Job), *model.IndexInfo, error) { - var checkErr error - first := true - c3IdxInfo := &model.IndexInfo{} - hook.OnJobUpdatedExported = func(job *model.Job) { - addIndexNotFirstReorg := (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) && - job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 - // If the action is adding index and the state is writing reorganization, it want to test the case of cancelling the job when backfilling indexes. - // When the job satisfies this case of addIndexNotFirstReorg, the worker will start to backfill indexes. - if !addIndexNotFirstReorg { - // Get the index's meta. - if c3IdxInfo.ID != 0 { - return - } - t := testGetTableByName(c, ctx, "test_db", "t1") - for _, index := range t.Indices() { - if !tables.IsIndexWritable(index) { - continue - } - if index.Meta().Name.L == idxName { - *c3IdxInfo = *index.Meta() - } - } - return - } - // The job satisfies the case of addIndexNotFirst for the first time, the worker hasn't finished a batch of backfill indexes. - if first { - first = false - return - } - if checkErr != nil { - return - } - hookCtx := mock.NewContext() - hookCtx.Store = store - err := hookCtx.NewTxn(context.Background()) - if err != nil { - checkErr = errors.Trace(err) - return - } - jobIDs := []int64{job.ID} - txn, err := hookCtx.Txn(true) - if err != nil { - checkErr = errors.Trace(err) - return - } - errs, err := admin.CancelJobs(txn, jobIDs) - if err != nil { - checkErr = errors.Trace(err) - return - } - // It only tests cancel one DDL job. 
- if errs[0] != nil {
- checkErr = errors.Trace(errs[0])
- return
- }
- txn, err = hookCtx.Txn(true)
- if err != nil {
- checkErr = errors.Trace(err)
- return
- }
- err = txn.Commit(context.Background())
- if err != nil {
- checkErr = errors.Trace(err)
- }
- }
- return hook.OnJobUpdatedExported, c3IdxInfo, checkErr
-}
-
-func testCancelAddIndex(c *C, store kv.Storage, d ddl.DDL, lease time.Duration, idxName, addIdxSQL, sqlModeSQL string, dom *domain.Domain) {
- tk := testkit.NewTestKit(c, store)
- tk.MustExec("use test_db")
- tk.MustExec("drop table if exists t1")
- tk.MustExec("create table t1 (c1 int, c2 int unsigned, c3 int, unique key(c1))")
- // defaultBatchSize is equal to ddl.defaultBatchSize
- count := defaultBatchSize * 32
- start := 0
- // add some rows
- if len(sqlModeSQL) != 0 {
- // Insert some null values.
- tk.MustExec(sqlModeSQL)
- tk.MustExec("insert into t1 set c1 = ?", 0)
- tk.MustExec("insert into t1 set c2 = ?", 1)
- tk.MustExec("insert into t1 set c3 = ?", 2)
- start = 3
- }
- for i := start; i < count; i += defaultBatchSize {
- batchInsertLegacy(tk, "t1", i, i+defaultBatchSize)
- }
-
- var c3IdxInfo *model.IndexInfo
- hook := &ddl.TestDDLCallback{Do: dom}
- originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
- // Lower the batch size to slow down the add-index reorganization, so the hook has a chance to cancel this DDL job.
- tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32")
- defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0]))
- // Let hook.OnJobUpdatedExported have a chance to cancel the job.
- // hook.OnJobUpdatedExported is called when the job is updated; runReorgJob waits for ddl.ReorgWaitTimeout and then returns to ddl.runDDLJob.
- // After that, ddl calls d.hook.OnJobUpdated(job), so we can cancel the job in this test case.
- var checkErr error
- ctx := tk.Se.(sessionctx.Context)
- hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, store, ctx, hook, idxName)
- originalHook := d.GetHook()
- jobIDExt := wrapJobIDExtCallback(hook)
- d.(ddl.DDLForTest).SetHook(jobIDExt)
- done := make(chan error, 1)
- go backgroundExec(store, addIdxSQL, done)
-
- times := 0
- ticker := time.NewTicker(lease / 2)
- defer ticker.Stop()
-LOOP:
- for {
- select {
- case err := <-done:
- c.Assert(checkErr, IsNil)
- c.Assert(err, NotNil)
- c.Assert(err.Error(), Equals, "[ddl:8214]Cancelled DDL job")
- break LOOP
- case <-ticker.C:
- if times >= 10 {
- break
- }
- step := 5
- // Delete some rows, and add some data.
- for i := count; i < count+step; i++ {
- n := rand.Intn(count)
- tk.MustExec("delete from t1 where c1 = ?", n)
- tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i)
- }
- count += step
- times++
- }
- }
- checkDelRangeAdded(tk, jobIDExt.jobID, c3IdxInfo.ID)
- d.(ddl.DDLForTest).SetHook(originalHook)
-}
-
-// TestCancelAddIndex1 tests canceling a DDL job before the add-index worker has started.
-func (s *testDBSuite4) TestCancelAddIndex1(c *C) { - tk := testkit.NewTestKit(c, s.store) - s.mustExec(tk, c, "use test_db") - s.mustExec(tk, c, "drop table if exists t") - s.mustExec(tk, c, "create table t(c1 int, c2 int)") - defer s.mustExec(tk, c, "drop table t;") - - for i := 0; i < 50; i++ { - s.mustExec(tk, c, "insert into t values (?, ?)", i, i) - } - - var checkErr error - hook := &ddl.TestDDLCallback{Do: s.dom} - hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 { - jobIDs := []int64{job.ID} - hookCtx := mock.NewContext() - hookCtx.Store = s.store - err := hookCtx.NewTxn(context.Background()) - if err != nil { - checkErr = errors.Trace(err) - return - } - txn, err := hookCtx.Txn(true) - if err != nil { - checkErr = errors.Trace(err) - return - } - errs, err := admin.CancelJobs(txn, jobIDs) - if err != nil { - checkErr = errors.Trace(err) - return - } - - if errs[0] != nil { - checkErr = errors.Trace(errs[0]) - return - } - - checkErr = txn.Commit(context.Background()) - } - } - originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) - rs, err := tk.Exec("alter table t add index idx_c2(c2)") - if rs != nil { - rs.Close() - } - c.Assert(checkErr, IsNil) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "[ddl:8214]Cancelled DDL job") - - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - t := s.testGetTable(c, "t") - for _, idx := range t.Indices() { - c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse) - } - s.mustExec(tk, c, "alter table t add index idx_c2(c2)") - s.mustExec(tk, c, "alter table t drop index idx_c2") -} - // TestCancelDropIndex tests cancel ddl job which type is drop primary key. 
func (s *testDBSuite4) TestCancelDropPrimaryKey(c *C) { idxName := "primary" @@ -887,7 +525,7 @@ func testCancelDropIndex(c *C, store kv.Storage, d ddl.DDL, idxName, addIdxSQL, } } originalHook := d.GetHook() - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) ctx := tk.Se.(sessionctx.Context) for i := range testCases { testCase = &testCases[i] @@ -914,7 +552,7 @@ func testCancelDropIndex(c *C, store kv.Storage, d ddl.DDL, idxName, addIdxSQL, c.Assert(indexInfo, IsNil) } } - d.(ddl.DDLForTest).SetHook(originalHook) + d.SetHook(originalHook) tk.MustExec(addIdxSQL) tk.MustExec(dropIdxSQL) } @@ -957,12 +595,12 @@ func (s *testDBSuite5) TestCancelTruncateTable(c *C) { } } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) _, err := tk.Exec("truncate table t") c.Assert(checkErr, IsNil) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:8214]Cancelled DDL job") - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) } func (s *testDBSuite5) TestParallelDropSchemaAndDropTable(c *C) { @@ -990,9 +628,9 @@ func (s *testDBSuite5) TestParallelDropSchemaAndDropTable(c *C) { } } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) s.mustExec(tk, c, "drop database test_drop_schema_table") - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) wg.Wait() c.Assert(done, IsTrue) c.Assert(checkErr, NotNil) @@ -1062,7 +700,7 @@ func (s *testDBSuite1) TestCancelRenameIndex(c *C) { } } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) rs, err := tk.Exec("alter table t rename index idx_c2 to idx_c3") if rs != nil { rs.Close() @@ -1070,7 +708,7 @@ func (s *testDBSuite1) TestCancelRenameIndex(c *C) { c.Assert(checkErr, IsNil) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:8214]Cancelled DDL job") - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) t := s.testGetTable(c, "t") for _, idx := range t.Indices() { c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c3"), IsFalse) @@ -1135,8 +773,8 @@ func (s *testDBSuite2) TestCancelDropTableAndSchema(c *C) { } } originHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + defer s.dom.DDL().SetHook(originHook) + s.dom.DDL().SetHook(hook) var err error sql := "" for i := range testCases { @@ -1171,65 +809,6 @@ func (s *testDBSuite2) TestCancelDropTableAndSchema(c *C) { } } -func (s *testDBSuite3) TestAddAnonymousIndex(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use " + s.schemaName) - s.mustExec(tk, c, "create table t_anonymous_index (c1 int, c2 int, C3 int)") - s.mustExec(tk, c, "alter table t_anonymous_index add index (c1, c2)") - // for dropping empty index - _, err := tk.Exec("alter table t_anonymous_index drop index") - c.Assert(err, NotNil) - // The index name is c1 when adding index (c1, c2). 
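For reference, the rule that comment relies on: an anonymous index is named after its first column, and name collisions append `_2`, `_3`, and so on. A sketch against a fresh table, assuming a `tk` testkit session like the one in the surrounding test:

```go
tk.MustExec("create table t_names (c1 int, c2 int, C3 int)")
tk.MustExec("alter table t_names add index (c1, c2)")  // anonymous, named "c1"
tk.MustExec("alter table t_names add index (c1)")      // "c1" is taken, so "c1_2"
tk.MustExec("alter table t_names add index c1_3 (c1)") // explicit name (MySQL would warn here)
```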
- s.mustExec(tk, c, "alter table t_anonymous_index drop index c1") - t := s.testGetTable(c, "t_anonymous_index") - c.Assert(t.Indices(), HasLen, 0) - // for adding some indices that the first column name is c1 - s.mustExec(tk, c, "alter table t_anonymous_index add index (c1)") - _, err = tk.Exec("alter table t_anonymous_index add index c1 (c2)") - c.Assert(err, NotNil) - t = s.testGetTable(c, "t_anonymous_index") - c.Assert(t.Indices(), HasLen, 1) - idx := t.Indices()[0].Meta().Name.L - c.Assert(idx, Equals, "c1") - // The MySQL will be a warning. - s.mustExec(tk, c, "alter table t_anonymous_index add index c1_3 (c1)") - s.mustExec(tk, c, "alter table t_anonymous_index add index (c1, c2, C3)") - // The MySQL will be a warning. - s.mustExec(tk, c, "alter table t_anonymous_index add index (c1)") - t = s.testGetTable(c, "t_anonymous_index") - c.Assert(t.Indices(), HasLen, 4) - s.mustExec(tk, c, "alter table t_anonymous_index drop index c1") - s.mustExec(tk, c, "alter table t_anonymous_index drop index c1_2") - s.mustExec(tk, c, "alter table t_anonymous_index drop index c1_3") - s.mustExec(tk, c, "alter table t_anonymous_index drop index c1_4") - // for case insensitive - s.mustExec(tk, c, "alter table t_anonymous_index add index (C3)") - s.mustExec(tk, c, "alter table t_anonymous_index drop index c3") - s.mustExec(tk, c, "alter table t_anonymous_index add index c3 (C3)") - s.mustExec(tk, c, "alter table t_anonymous_index drop index C3") - // for anonymous index with column name `primary` - s.mustExec(tk, c, "create table t_primary (`primary` int, b int, key (`primary`))") - t = s.testGetTable(c, "t_primary") - c.Assert(t.Indices()[0].Meta().Name.String(), Equals, "primary_2") - s.mustExec(tk, c, "alter table t_primary add index (`primary`);") - t = s.testGetTable(c, "t_primary") - c.Assert(t.Indices()[0].Meta().Name.String(), Equals, "primary_2") - c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3") - s.mustExec(tk, c, "alter table t_primary add primary key(b);") - t = s.testGetTable(c, "t_primary") - c.Assert(t.Indices()[0].Meta().Name.String(), Equals, "primary_2") - c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3") - c.Assert(t.Indices()[2].Meta().Name.L, Equals, "primary") - s.mustExec(tk, c, "create table t_primary_2 (`primary` int, key primary_2 (`primary`), key (`primary`))") - t = s.testGetTable(c, "t_primary_2") - c.Assert(t.Indices()[0].Meta().Name.String(), Equals, "primary_2") - c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3") - s.mustExec(tk, c, "create table t_primary_3 (`primary_2` int, key(`primary_2`), `primary` int, key(`primary`));") - t = s.testGetTable(c, "t_primary_3") - c.Assert(t.Indices()[0].Meta().Name.String(), Equals, "primary_2") - c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3") -} - func (s *testDBSuite4) TestAlterLock(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use " + s.schemaName) @@ -1278,393 +857,6 @@ func (s *testDBSuite6) TestAddMultiColumnsIndexClusterIndex(c *C) { tk.MustExec("admin check table t;") } -func (s *testDBSuite6) TestAddPrimaryKey1(c *C) { - testAddIndex(c, s.store, s.lease, testPlain, - "create table test_add_index (c1 bigint, c2 bigint, c3 bigint, unique key(c1))", "primary") -} - -func (s *testDBSuite2) TestAddPrimaryKey2(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) - partition by range (c3) ( - partition p0 values less than (3440), - partition p1 values less than 
(61440), - partition p2 values less than (122880), - partition p3 values less than (204800), - partition p4 values less than maxvalue)`, "primary") -} - -func (s *testDBSuite3) TestAddPrimaryKey3(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) - partition by hash (c3) partitions 4;`, "primary") -} - -func (s *testDBSuite4) TestAddPrimaryKey4(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) - partition by range columns (c3) ( - partition p0 values less than (3440), - partition p1 values less than (61440), - partition p2 values less than (122880), - partition p3 values less than (204800), - partition p4 values less than maxvalue)`, "primary") -} - -func (s *testDBSuite6) TestAddIndex1(c *C) { - testAddIndex(c, s.store, s.lease, testPlain, - "create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))", "") -} - -func (s *testSerialDBSuite1) TestAddIndex1WithShardRowID(c *C) { - testAddIndex(c, s.store, s.lease, testPartition|testShardRowID, - "create table test_add_index (c1 bigint, c2 bigint, c3 bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;", "") -} - -func (s *testDBSuite2) TestAddIndex2(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) - partition by range (c1) ( - partition p0 values less than (3440), - partition p1 values less than (61440), - partition p2 values less than (122880), - partition p3 values less than (204800), - partition p4 values less than maxvalue)`, "") -} - -func (s *testSerialDBSuite) TestAddIndex2WithShardRowID(c *C) { - testAddIndex(c, s.store, s.lease, testPartition|testShardRowID, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) - SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 - partition by range (c1) ( - partition p0 values less than (3440), - partition p1 values less than (61440), - partition p2 values less than (122880), - partition p3 values less than (204800), - partition p4 values less than maxvalue)`, "") -} - -func (s *testDBSuite3) TestAddIndex3(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) - partition by hash (c1) partitions 4;`, "") -} - -func (s *testSerialDBSuite1) TestAddIndex3WithShardRowID(c *C) { - testAddIndex(c, s.store, s.lease, testPartition|testShardRowID, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) - SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 - partition by hash (c1) partitions 4;`, "") -} - -func (s *testDBSuite8) TestAddIndex4(c *C) { - testAddIndex(c, s.store, s.lease, testPartition, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) - partition by range columns (c1) ( - partition p0 values less than (3440), - partition p1 values less than (61440), - partition p2 values less than (122880), - partition p3 values less than (204800), - partition p4 values less than maxvalue)`, "") -} - -func (s *testSerialDBSuite) TestAddIndex4WithShardRowID(c *C) { - testAddIndex(c, s.store, s.lease, testPartition|testShardRowID, - `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) - SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 - partition by range columns (c1) ( - partition p0 values less than (3440), - partition p1 values less than (61440), - partition p2 values less than (122880), - partition p3 values 
less than (204800),
- partition p4 values less than maxvalue)`, "")
-}
-
-func (s *testDBSuite5) TestAddIndex5(c *C) {
- testAddIndex(c, s.store, s.lease, testClusteredIndex,
- `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c2, c3))`, "")
-}
-
-type testAddIndexType uint8
-
-const (
- testPlain testAddIndexType = 1
- testPartition testAddIndexType = 1 << 1
- testClusteredIndex testAddIndexType = 1 << 2
- testShardRowID testAddIndexType = 1 << 3
-)
-
-func testAddIndex(c *C, store kv.Storage, lease time.Duration, tp testAddIndexType, createTableSQL, idxTp string) {
- tk := testkit.NewTestKit(c, store)
- tk.MustExec("use test_db")
- isTestPartition := (testPartition & tp) > 0
- isTestShardRowID := (testShardRowID & tp) > 0
- if isTestShardRowID {
- atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
- tk.MustExec("set global tidb_scatter_region = 1")
- defer func() {
- atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0)
- tk.MustExec("set global tidb_scatter_region = 0")
- }()
- }
- if isTestPartition {
- tk.MustExec("set @@session.tidb_enable_table_partition = '1';")
- } else if (testClusteredIndex & tp) > 0 {
- tk.Se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
- }
- tk.MustExec("drop table if exists test_add_index")
- tk.MustExec(createTableSQL)
-
- done := make(chan error, 1)
- start := -10
- num := defaultBatchSize
- // First add some rows.
- batchInsertLegacy(tk, "test_add_index", start, num)
-
- // Add some discrete rows.
- maxBatch := 20
- batchCnt := 100
- otherKeys := make([]int, 0, batchCnt*maxBatch)
- // Make sure there are no duplicate keys.
- base := defaultBatchSize * 20
- for i := 1; i < batchCnt; i++ {
- if isTestShardRowID {
- base = i % 4 << 61
- }
- n := base + i*defaultBatchSize + i
- for j := 0; j < rand.Intn(maxBatch); j++ {
- n += j
- sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
- tk.MustExec(sql)
- otherKeys = append(otherKeys, n)
- }
- }
- // Insert a value close to math.MaxInt64 in the middle of the data.
- v := math.MaxInt64 - defaultBatchSize/2
- tk.MustExec(fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v))
- otherKeys = append(otherKeys, v)
-
- addIdxSQL := fmt.Sprintf("alter table test_add_index add %s key c3_index(c3)", idxTp)
- testddlutil.SessionExecInGoroutine(store, addIdxSQL, done)
-
- deletedKeys := make(map[int]struct{})
-
- ticker := time.NewTicker(lease / 2)
- defer ticker.Stop()
-LOOP:
- for {
- select {
- case err := <-done:
- if err == nil {
- break LOOP
- }
- c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
- case <-ticker.C:
- // When the server performance is particularly poor,
- // the add-index operation may never complete,
- // so limit the number of rows inserted here.
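One Go subtlety in the loop below: a bare `break` inside a `select` case exits the `select`, not the `for` loop, so once the cap is reached the ticker branch simply stops mutating while the loop keeps waiting on `done`. A minimal self-contained illustration (names are illustrative):

```go
// waitOrMutate keeps waiting on done; once num exceeds bound, ticks become
// no-ops because break only leaves the select statement.
func waitOrMutate(done <-chan struct{}, tick <-chan time.Time, bound int, mutate func() int) {
	num := 0
	for {
		select {
		case <-done:
			return
		case <-tick:
			if num > bound {
				break // exits the select only; the for loop continues
			}
			num += mutate()
		}
	}
}
```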
- if num > defaultBatchSize*10 { - break - } - step := 5 - // delete some rows, and add some data - for i := num; i < num+step; i++ { - n := rand.Intn(num) - deletedKeys[n] = struct{}{} - sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n) - tk.MustExec(sql) - sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i) - tk.MustExec(sql) - } - num += step - } - } - - if isTestShardRowID { - re := tk.MustQuery("show table test_add_index regions;") - rows := re.Rows() - c.Assert(len(rows), GreaterEqual, 16) - tk.MustExec("admin check table test_add_index") - return - } - - // get exists keys - keys := make([]int, 0, num) - for i := start; i < num; i++ { - if _, ok := deletedKeys[i]; ok { - continue - } - keys = append(keys, i) - } - keys = append(keys, otherKeys...) - - // test index key - expectedRows := make([][]interface{}, 0, len(keys)) - for _, key := range keys { - expectedRows = append(expectedRows, []interface{}{key}) - } - rows := tk.MustQuery(fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start)).Rows() - matchRows(c, rows, expectedRows) - - tk.MustExec("admin check table test_add_index") - if isTestPartition { - return - } - - // TODO: Support explain in future. - // rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100") - - // ay := dumpRows(c, rows) - // c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue) - - // get all row handles - ctx := tk.Se.(sessionctx.Context) - c.Assert(ctx.NewTxn(context.Background()), IsNil) - t := testGetTableByName(c, ctx, "test_db", "test_add_index") - handles := kv.NewHandleMap() - err := tables.IterRecords(t, ctx, t.Cols(), - func(h kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { - handles.Set(h, struct{}{}) - return true, nil - }) - c.Assert(err, IsNil) - - // check in index - var nidx table.Index - idxName := "c3_index" - if len(idxTp) != 0 { - idxName = "primary" - } - for _, tidx := range t.Indices() { - if tidx.Meta().Name.L == idxName { - nidx = tidx - break - } - } - // Make sure there is index with name c3_index. 
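The scan above is the usual way these tests locate an index by name; factored out, it is just a walk over `Indices()` comparing the lower-cased name (helper name hypothetical):

```go
// findIndex returns the index whose lower-cased name matches, or nil if
// the table has no such index.
func findIndex(tbl table.Table, name string) table.Index {
	for _, idx := range tbl.Indices() {
		if idx.Meta().Name.L == name {
			return idx
		}
	}
	return nil
}
```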
- c.Assert(nidx, NotNil) - c.Assert(nidx.Meta().ID, Greater, int64(0)) - txn, err := ctx.Txn(true) - c.Assert(err, IsNil) - err = txn.Rollback() - c.Assert(err, IsNil) - - c.Assert(ctx.NewTxn(context.Background()), IsNil) - - tk.MustExec("admin check table test_add_index") - tk.MustExec("drop table test_add_index") -} - -func (s *testDBSuite1) TestAddIndexWithSplitTable(c *C) { - createSQL := "CREATE TABLE test_add_index(a bigint PRIMARY KEY AUTO_RANDOM(4), b varchar(255), c bigint)" - stSQL := fmt.Sprintf("SPLIT TABLE test_add_index BETWEEN (%d) AND (%d) REGIONS 16;", math.MinInt64, math.MaxInt64) - testAddIndexWithSplitTable(c, s.store, s.lease, createSQL, stSQL) -} - -func (s *testSerialDBSuite) TestAddIndexWithShardRowID(c *C) { - createSQL := "create table test_add_index(a bigint, b bigint, c bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;" - testAddIndexWithSplitTable(c, s.store, s.lease, createSQL, "") -} - -func testAddIndexWithSplitTable(c *C, store kv.Storage, lease time.Duration, createSQL, splitTableSQL string) { - tk := testkit.NewTestKit(c, store) - tk.MustExec("use test_db") - tk.MustExec("drop table if exists test_add_index") - hasAutoRadomField := len(splitTableSQL) > 0 - if !hasAutoRadomField { - atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1) - tk.MustExec("set global tidb_scatter_region = 1") - defer func() { - atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0) - tk.MustExec("set global tidb_scatter_region = 0") - }() - } - tk.MustExec(createSQL) - - batchInsertRows := func(tk *testkit.TestKit, needVal bool, tbl string, start, end int) error { - dml := fmt.Sprintf("insert into %s values", tbl) - for i := start; i < end; i++ { - if needVal { - dml += fmt.Sprintf("(%d, %d, %d)", i, i, i) - } else { - dml += "()" - } - if i != end-1 { - dml += "," - } - } - _, err := tk.Exec(dml) - return err - } - - done := make(chan error, 1) - start := -20 - num := defaultBatchSize - // Add some discrete rows. - goCnt := 10 - errCh := make(chan error, goCnt) - for i := 0; i < goCnt; i++ { - base := (i % 8) << 60 - go func(b int, eCh chan error) { - tk1 := testkit.NewTestKit(c, store) - tk1.MustExec("use test_db") - eCh <- batchInsertRows(tk1, !hasAutoRadomField, "test_add_index", base+start, base+num) - }(base, errCh) - } - for i := 0; i < goCnt; i++ { - e := <-errCh - c.Assert(e, IsNil) - } - - if hasAutoRadomField { - tk.MustQuery(splitTableSQL).Check(testkit.Rows("15 1")) - } - tk.MustQuery("select @@session.tidb_wait_split_region_finish;").Check(testkit.Rows("1")) - re := tk.MustQuery("show table test_add_index regions;") - rows := re.Rows() - c.Assert(len(rows), Equals, 16) - addIdxSQL := "alter table test_add_index add index idx(a)" - testddlutil.SessionExecInGoroutine(store, addIdxSQL, done) - - ticker := time.NewTicker(lease / 5) - defer ticker.Stop() - num = 0 -LOOP: - for { - select { - case err := <-done: - if err == nil { - break LOOP - } - c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) - case <-ticker.C: - // When the server performance is particularly poor, - // the adding index operation can not be completed. - // So here is a limit to the number of rows inserted. 
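Earlier in this helper, the initial load fans out one goroutine per shard base and funnels failures through a buffered channel; that shape generalizes to a small utility, sketched here with illustrative names:

```go
// insertConcurrently runs goCnt workers, one per shard base, and asserts
// that every batch insert succeeded; the (i % 8) << 60 spacing mirrors the
// shard bases used above.
func insertConcurrently(t *testing.T, goCnt int, insert func(base int) error) {
	errCh := make(chan error, goCnt)
	for i := 0; i < goCnt; i++ {
		go func(base int) { errCh <- insert(base) }((i % 8) << 60)
	}
	for i := 0; i < goCnt; i++ {
		require.NoError(t, <-errCh)
	}
}
```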
- if num >= 1000 { - break - } - step := 20 - // delete, insert and update some data - for i := num; i < num+step; i++ { - sql := fmt.Sprintf("delete from test_add_index where a = %d", i+1) - tk.MustExec(sql) - if hasAutoRadomField { - sql = "insert into test_add_index values ()" - } else { - sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i) - } - tk.MustExec(sql) - sql = fmt.Sprintf("update test_add_index set b = %d", i*10) - tk.MustExec(sql) - } - num += step - } - } - - tk.MustExec("admin check table test_add_index") -} - // TestCancelAddTableAndDropTablePartition tests cancel ddl job which type is add/drop table partition. func (s *testDBSuite1) TestCancelAddTableAndDropTablePartition(c *C) { tk := testkit.NewTestKit(c, s.store) @@ -1726,7 +918,7 @@ func (s *testDBSuite1) TestCancelAddTableAndDropTablePartition(c *C) { } } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) var err error sql := "" @@ -1761,7 +953,7 @@ func (s *testDBSuite1) TestCancelAddTableAndDropTablePartition(c *C) { c.Assert(err, NotNil) } } - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) } func (s *testDBSuite1) TestDropPrimaryKey(c *C) { @@ -1795,7 +987,7 @@ func testDropIndex(c *C, store kv.Storage, lease time.Duration, createSQL, dropI indexID := testGetIndexID(c, ctx, "test_db", "test_drop_index", idxName) jobIDExt, reset := setupJobIDExtCallback(ctx) defer reset() - testddlutil.SessionExecInGoroutine(store, dropIdxSQL, done) + testddlutil.SessionExecInGoroutine(store, "test_db", dropIdxSQL, done) ticker := time.NewTicker(lease / 2) defer ticker.Stop() @@ -1878,7 +1070,7 @@ func (s *testDBSuite3) TestCancelDropColumn(c *C) { } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) var err1 error for i := range testCases { var c3IdxID int64 @@ -1924,7 +1116,7 @@ func (s *testDBSuite3) TestCancelDropColumn(c *C) { } } } - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) s.mustExec(tk, c, "alter table test_drop_column add column c3 int") s.mustExec(tk, c, "alter table test_drop_column drop column c3") } @@ -1981,7 +1173,7 @@ func (s *testDBSuite3) TestCancelDropColumns(c *C) { } originalHook := s.dom.DDL().GetHook() - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) var err1 error for i := range testCases { var c3IdxID int64 @@ -2025,7 +1217,7 @@ func (s *testDBSuite3) TestCancelDropColumns(c *C) { } } } - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) s.mustExec(tk, c, "alter table test_drop_column add column c3 int, add column c4 int") s.mustExec(tk, c, "alter table test_drop_column drop column c3, drop column c4") } @@ -2376,17 +1568,6 @@ func (s *testDBSuite6) TestColumn(c *C) { tk.MustExec("drop table t2") } -func sessionExec(c *C, s kv.Storage, sql string) { - se, err := session.CreateSession4Test(s) - c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test_db") - c.Assert(err, IsNil) - rs, err := se.Execute(context.Background(), sql) - c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) - c.Assert(rs, IsNil) - se.Close() -} - func (s *testDBSuite) testAddColumn(tk *testkit.TestKit, c *C) { done := make(chan error, 1) @@ -2394,7 +1575,7 @@ func (s *testDBSuite) testAddColumn(tk *testkit.TestKit, c *C) { // add some rows batchInsertLegacy(tk, "t2", 0, num) - 
testddlutil.SessionExecInGoroutine(s.store, "alter table t2 add column c4 int default -1", done) + testddlutil.SessionExecInGoroutine(s.store, "test_db", "alter table t2 add column c4 int default -1", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() @@ -2540,7 +1721,7 @@ func (s *testDBSuite) testDropColumn(tk *testkit.TestKit, c *C) { } // get c4 column id - testddlutil.SessionExecInGoroutine(s.store, "alter table t2 drop column c4", done) + testddlutil.SessionExecInGoroutine(s.store, "test_db", "alter table t2 drop column c4", done) ticker := time.NewTicker(s.lease / 2) defer ticker.Stop() @@ -2839,8 +2020,8 @@ func (s *testSerialDBSuite) TestCreateTableWithLike2(c *C) { } } originalHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + defer s.dom.DDL().SetHook(originalHook) + s.dom.DDL().SetHook(hook) // create table when refer table add column tk.MustExec("alter table t1 add column d int") @@ -2886,7 +2067,7 @@ func (s *testSerialDBSuite) TestCreateTableWithLike2(c *C) { c.Assert(err, IsNil) }() - s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) + s.dom.DDL().SetHook(originalHook) tk.MustExec("drop table if exists t1,t2;") tk.MustExec("create table t1 (a int) partition by hash(a) partitions 2;") tk.MustExec("alter table t1 set tiflash replica 3 location labels 'a','b';") @@ -3121,8 +2302,8 @@ func (s *testSerialDBSuite) TestRepairTable(c *C) { } } originalHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook) - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + defer s.dom.DDL().SetHook(originalHook) + s.dom.DDL().SetHook(hook) // Exec the repair statement to override the tableInfo. tk.MustExec("admin repair table origin CREATE TABLE origin (a int primary key nonclustered auto_increment, b varchar(5), c int);") @@ -3839,7 +3020,7 @@ func (s *testDBSuite2) TestAddNotNullColumn(c *C) { tk.MustExec("create table tnn (c1 int primary key auto_increment, c2 int)") tk.MustExec("insert tnn (c2) values (0)" + strings.Repeat(",(0)", 99)) done := make(chan error, 1) - testddlutil.SessionExecInGoroutine(s.store, "alter table tnn add column c3 int not null default 3", done) + testddlutil.SessionExecInGoroutine(s.store, "test_db", "alter table tnn add column c3 int not null default 3", done) updateCnt := 0 out: for { @@ -4298,7 +3479,7 @@ func (s *testDBSuite2) TestTransactionOnAddDropColumn(c *C) { } originHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + defer s.dom.DDL().SetHook(originHook) hook := &ddl.TestDDLCallback{Do: s.dom} var checkErr error hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -4320,7 +3501,7 @@ func (s *testDBSuite2) TestTransactionOnAddDropColumn(c *C) { } } } - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) done := make(chan error, 1) // test transaction on add column. 
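The transaction tests in this hunk all share the pattern that the direct `SetHook` call now shortens: install a callback keyed on a schema state, run SQL mid-DDL, and restore the original hook. A skeleton, where `dom`, `tk1`, and the SQL are placeholders supplied by the surrounding fixture:

```go
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
	if job.SchemaState == model.StateWriteOnly {
		// Placeholder statement executed while the DDL job is mid-flight.
		_, _ = tk1.Exec("insert into t1 values (1, 1)")
	}
}
originHook := dom.DDL().GetHook()
defer dom.DDL().SetHook(originHook)
dom.DDL().SetHook(hook)
```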
go backgroundExec(s.store, "alter table t1 add column c int not null after a", done) @@ -4346,7 +3527,7 @@ func (s *testDBSuite3) TestIssue22307(c *C) { s.mustExec(tk, c, "insert into t values(1, 1);") originHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + defer s.dom.DDL().SetHook(originHook) hook := &ddl.TestDDLCallback{Do: s.dom} var checkErr1, checkErr2 error hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -4356,7 +3537,7 @@ func (s *testDBSuite3) TestIssue22307(c *C) { _, checkErr1 = tk.Exec("update t set a = 3 where b = 1;") _, checkErr2 = tk.Exec("update t set a = 3 order by b;") } - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) done := make(chan error, 1) // test transaction on add column. go backgroundExec(s.store, "alter table t drop column b;", done) @@ -4382,7 +3563,7 @@ func (s *testDBSuite3) TestTransactionWithWriteOnlyColumn(c *C) { } originHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + defer s.dom.DDL().SetHook(originHook) hook := &ddl.TestDDLCallback{Do: s.dom} var checkErr error hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -4404,7 +3585,7 @@ func (s *testDBSuite3) TestTransactionWithWriteOnlyColumn(c *C) { } } } - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) done := make(chan error, 1) // test transaction on add column. go backgroundExec(s.store, "alter table t1 add column c int not null", done) @@ -4430,7 +3611,7 @@ func (s *testDBSuite4) TestAddColumn2(c *C) { defer s.mustExec(tk, c, "drop table if exists t1, t2") originHook := s.dom.DDL().GetHook() - defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + defer s.dom.DDL().SetHook(originHook) hook := &ddl.TestDDLCallback{} var writeOnlyTable table.Table hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -4438,7 +3619,7 @@ func (s *testDBSuite4) TestAddColumn2(c *C) { writeOnlyTable, _ = s.dom.InfoSchema().TableByID(job.TableID) } } - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) done := make(chan error, 1) // test transaction on add column. go backgroundExec(s.store, "alter table t1 add column c int not null", done) @@ -4481,7 +3662,7 @@ func (s *testDBSuite4) TestAddColumn2(c *C) { s.mustExec(tk, c, "commit") } - s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + s.dom.DDL().SetHook(hook) go backgroundExec(s.store, "alter table t2 add column b int not null default 3", done) err = <-done @@ -4580,52 +3761,6 @@ func (s *testDBSuite4) TestIfExists(c *C) { tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Note|1507|Error in list of partitions to DROP")) } -func testAddIndexForGeneratedColumn(tk *testkit.TestKit, s *testDBSuite5, c *C) { - tk.MustExec("use test_db") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(y year NOT NULL DEFAULT '2155')") - defer s.mustExec(tk, c, "drop table t;") - for i := 0; i < 50; i++ { - s.mustExec(tk, c, "insert into t values (?)", i) - } - tk.MustExec("insert into t values()") - tk.MustExec("ALTER TABLE t ADD COLUMN y1 year as (y + 2)") - _, err := tk.Exec("ALTER TABLE t ADD INDEX idx_y(y1)") - c.Assert(err, IsNil) - - t := s.testGetTable(c, "t") - for _, idx := range t.Indices() { - c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse) - } - // NOTE: this test case contains a bug, it should be uncommented after the bug is fixed. 
- // TODO: Fix bug https://github.com/pingcap/tidb/issues/12181 - // s.mustExec(c, "delete from t where y = 2155") - // s.mustExec(c, "alter table t add index idx_y(y1)") - // s.mustExec(c, "alter table t drop index idx_y") - - // Fix issue 9311. - tk.MustExec("drop table if exists gcai_table") - tk.MustExec("create table gcai_table (id int primary key);") - tk.MustExec("insert into gcai_table values(1);") - tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d date DEFAULT '9999-12-31';") - tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d1 date as (DATE_SUB(d, INTERVAL 31 DAY));") - tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx(d1);") - tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30")) - tk.MustQuery("select d1 from gcai_table use index(idx)").Check(testkit.Rows("9999-11-30")) - tk.MustExec("admin check table gcai_table") - // The column is PKIsHandle in generated column expression. - tk.MustExec("ALTER TABLE gcai_table ADD COLUMN id1 int as (id+5);") - tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx1(id1);") - tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30 6")) - tk.MustQuery("select id1 from gcai_table use index(idx1)").Check(testkit.Rows("6")) - tk.MustExec("admin check table gcai_table") -} - -func (s *testDBSuite5) TestAddIndexForGeneratedColumn(c *C) { - tk := testkit.NewTestKit(c, s.store) - testAddIndexForGeneratedColumn(tk, s, c) -} - func (s *testDBSuite5) TestModifyGeneratedColumn(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("create database if not exists test;") @@ -5679,8 +4814,8 @@ func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 ses } d := s.dom.DDL() originalCallback := d.GetHook() - defer d.(ddl.DDLForTest).SetHook(originalCallback) - d.(ddl.DDLForTest).SetHook(callback) + defer d.SetHook(originalCallback) + d.SetHook(callback) var wg util.WaitGroupWrapper var err1 error @@ -6089,9 +5224,9 @@ func (s *testSerialDBSuite) TestCommitTxnWithIndexChange(c *C) { } } originalCallback := do.GetHook() - do.(ddl.DDLForTest).SetHook(hook) + do.SetHook(hook) tk2.MustExec(curCase.idxDDL) - do.(ddl.DDLForTest).SetHook(originalCallback) + do.SetHook(originalCallback) tk2.MustExec("admin check table t1") for i, checkSQL := range curCase.checkSQLs { if len(curCase.rowsExps[i]) > 0 { @@ -6217,8 +5352,8 @@ func (s *testSerialDBSuite) TestColumnTypeChangeGenUniqueChangingName(c *C) { } d := s.dom.DDL() originHook := d.GetHook() - d.(ddl.DDLForTest).SetHook(hook) - defer d.(ddl.DDLForTest).SetHook(originHook) + d.SetHook(hook) + defer d.SetHook(originHook) tk.MustExec("create table if not exists t(c1 varchar(256), c2 bigint, `_col$_c2` varchar(10), unique _idx$_idx(c1), unique idx(c2));") tk.MustExec("alter table test.t change column c2 cC2 tinyint after `_col$_c2`") @@ -6273,7 +5408,7 @@ func (s *testSerialDBSuite) TestColumnTypeChangeGenUniqueChangingName(c *C) { } } } - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) tk.MustExec("drop table if exists t") tk.MustExec("create table if not exists t(c1 bigint, _col$_c1 bigint, _col$__col$_c1_0 bigint, _col$__col$__col$_c1_0_0 bigint)") @@ -6457,8 +5592,8 @@ func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) { hook := &ddl.TestDDLCallback{} d := s.dom.DDL() originalHook := d.GetHook() - d.(ddl.DDLForTest).SetHook(hook) - defer d.(ddl.DDLForTest).SetHook(originalHook) + d.SetHook(hook) + defer d.SetHook(originalHook) // Test when cancelling cannot be retried and adding index succeeds. 
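The `testAddIndexForGeneratedColumn` helper above boils down to one invariant: an index over a generated column must agree with the row data, even when the base column is the PK handle. Restated as a minimal sequence (table name `g` is illustrative; values follow the `gcai_table` case above):

```go
tk.MustExec("create table g (id int primary key)")
tk.MustExec("insert into g values (1)")
tk.MustExec("alter table g add column id1 int as (id + 5)")
tk.MustExec("alter table g add index idx1(id1)")
// Index reads and row reads must agree on the computed value 1 + 5 = 6.
tk.MustQuery("select id1 from g use index(idx1)").Check(testkit.Rows("6"))
tk.MustExec("admin check table g")
```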
hook.OnJobRunBeforeExported = func(job *model.Job) { @@ -6537,7 +5672,7 @@ func testDropIndexes(c *C, store kv.Storage, lease time.Duration, createSQL, dro } jobIDExt, resetHook := setupJobIDExtCallback(ctx) defer resetHook() - testddlutil.SessionExecInGoroutine(store, dropIdxSQL, done) + testddlutil.SessionExecInGoroutine(store, "test_db", dropIdxSQL, done) ticker := time.NewTicker(lease / 2) defer ticker.Stop() @@ -6626,7 +5761,7 @@ func testCancelDropIndexes(c *C, store kv.Storage, d ddl.DDL) { } } originalHook := d.GetHook() - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) ctx := tk.Se.(sessionctx.Context) for i := range testCases { testCase = &testCases[i] @@ -6661,7 +5796,7 @@ func testCancelDropIndexes(c *C, store kv.Storage, d ddl.DDL) { c.Assert(indexInfos, IsNil) } } - d.(ddl.DDLForTest).SetHook(originalHook) + d.SetHook(originalHook) tk.MustExec(addIdxesSQL) tk.MustExec(dropIdxesSQL) } @@ -6902,7 +6037,7 @@ func (s *testSerialDBSuite) TestAddGeneratedColumnAndInsert(c *C) { } } } - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) tk.MustExec("alter table t1 add column gc int as ((a+1))") tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("4 5", "10 11")) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 298aa544d2606..1fdc99dc4e955 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -33,8 +33,6 @@ import ( ) type DDLForTest interface { - // SetHook sets the hook. - SetHook(h Callback) // SetInterceptor sets the interceptor. SetInterceptor(h Interceptor) } diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 57596043a6c8c..7b299664d176d 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -389,7 +389,7 @@ func TestAddIndexWorkerNum(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkBackfillWorkerNum")) }() - testutil.SessionExecInGoroutine(s.store, "create index c3_index on test_add_index (c3)", done) + testutil.SessionExecInGoroutine(s.store, "test_db", "create index c3_index on test_add_index (c3)", done) checkNum := 0 running := true diff --git a/ddl/modify_column_test.go b/ddl/modify_column_test.go index 3e6a740b5bed3..d97f2557c8a09 100644 --- a/ddl/modify_column_test.go +++ b/ddl/modify_column_test.go @@ -66,8 +66,6 @@ func TestModifyColumnReorgInfo(t *testing.T) { tk.MustQuery("split table t1 between (0) and (8192) regions 8;").Check(testkit.Rows("8 1")) tbl := tk.GetTableByName("test", "t1") - originalHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(originalHook) // Check insert null before job first update. 
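The `ddl_test.go` hunk above is the pivot of all the mechanical call-site edits in this patch: `SetHook` leaves the test-only `DDLForTest` interface, and the call sites suggest it is now exposed directly on the DDL handle (presumably on `ddl.DDL`, alongside `GetHook`), so the type assertion disappears:

```go
// Before: setting a callback required a test-only type assertion.
//   d.(ddl.DDLForTest).SetHook(hook)
// After: the getter/setter pair reads symmetrically on the DDL handle.
originalHook := d.GetHook()
defer d.SetHook(originalHook)
d.SetHook(hook)
```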
hook := &ddl.TestDDLCallback{Do: dom} @@ -110,7 +108,7 @@ func TestModifyColumnReorgInfo(t *testing.T) { } } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr", `return("cantDecodeRecordErr")`)) - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) err := tk.ExecToErr(sql) require.EqualError(t, err, "[ddl:8202]Cannot decode index value, because mock can't decode record error") require.NoError(t, checkErr) @@ -897,9 +895,7 @@ func TestModifyColumnTypeWhenInterception(t *testing.T) { } } } - originHook := d.GetHook() - d.(ddl.DDLForTest).SetHook(hook) - defer d.(ddl.DDLForTest).SetHook(originHook) + d.SetHook(hook) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockReorgTimeoutInOneRegion", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockReorgTimeoutInOneRegion")) @@ -973,8 +969,7 @@ func TestModifyColumnRollBack(t *testing.T) { } } - originalHook := dom.DDL().GetHook() - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) done := make(chan error, 1) go backgroundExecT(store, "alter table test.t1 change c2 c2 bigint not null;", done) @@ -989,6 +984,5 @@ func TestModifyColumnRollBack(t *testing.T) { } } require.False(t, mysql.HasNotNullFlag(c2.Flag)) - dom.DDL().(ddl.DDLForTest).SetHook(originalHook) tk.MustExec("drop table t1") } diff --git a/ddl/rollingback_test.go b/ddl/rollingback_test.go index 409e6f94e375e..f06ea93e49aac 100644 --- a/ddl/rollingback_test.go +++ b/ddl/rollingback_test.go @@ -80,7 +80,7 @@ func TestCancelAddIndexJobError(t *testing.T) { } } } - d.(ddl.DDLForTest).SetHook(hook) + d.SetHook(hook) // This will hang on stateDeleteOnly, and the job will be canceled. err := tk.ExecToErr("alter table t_cancel_add_index add index idx(a)") diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 55fb44ecf932e..374cfc2abbd17 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -481,9 +481,7 @@ func TestCancelAddIndexPanic(t *testing.T) { checkErr = txn.Commit(context.Background()) } } - origHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(origHook) - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) rs, err := tk.Exec("alter table t add index idx_c2(c2)") if rs != nil { require.NoError(t, rs.Close()) @@ -664,9 +662,7 @@ func TestRecoverTableByJobIDFail(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`)) } } - origHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(origHook) - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) // do recover table. tk.MustExec(fmt.Sprintf("recover table by job %d", jobID)) @@ -726,9 +722,7 @@ func TestRecoverTableByTableNameFail(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`)) } } - origHook := dom.DDL().GetHook() - defer dom.DDL().(ddl.DDLForTest).SetHook(origHook) - dom.DDL().(ddl.DDLForTest).SetHook(hook) + dom.DDL().SetHook(hook) // do recover table. 
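A discipline worth noting in the recover-table cases above: failpoints are enabled right where the hook needs them and disabled through `require.NoError`, so a leaked failpoint fails the test loudly instead of poisoning later ones. The path below is taken verbatim from the hunk above:

```go
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`))
defer func() {
	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr"))
}()
```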
tk.MustExec("recover table t_recover") @@ -812,9 +806,7 @@ func TestCanceledJobTakeTime(t *testing.T) { require.NoError(t, err) }) } - origHook := dom.DDL().GetHook() - dom.DDL().(ddl.DDLForTest).SetHook(hook) - defer dom.DDL().(ddl.DDLForTest).SetHook(origHook) + dom.DDL().SetHook(hook) originalWT := ddl.GetWaitTimeWhenErrorOccurred() ddl.SetWaitTimeWhenErrorOccurred(1 * time.Second) diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index 3b4d2911f0bcb..b9d8d4fed3db1 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -28,8 +28,8 @@ import ( ) // SessionExecInGoroutine export for testing. -func SessionExecInGoroutine(s kv.Storage, sql string, done chan error) { - ExecMultiSQLInGoroutine(s, "test_db", []string{sql}, done) +func SessionExecInGoroutine(s kv.Storage, dbName, sql string, done chan error) { + ExecMultiSQLInGoroutine(s, dbName, []string{sql}, done) } // ExecMultiSQLInGoroutine exports for testing.