Skip to content

Commit

Permalink
ddl: support drop indexes for multi-schema change (#35883)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta authored Jul 5, 2022
1 parent ff7c120 commit 0483c4c
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 64 deletions.
16 changes: 7 additions & 9 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,13 @@ var allTestCase = []testCancelJob{
{"alter table t_partition drop partition p6", false, model.StateDeleteReorganization, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
{"alter table t_partition drop partition p6", false, model.StateNone, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
// Drop indexes.
// TODO: fix schema state.
{"alter table t drop index mul_idx1, drop index mul_idx2", true, model.StateNone, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateWriteOnly, true, false, nil},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateWriteOnly, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteOnly, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteOnly, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteReorganization, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteReorganization, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", true, subStates{model.StatePublic, model.StatePublic}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateWriteOnly, model.StateWriteOnly}, true, false, nil},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateWriteOnly, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteOnly, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteOnly, model.StateWriteOnly}, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteReorganization, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteReorganization, model.StateWriteOnly}, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
// Alter db placement.
{"alter database db_placement placement policy = 'alter_x'", true, model.StateNone, true, false, []string{"create placement policy alter_x PRIMARY_REGION=\"cn-east-1\", REGIONS=\"cn-east-1\";", "create database db_placement"}},
{"alter database db_placement placement policy = 'alter_x'", false, model.StatePublic, false, true, nil},
Expand Down Expand Up @@ -304,7 +303,6 @@ func TestCancel(t *testing.T) {
for _, prepareSQL := range tc.prepareSQL {
tk.MustExec(prepareSQL)
}

cancel = false
cancelWhenReorgNotStart = true
registHook(hook, true)
Expand Down
16 changes: 5 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,7 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey:
default:
return false
}
Expand Down Expand Up @@ -3118,10 +3118,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down Expand Up @@ -6124,19 +6123,14 @@ func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: indexInfo.State,
TableName: t.Meta().Name.L,
Type: jobTp,
BinlogInfo: &model.HistoryInfo{},
SchemaState: indexInfo.State,
Args: []interface{}{indexName},
Args: []interface{}{indexName, ifExists},
}

err = d.DoDDLJob(ctx, job)
// index not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package ddl_test

import (
Expand Down
6 changes: 4 additions & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
var indexID int64
var ifExists bool
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
}
if len(partitionIDs) > 0 {
Expand All @@ -346,9 +347,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName interface{}
var ifExists bool
var indexID int64
var partitionIDs []int64
if err := job.DecodeArgs(&indexName, &indexID, &partitionIDs); err != nil {
if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
}
if len(partitionIDs) > 0 {
Expand Down
87 changes: 58 additions & 29 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -634,19 +635,23 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}

func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
tblInfo, indexInfo, ifExists, err := checkDropIndex(t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, nil
}
return ver, errors.Trace(err)
}
if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Index"))
}

dependentHiddenCols := make([]*model.ColumnInfo, 0)
for _, indexColumn := range indexInfo.Columns {
if tblInfo.Columns[indexColumn.Offset].Hidden {
dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset])
}
if job.MultiSchemaInfo != nil && !job.IsRollingback() && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
job.SchemaState = indexInfo.State
return updateVersionAndTableInfo(d, t, job, tblInfo, false)
}

originalState := indexInfo.State
Expand Down Expand Up @@ -675,24 +680,11 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateDeleteReorganization:
// reorganization -> absent
indexInfo.State = model.StateNone
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)
removeDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
Expand Down Expand Up @@ -721,38 +713,75 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
func removeDependentHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffs := make([]int, 0)
for _, indexColumn := range idxInfo.Columns {
col := tblInfo.Columns[indexColumn.Offset]
if col.Hidden {
hiddenColOffs = append(hiddenColOffs, col.Offset)
}
}
// Sort the offset in descending order.
slices.SortFunc(hiddenColOffs, func(a, b int) bool { return a > b })
// Move all the dependent hidden columns to the end.
endOffset := len(tblInfo.Columns) - 1
for _, offset := range hiddenColOffs {
tblInfo.MoveColumnInfo(offset, endOffset)
}
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenColOffs)]
}

func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
indices := tblInfo.Indices
offset := -1
for i, idx := range indices {
if idxInfo.ID == idx.ID {
offset = i
break
}
}
if offset == -1 {
// The target index has been removed.
return
}
// Remove the target index.
tblInfo.Indices = append(tblInfo.Indices[:offset], tblInfo.Indices[offset+1:]...)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, false, errors.Trace(err)
}

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
var ifExists bool
if err = job.DecodeArgs(&indexName, &ifExists); err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
return nil, nil, false, errors.Trace(err)
}

indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
return nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

// Double check for drop index on auto_increment column.
err = checkDropIndexOnAutoIncrementColumn(tblInfo, indexInfo)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, autoid.ErrWrongAutoKey
return nil, nil, false, autoid.ErrWrongAutoKey
}

// Check that drop primary index will not cause invisible implicit primary index.
if err := checkInvisibleIndexesOnPK(tblInfo, []*model.IndexInfo{indexInfo}, job); err != nil {
return nil, nil, errors.Trace(err)
job.State = model.JobStateCancelled
return nil, nil, false, errors.Trace(err)
}

return tblInfo, indexInfo, nil
return tblInfo, indexInfo, false, nil
}

func onDropIndexes(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down
17 changes: 10 additions & 7 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,16 +886,18 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist"))

// Verify the impact of deletion order when dropping duplicate indexes.
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;",
errno.ErrUnsupportedDDLOperation,
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i2 doesn't exist"))
}

func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
Expand All @@ -911,7 +913,8 @@ func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
}
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add index i1(c1)")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;")
tk.MustGetErrCode("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;",
errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column c2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add column c1 int")
tk.MustGetErrCode("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column if exists c1;",
Expand Down
3 changes: 3 additions & 0 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropColumn:
colName := job.Args[0].(model.CIStr)
info.DropColumns = append(info.DropColumns, colName)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
83 changes: 83 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,89 @@ func TestMultiSchemaChangeDropColumnsParallel(t *testing.T) {
})
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

// Test drop same index.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index t(a));")
tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation)

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id) nonclustered, key i1(c1), key i2(c2), key i3(c1, c2));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)
tk.MustExec("alter table t drop index i3, drop primary key;")
tk.MustGetErrCode("select * from t use index(primary);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i3);", errno.ErrKeyDoesNotExist)

// Test drop index with drop column.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
tk.MustExec("alter table t drop index t, drop column a")
tk.MustGetErrCode("select * from t force index(t)", errno.ErrKeyDoesNotExist)
}

func TestMultiSchemaChangeDropIndexesCancelled(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
originHook := dom.DDL().GetHook()

// Test for cancelling the job in a middle state.
tk.MustExec("create table t (a int, b int, index(a), unique index(b), index idx(a, b));")
hook := newCancelJobHook(store, dom, func(job *model.Job) bool {
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateDeleteOnly
})
dom.DDL().SetHook(hook)
tk.MustExec("alter table t drop index a, drop index b, drop index idx;")
dom.DDL().SetHook(originHook)
hook.MustCancelFailed(t)
tk.MustGetErrCode("select * from t use index (a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index (b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index (idx);", errno.ErrKeyDoesNotExist)

// Test for cancelling the job in none state.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, index(a), unique index(b), index idx(a, b));")
hook = newCancelJobHook(store, dom, func(job *model.Job) bool {
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
})
dom.DDL().SetHook(hook)
tk.MustGetErrCode("alter table t drop index a, drop index b, drop index idx;", errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)
hook.MustCancelDone(t)
tk.MustQuery("select * from t use index (a);").Check(testkit.Rows())
tk.MustQuery("select * from t use index (b);").Check(testkit.Rows())
tk.MustQuery("select * from t use index (idx);").Check(testkit.Rows())
}

func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index(a), index(b), index(c));")
putTheSameDDLJobTwice(t, func() {
tk.MustExec("alter table t drop index if exists b, drop index if exists c;")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Note 1091 index b doesn't exist",
"Note 1091 index c doesn't exist"))
})
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
putTheSameDDLJobTwice(t, func() {
tk.MustGetErrCode("alter table t drop index b, drop index a;", errno.ErrCantDropFieldOrKey)
})
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
Loading

0 comments on commit 0483c4c

Please sign in to comment.