Skip to content

Commit

Permalink
DDL: support drop partition on the partitioned table with global inde…
Browse files Browse the repository at this point in the history
…xes (pingcap#19222)
  • Loading branch information
ldeng-ustc authored Oct 19, 2020
1 parent 62190f3 commit 937949f
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 93 deletions.
15 changes: 13 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type backfillWorkerType byte
const (
typeAddIndexWorker backfillWorkerType = 0
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
)

func (bWT backfillWorkerType) String() string {
Expand All @@ -52,6 +53,8 @@ func (bWT backfillWorkerType) String() string {
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
default:
return "unknown"
}
Expand Down Expand Up @@ -512,16 +515,24 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
sessCtx := newContext(reorgInfo.d.store)
sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

if bfWorkerType == typeAddIndexWorker {
switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
} else {
case typeUpdateColumnWorker:
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
default:
return errors.New("unknow backfill type")
}
}
// Shrink the worker size.
Expand Down
39 changes: 39 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -1032,6 +1033,44 @@ func (s *testIntegrationSuite5) TestMultiPartitionDropAndTruncate(c *C) {
result.Check(testkit.Rows(`2010`))
}

func (s *testIntegrationSuite7) TestDropPartitionWithGlobalIndex(c *C) {
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableGlobalIndex = true
})
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test_global")
tk.MustExec(`create table test_global ( a int, b int, c int)
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20)
);`)
t := testGetTableByName(c, s.ctx, "test", "test_global")
pid := t.Meta().Partition.Definitions[1].ID

tk.MustExec("Alter Table test_global Add Unique Index idx_b (b);")
tk.MustExec("Alter Table test_global Add Unique Index idx_c (c);")
tk.MustExec(`INSERT INTO test_global VALUES (1, 1, 1), (2, 2, 2), (11, 3, 3), (12, 4, 4)`)

tk.MustExec("alter table test_global drop partition p2;")
result := tk.MustQuery("select * from test_global;")
result.Sort().Check(testkit.Rows(`1 1 1`, `2 2 2`))

t = testGetTableByName(c, s.ctx, "test", "test_global")
idxInfo := t.Meta().FindIndexByName("idx_b")
c.Assert(idxInfo, NotNil)
cnt := checkGlobalIndexCleanUpDone(c, s.ctx, t.Meta(), idxInfo, pid)
c.Assert(cnt, Equals, 2)

idxInfo = t.Meta().FindIndexByName("idx_c")
c.Assert(idxInfo, NotNil)
cnt = checkGlobalIndexCleanUpDone(c, s.ctx, t.Meta(), idxInfo, pid)
c.Assert(cnt, Equals, 2)
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableGlobalIndex = false
})
}

func (s *testIntegrationSuite7) TestAlterTableExchangePartition(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
26 changes: 26 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,32 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
c.Assert(handles, HasLen, 0, Commentf("take time %v", time.Since(startTime)))
}

func checkGlobalIndexCleanUpDone(c *C, ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, pid int64) int {
c.Assert(ctx.NewTxn(context.Background()), IsNil)
txn, err := ctx.Txn(true)
c.Assert(err, IsNil)
defer txn.Rollback()

cnt := 0
prefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idxInfo.ID)
it, err := txn.Iter(prefix, nil)
c.Assert(err, IsNil)
for it.Valid() {
if !it.Key().HasPrefix(prefix) {
break
}
segs := tablecodec.SplitIndexValue(it.Value())
c.Assert(segs.PartitionID, NotNil)
_, pi, err := codec.DecodeInt(segs.PartitionID)
c.Assert(err, IsNil)
c.Assert(pi, Not(Equals), pid)
cnt++
err = it.Next()
c.Assert(err, IsNil)
}
return cnt
}

func (s *testDBSuite5) TestAlterPrimaryKey(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table test_add_pk(a int, b int unsigned , c varchar(255) default 'abc', d int as (a+b), e int as (a+1) stored, index idx(b))")
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
ver, err = onDropTableOrView(t, job)
case model.ActionDropTablePartition:
ver, err = onDropTablePartition(d, t, job)
ver, err = w.onDropTablePartition(d, t, job)
case model.ActionTruncateTablePartition:
ver, err = onTruncateTablePartition(d, t, job)
case model.ActionExchangeTablePartition:
Expand Down Expand Up @@ -666,7 +666,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionModifyTableAutoIdCache:
ver, err = onModifyTableAutoIDCache(t, job)
case model.ActionAddTablePartition:
ver, err = onAddTablePartition(d, t, job)
ver, err = w.onAddTablePartition(d, t, job)
case model.ActionModifyTableCharsetAndCollate:
ver, err = onModifyTableCharsetAndCollate(t, job)
case model.ActionRecoverTable:
Expand Down
Loading

0 comments on commit 937949f

Please sign in to comment.