Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support drop indexes for multi-schema change #35883

Merged
merged 9 commits into from
Jul 5, 2022
16 changes: 7 additions & 9 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,13 @@ var allTestCase = []testCancelJob{
{"alter table t_partition drop partition p6", false, model.StateDeleteReorganization, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
{"alter table t_partition drop partition p6", false, model.StateNone, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
// Drop indexes.
// TODO: fix schema state.
{"alter table t drop index mul_idx1, drop index mul_idx2", true, model.StateNone, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateWriteOnly, true, false, nil},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateWriteOnly, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteOnly, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteOnly, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteReorganization, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, model.StateDeleteReorganization, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", true, subStates{model.StatePublic, model.StatePublic}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateWriteOnly, model.StateWriteOnly}, true, false, nil},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateWriteOnly, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteOnly, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteOnly, model.StateWriteOnly}, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteReorganization, model.StateWriteOnly}, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
{"alter table t drop index mul_idx1, drop index mul_idx2", false, subStates{model.StateDeleteReorganization, model.StateWriteOnly}, false, true, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
// Alter db placement.
{"alter database db_placement placement policy = 'alter_x'", true, model.StateNone, true, false, []string{"create placement policy alter_x PRIMARY_REGION=\"cn-east-1\", REGIONS=\"cn-east-1\";", "create database db_placement"}},
{"alter database db_placement placement policy = 'alter_x'", false, model.StatePublic, false, true, nil},
Expand Down Expand Up @@ -304,7 +303,6 @@ func TestCancel(t *testing.T) {
for _, prepareSQL := range tc.prepareSQL {
tk.MustExec(prepareSQL)
}

cancel = false
cancelWhenReorgNotStart = true
registHook(hook, true)
Expand Down
16 changes: 5 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,7 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex:
tangenta marked this conversation as resolved.
Show resolved Hide resolved
default:
return false
}
Expand Down Expand Up @@ -3119,10 +3119,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down Expand Up @@ -6124,19 +6123,14 @@ func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: indexInfo.State,
TableName: t.Meta().Name.L,
Type: jobTp,
BinlogInfo: &model.HistoryInfo{},
SchemaState: indexInfo.State,
Args: []interface{}{indexName},
Args: []interface{}{indexName, ifExists},
}

err = d.DoDDLJob(ctx, job)
// index not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package ddl_test

import (
Expand Down
6 changes: 4 additions & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
var indexID int64
var ifExists bool
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
}
if len(partitionIDs) > 0 {
Expand All @@ -343,9 +344,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName interface{}
var ifExists bool
var indexID int64
var partitionIDs []int64
if err := job.DecodeArgs(&indexName, &indexID, &partitionIDs); err != nil {
if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
}
if len(partitionIDs) > 0 {
Expand Down
89 changes: 60 additions & 29 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -634,19 +635,23 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}

func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
tblInfo, indexInfo, ifExists, err := checkDropIndex(t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, nil
}
return ver, errors.Trace(err)
}
if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Index"))
}

dependentHiddenCols := make([]*model.ColumnInfo, 0)
for _, indexColumn := range indexInfo.Columns {
if tblInfo.Columns[indexColumn.Offset].Hidden {
dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset])
}
if job.MultiSchemaInfo != nil && !job.IsRollingback() && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
job.SchemaState = indexInfo.State
return updateVersionAndTableInfo(d, t, job, tblInfo, false)
}

originalState := indexInfo.State
Expand Down Expand Up @@ -675,24 +680,11 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateDeleteReorganization:
// reorganization -> absent
indexInfo.State = model.StateNone
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)
removeDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
Expand Down Expand Up @@ -721,38 +713,77 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
func removeDependentHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffs := make([]int, 0)
for _, indexColumn := range idxInfo.Columns {
col := tblInfo.Columns[indexColumn.Offset]
if col.Hidden {
hiddenColOffs = append(hiddenColOffs, col.Offset)
}
}
// Sort the offset in descending order.
sort.Slice(hiddenColOffs, func(i, j int) bool {
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
return hiddenColOffs[i] > hiddenColOffs[j]
})
// Move all the dependent hidden columns to the end.
endOffset := len(tblInfo.Columns) - 1
for _, offset := range hiddenColOffs {
tblInfo.MoveColumnInfo(offset, endOffset)
}
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenColOffs)]
}

func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
indices := tblInfo.Indices
offset := -1
for i, idx := range indices {
if idxInfo.ID == idx.ID {
offset = i
break
}
}
if offset == -1 {
// The target index has been removed.
return
}
// Remove the target index.
tblInfo.Indices = append(tblInfo.Indices[:offset], tblInfo.Indices[offset+1:]...)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, false, errors.Trace(err)
}

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
var ifExists bool
if err = job.DecodeArgs(&indexName, &ifExists); err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
return nil, nil, false, errors.Trace(err)
}

indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
return nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

// Double check for drop index on auto_increment column.
err = checkDropIndexOnAutoIncrementColumn(tblInfo, indexInfo)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, autoid.ErrWrongAutoKey
return nil, nil, false, autoid.ErrWrongAutoKey
}

// Check that drop primary index will not cause invisible implicit primary index.
if err := checkInvisibleIndexesOnPK(tblInfo, []*model.IndexInfo{indexInfo}, job); err != nil {
return nil, nil, errors.Trace(err)
job.State = model.JobStateCancelled
return nil, nil, false, errors.Trace(err)
}

return tblInfo, indexInfo, nil
return tblInfo, indexInfo, false, nil
}

func onDropIndexes(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down
17 changes: 10 additions & 7 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,16 +886,18 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist"))

// Verify the impact of deletion order when dropping duplicate indexes.
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;",
errno.ErrUnsupportedDDLOperation,
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i2 doesn't exist"))
}

func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
Expand All @@ -911,7 +913,8 @@ func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
}
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add index i1(c1)")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;")
tk.MustGetErrCode("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;",
errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column c2;")
tk.MustExec("alter table test_drop_indexes_from_partitioned_table add column c1 int")
tk.MustGetErrCode("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column if exists c1;",
Expand Down
3 changes: 3 additions & 0 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropColumn:
colName := job.Args[0].(model.CIStr)
info.DropColumns = append(info.DropColumns, colName)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
83 changes: 83 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,89 @@ func TestMultiSchemaChangeDropColumnsParallel(t *testing.T) {
})
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

// Test drop same index.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index t(a));")
tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation)

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id) nonclustered, key i1(c1), key i2(c2), key i3(c1, c2));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)
tk.MustExec("alter table t drop index i3, drop primary key;")
tk.MustGetErrCode("select * from t use index(primary);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i3);", errno.ErrKeyDoesNotExist)

// Test drop index with drop column.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
tk.MustExec("alter table t drop index t, drop column a")
tk.MustGetErrCode("select * from t force index(t)", errno.ErrKeyDoesNotExist)
}

func TestMultiSchemaChangeDropIndexesCancelled(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
originHook := dom.DDL().GetHook()

// Test for cancelling the job in a middle state.
tk.MustExec("create table t (a int, b int, index(a), unique index(b), index idx(a, b));")
hook := newCancelJobHook(store, dom, func(job *model.Job) bool {
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateDeleteOnly
})
dom.DDL().SetHook(hook)
tk.MustExec("alter table t drop index a, drop index b, drop index idx;")
dom.DDL().SetHook(originHook)
hook.MustCancelFailed(t)
tk.MustGetErrCode("select * from t use index (a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index (b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index (idx);", errno.ErrKeyDoesNotExist)

// Test for cancelling the job in none state.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, index(a), unique index(b), index idx(a, b));")
hook = newCancelJobHook(store, dom, func(job *model.Job) bool {
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
})
dom.DDL().SetHook(hook)
tk.MustGetErrCode("alter table t drop index a, drop index b, drop index idx;", errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)
hook.MustCancelDone(t)
tk.MustQuery("select * from t use index (a);").Check(testkit.Rows())
tk.MustQuery("select * from t use index (b);").Check(testkit.Rows())
tk.MustQuery("select * from t use index (idx);").Check(testkit.Rows())
}

func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, b int, c int, index(a), index(b), index(c));")
putTheSameDDLJobTwice(t, func() {
tk.MustExec("alter table t drop index if exists b, drop index if exists c;")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Note 1091 index b doesn't exist",
"Note 1091 index c doesn't exist"))
})
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
putTheSameDDLJobTwice(t, func() {
tk.MustGetErrCode("alter table t drop index b, drop index a;", errno.ErrCantDropFieldOrKey)
})
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
Loading