Skip to content

Commit

Permalink
ddl: dynamic adjust add index worker number. (#8295) (#8786)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and zz-jason committed Dec 25, 2018
1 parent fae6021 commit d805e83
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 39 deletions.
62 changes: 62 additions & 0 deletions ddl/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ package ddl_test

import (
"fmt"
"math/rand"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand Down Expand Up @@ -238,3 +242,61 @@ func (s *testDBSuite) TestFailSchemaSyncer(c *C) {
_, err = tk.Exec("insert into t values(1)")
c.Assert(err, IsNil)
}

func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database if not exists test_db")
tk.MustExec("use test_db")
tk.MustExec("drop table if exists test_add_index")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
num := 4096
// first add some rows
for i := start; i < num; i++ {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
tk.MustExec(sql)
}

is := s.dom.InfoSchema()
schemaName := model.NewCIStr("test_db")
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := originDDLAddIndexWorkerCnt
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
ddl.TestCheckWorkerNumber = lastSetWorkerCnt
defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)

gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
checkNum := 0

LOOP:
for {
select {
case err = <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ddl.TestCheckWorkerNumCh:
lastSetWorkerCnt = int32(rand.Intn(8) + 8)
tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
checkNum++
}
}
c.Assert(checkNum, Greater, 5)
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}
111 changes: 72 additions & 39 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
if task.endIncluded {
rightParenthesis = "]"
}
log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())

return result
Expand Down Expand Up @@ -1045,34 +1045,12 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
return nil, nil
}

// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
// and send these tasks to add index workers, till we finish adding the indices.
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}
var (
// TestCheckWorkerNumCh use for test adjust add index worker.
TestCheckWorkerNumCh = make(chan struct{}, 0)
// TestCheckWorkerNumber use for test adjust add index worker.
TestCheckWorkerNumber = int32(16)
)

// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
Expand All @@ -1091,6 +1069,9 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
Expand All @@ -1099,16 +1080,68 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
idxWorkers := make([]*addIndexWorker, 0, workerCnt)
defer func() {
closeAddIndexWorkers(idxWorkers)
}()

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

// For dynamic adjust add index worker number.
workerCnt = variable.GetDDLReorgWorkerCounter()
// If only have 1 range, we can only start 1 worker.
if len(kvRanges) < int(workerCnt) {
workerCnt = int32(len(kvRanges))
}
// Enlarge the worker size.
for i := len(idxWorkers); i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker.priority = job.Priority
idxWorkers = append(idxWorkers, idxWorker)
go idxWorkers[i].run(reorgInfo.d)
}
// Shrink the worker size.
if len(idxWorkers) > int(workerCnt) {
workers := idxWorkers[workerCnt:]
idxWorkers = idxWorkers[:workerCnt]
closeAddIndexWorkers(workers)
}

// gofail: var checkIndexWorkerNum bool
// if checkIndexWorkerNum {
// num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
// if num != 0 {
// if num > len(kvRanges) {
// if len(idxWorkers) != len(kvRanges) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// } else if num != len(idxWorkers) {
// return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
// }
// TestCheckWorkerNumCh <- struct{}{}
// }
//}

log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
break
}
startHandle, _, err = decodeHandleRange(remains[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}

// addTableIndex handles the add index reorganization state for a table.
Expand Down

0 comments on commit d805e83

Please sign in to comment.